[ https://issues.apache.org/jira/browse/FLINK-35738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864324#comment-17864324 ]

Samrat Deb commented on FLINK-35738:
------------------------------------

Hi, I have verified FLINK-26050. The following steps were taken to validate that the issue is properly fixed.

 

*Step 1:* Added the following configuration in 
[{{conf/flink-conf.yaml}}|https://nightlies.apache.org/flink/flink-docs-stable/deployment/config.html]:

{{state.backend: rocksdb}}

{{state.backend.incremental: true}}

{{state.checkpoints.dir: <local_path>}}

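For quick local verification, the same options can also be set programmatically when building the job. This is a minimal sketch, not taken from the ticket: the checkpoint path is a placeholder, and passing a `Configuration` to `getExecutionEnvironment` is just one way to apply these settings.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class VerifySetup {
    public static void main(String[] args) throws Exception {
        // Mirror the flink-conf.yaml entries from Step 1 (paths are placeholders).
        Configuration conf = new Configuration();
        conf.setString("state.backend", "rocksdb");
        conf.setString("state.backend.incremental", "true");
        conf.setString("state.checkpoints.dir", "file:///tmp/flink-verify/checkpoints");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define the FLINK-26050 job here and call env.execute() ...
    }
}
```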

*Step 2:* Submit the Flink job defined in [FLINK-26050|https://issues.apache.org/jira/browse/FLINK-26050].
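For reference, the query from the ticket (quoted at the bottom of this message) can be submitted through the Table API roughly as follows. This is a sketch only: the source/sink DDL for `shoe_clickstream` and `top_5_highest_view_time` is assumed to be registered elsewhere and is not part of the ticket text.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SubmitVerificationJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL for `shoe_clickstream` and `top_5_highest_view_time` is assumed
        // to have been executed here (omitted).

        // Submit the windowed Top-N query from FLINK-26050.
        tEnv.executeSql(
                "INSERT INTO top_5_highest_view_time "
                        + "SELECT * FROM ( "
                        + "  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end "
                        + "    ORDER BY view_time DESC) AS rownum "
                        + "  FROM ( "
                        + "    SELECT window_start, window_end, product_id, "
                        + "           SUM(view_time) AS view_time, COUNT(*) AS cnt "
                        + "    FROM TABLE(TUMBLE(TABLE `shoe_clickstream`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES)) "
                        + "    GROUP BY window_start, window_end, product_id)) "
                        + "WHERE rownum <= 5");
    }
}
```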

 

After running for a long time, the following logs and RocksDB state were obtained:
 
{code:java}
2024-07-09 22:40:01,271 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/Users/samrat/Code/oss/flink-verify/checkpoints', savepoints: 'null, fileStateThreshold: -1), localRocksDbDirectories=[/Users/samrat/Code/oss/flink-verify/flink/rocksdb_local_db], enableIncrementalCheckpointing=true, numberOfTransferThreads=4, writeBatchSize=2097152}
2024-07-09 22:40:01,271 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
2024-07-09 22:40:01,272 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/Users/samrat/Code/oss/flink-verify/checkpoints', savepoints: 'null, fileStateThreshold: 20480), localRocksDbDirectories=[/Users/samrat/Code/oss/flink-verify/flink/rocksdb_local_db], enableIncrementalCheckpointing=true, numberOfTransferThreads=4, writeBatchSize=2097152}
2024-07-09 22:40:01,272 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as RocksDBStateBackend
2024-07-09 22:40:01,273 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using legacy state backend RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/Users/samrat/Code/oss/flink-verify/checkpoints', savepoints: 'null, fileStateThreshold: 20480), localRocksDbDirectories=[/Users/samrat/Code/oss/flink-verify/flink/rocksdb_local_db], enableIncrementalCheckpointing=true, numberOfTransferThreads=4, writeBatchSize=2097152} as Job checkpoint storage
2024-07-09 22:40:01,273 WARN org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage passed via StreamExecutionEnvironment is ignored because legacy state backend 'org.apache.flink.contrib.streaming.state.RocksDBStateBackend' is used. Legacy state backends can also be used as checkpoint storage and take precedence for backward-compatibility reasons.{code}
 
 

*Step 3:* Update the configuration in [{{conf/flink-conf.yaml}}|https://nightlies.apache.org/flink/flink-docs-stable/deployment/config.html] and add:
 
state.backend.rocksdb.manual-compaction.min-interval: 1
 

*Step 4:* Execute the streaming job from Step 2.

Logs:
 
{code:java}
INFO [] - Using standard YAML parser to load flink configuration file from /Users/samrat/Code/oss/flink/flink-dist/target/flink-1.20-SNAPSHOT-bin/flink-1.20-SNAPSHOT/conf/config.yaml.
INFO [] - Loading configuration property: taskmanager.memory.process.size, 1728m
INFO [] - Loading configuration property: taskmanager.bind-host, localhost
INFO [] - Loading configuration property: state.incremental, true
INFO [] - Loading configuration property: jobmanager.execution.failover-strategy, region
INFO [] - Loading configuration property: jobmanager.rpc.address, localhost
INFO [] - Loading configuration property: jobmanager.memory.process.size, 1600m
INFO [] - Loading configuration property: jobmanager.rpc.port, 6123
INFO [] - Loading configuration property: rest.bind-address, localhost
INFO [] - Loading configuration property: jobmanager.bind-host, localhost
INFO [] - Loading configuration property: state.backend.rocksdb.manual-compaction.min-interval, 1
INFO [] - Loading configuration property: taskmanager.host, localhost
INFO [] - Loading configuration property: parallelism.default, 1
INFO [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
INFO [] - Loading configuration property: rest.address, localhost
INFO [] - Loading configuration property: state.backend, rocksdb
INFO [] - Loading configuration property: env.java.opts.all, --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
INFO [] - Loading configuration property: state.checkpoints.dir, file:///Users/samrat/Code/oss/checkpoint
INFO [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
INFO [] - Final TaskExecutor Memory configuration:
INFO [] - Total Process Memory: 1.688gb (1811939328 bytes){code}
 
 

!Screenshot 2024-07-09 at 11.27.03 PM.png!

Manual compaction trigger logs:
 
{code:java}
2024-07-09 23:21:48,713 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Attempting to load RocksDB native library and store it under '/var/folders/84/hvzmt4pn71n09p56zlfdh8nw0000gn/T/tm_localhost:51988-7fc151/tmp'
2024-07-09 23:21:49,335 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Successfully loaded RocksDB native library
2024-07-09 23:21:49,337 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Getting shared memory for RocksDB: shareScope=SLOT, managed=false
2024-07-09 23:21:49,337 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Getting shared memory for RocksDB: shareScope=SLOT, managed=false
2024-07-09 23:21:49,340 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Obtained shared RocksDB cache of size 67108865 bytes
2024-07-09 23:21:49,340 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Obtained shared RocksDB cache of size 67108865 bytes
2024-07-09 23:21:49,355 INFO org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager [] - Creating RocksDBManualCompactionManager with settings: RocksDBManualCompactionConfig{minInterval=1, maxManualCompactions=5, maxFileSizeToCompact=50 kb, minFilesToCompact=5, maxFilesToCompact=30, maxOutputFileSize=64 mb, maxAutoCompactions=30}
2024-07-09 23:21:49,355 INFO org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager [] - Creating RocksDBManualCompactionManager with settings: RocksDBManualCompactionConfig{minInterval=1, maxManualCompactions=5, maxFileSizeToCompact=50 kb, minFilesToCompact=5, maxFilesToCompact=30, maxOutputFileSize=64 mb, maxAutoCompactions=30}
2024-07-09 23:21:49,359 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Finished building RocksDB keyed state-backend at /Users/samrat/Code/oss/flink-verify/flink/rocksdb_local_db/job_075fa36c24b0216fd674935642f7843c_op_WindowOperator_20ba6b65f97481d5570070de90e4e791__2_2__uuid_0276fe1b-3694-44d0-aa8c-53834eb21580.
2024-07-09 23:21:49,359 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Finished building RocksDB keyed state-backend at /Users/samrat/Code/oss/flink-verify/flink/rocksdb_local_db/job_075fa36c24b0216fd674935642f7843c_op_WindowOperator_20ba6b65f97481d5570070de90e4e791__1_2__uuid_e2ce66b8-2c02-4297-982a-4d26e48db1ed.{code}
 
 

*Observation:*

The number of files in the RocksDB path remains constant.
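The check above boils down to counting the {{.sst}} files in the local RocksDB directory across runs. A small helper like the following can make that comparison repeatable (a sketch; the class name and the demo directory layout are hypothetical, not part of the verification above):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class SstFileCounter {

    // Counts .sst files under the given RocksDB local directory (recursively).
    public static long countSstFiles(Path dbDir) {
        try (Stream<Path> files = Files.walk(dbDir)) {
            return files.filter(p -> p.getFileName().toString().endsWith(".sst")).count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a throwaway directory with two .sst files and one MANIFEST file,
    // standing in for a real RocksDB working directory.
    public static Path makeDemoDir() {
        try {
            Path dir = Files.createTempDirectory("rocksdb_local_db");
            Files.createFile(dir.resolve("000001.sst"));
            Files.createFile(dir.resolve("000002.sst"));
            Files.createFile(dir.resolve("MANIFEST-000003"));
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Only the .sst files are counted; the MANIFEST file is ignored.
        System.out.println(countSstFiles(makeDemoDir())); // prints 2
    }
}
```

Running it periodically against the directory from the logs (e.g. {{.../flink/rocksdb_local_db}}) and comparing counts is enough to confirm the constant-file-count observation.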

 

> Release Testing: Verify FLINK-26050 Too many small sst files in rocksdb state 
> backend when using time window created in ascending order
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35738
>                 URL: https://issues.apache.org/jira/browse/FLINK-35738
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / State Backends
>    Affects Versions: 1.20.0
>            Reporter: Rui Fan
>            Assignee: Samrat Deb
>            Priority: Major
>              Labels: release-testing
>             Fix For: 1.20.0
>
>         Attachments: Screenshot 2024-07-09 at 11.27.03 PM.png
>
>
> The problem occurs when using RocksDB and specific queries/jobs (please see 
> the ticket for the detailed description).
> To test the solution, run the following query with RocksDB as a state backend:
>  
> {code:java}
> INSERT INTO top_5_highest_view_time
> SELECT *
> FROM   (
>                 SELECT   *,
>                          ROW_NUMBER() OVER (PARTITION BY window_start, 
> window_end ORDER BY view_time DESC) AS rownum
>                 FROM     (
>                                   SELECT   window_start,
>                                            window_end,
>                                            product_id,
>                                            SUM(view_time) AS view_time,
>                                            COUNT(*)       AS cnt
>                                   FROM     TABLE(TUMBLE(TABLE 
> `shoe_clickstream`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
>                                   GROUP BY window_start,
>                                            window_end,
>                                            product_id))
> WHERE  rownum <= 5;{code}
>  
> With the feature disabled (default), the number of files in the rocksdb working 
> directory (as well as in the checkpoint) should grow indefinitely.
>  
> With the feature enabled, the number of files should stay constant (as they 
> should get merged with each other).
> To enable the feature, set 
> {code:java}
> state.backend.rocksdb.manual-compaction.min-interval{code}
>  to 1 minute, for example.
>  
> Please consult 
> [https://github.com/apache/flink/blob/e7d7db3b6f87e53d9bace2a16cf95e5f7a79087a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java#L29]
>  for other options if necessary.


