[
https://issues.apache.org/jira/browse/FLINK-35738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864324#comment-17864324
]
Samrat Deb edited comment on FLINK-35738 at 7/9/24 6:05 PM:
------------------------------------------------------------
Hi, I have verified FLINK-26050. These are the steps I used to validate that the issue is properly
fixed:
*Step 1:* Add the following configuration to
[{{conf/flink-conf.yaml}}|https://nightlies.apache.org/flink/flink-docs-stable/deployment/config.html]:
{{state.backend: rocksdb}}
{{state.backend.incremental: true}}
{{state.checkpoints.dir: <local_path>}}
*Step 2:* Submit the Flink job described in
[FLINK-26050|https://issues.apache.org/jira/browse/FLINK-26050].
After the job had been running for a long time, the following logs and RocksDB state were obtained:
{code:java}
2024-07-09 22:40:01,271 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/Users/samrat/Code/oss/flink-verify/checkpoints', savepoints: 'null, fileStateThreshold: -1), localRocksDbDirectories=[/Users/samrat/Code/oss/flink-verify/flink/rocksdb_local_db], enableIncrementalCheckpointing=true, numberOfTransferThreads=4, writeBatchSize=2097152}
2024-07-09 22:40:01,271 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
2024-07-09 22:40:01,272 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/Users/samrat/Code/oss/flink-verify/checkpoints', savepoints: 'null, fileStateThreshold: 20480), localRocksDbDirectories=[/Users/samrat/Code/oss/flink-verify/flink/rocksdb_local_db], enableIncrementalCheckpointing=true, numberOfTransferThreads=4, writeBatchSize=2097152}
2024-07-09 22:40:01,272 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as RocksDBStateBackend
2024-07-09 22:40:01,273 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using legacy state backend RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/Users/samrat/Code/oss/flink-verify/checkpoints', savepoints: 'null, fileStateThreshold: 20480), localRocksDbDirectories=[/Users/samrat/Code/oss/flink-verify/flink/rocksdb_local_db], enableIncrementalCheckpointing=true, numberOfTransferThreads=4, writeBatchSize=2097152} as Job checkpoint storage
2024-07-09 22:40:01,273 WARN org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage passed via StreamExecutionEnvironment is ignored because legacy state backend 'org.apache.flink.contrib.streaming.state.RocksDBStateBackend' is used. Legacy state backends can also be used as checkpoint storage and take precedence for backward-compatibility reasons.{code}
*Step 3:* Update the configuration in [{{conf/flink-conf.yaml}}|https://nightlies.apache.org/flink/flink-docs-stable/deployment/config.html] by adding:
{{state.backend.rocksdb.manual-compaction.min-interval: 1}}
*Step 4:* Run the streaming job from Step 2.
Logs:
{code:java}
INFO [] - Using standard YAML parser to load flink configuration file from /Users/samrat/Code/oss/flink/flink-dist/target/flink-1.20-SNAPSHOT-bin/flink-1.20-SNAPSHOT/conf/config.yaml.
INFO [] - Loading configuration property: taskmanager.memory.process.size, 1728m
INFO [] - Loading configuration property: taskmanager.bind-host, localhost
INFO [] - Loading configuration property: state.incremental, true
INFO [] - Loading configuration property: jobmanager.execution.failover-strategy, region
INFO [] - Loading configuration property: jobmanager.rpc.address, localhost
INFO [] - Loading configuration property: jobmanager.memory.process.size, 1600m
INFO [] - Loading configuration property: jobmanager.rpc.port, 6123
INFO [] - Loading configuration property: rest.bind-address, localhost
INFO [] - Loading configuration property: jobmanager.bind-host, localhost
INFO [] - Loading configuration property: state.backend.rocksdb.manual-compaction.min-interval, 1
INFO [] - Loading configuration property: taskmanager.host, localhost
INFO [] - Loading configuration property: parallelism.default, 1
INFO [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
INFO [] - Loading configuration property: rest.address, localhost
INFO [] - Loading configuration property: state.backend, rocksdb
INFO [] - Loading configuration property: env.java.opts.all, --add-exports=java.base/sun.net.util=ALL-UNNAMED
--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.text=ALL-UNNAMED
--add-opens=java.base/java.time=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
INFO [] - Loading configuration property: state.checkpoints.dir, file:///Users/samrat/Code/oss/checkpoint
INFO [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
INFO [] - Final TaskExecutor Memory configuration:
INFO [] - Total Process Memory: 1.688gb (1811939328 bytes){code}
!Screenshot 2024-07-09 at 11.27.03 PM.png!
Manual compaction trigger logs:
{code:java}
2024-07-09 23:21:48,713 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Attempting to load RocksDB native library and store it under '/var/folders/84/hvzmt4pn71n09p56zlfdh8nw0000gn/T/tm_localhost:51988-7fc151/tmp'
2024-07-09 23:21:49,335 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Successfully loaded RocksDB native library
2024-07-09 23:21:49,337 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Getting shared memory for RocksDB: shareScope=SLOT, managed=false
2024-07-09 23:21:49,337 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Getting shared memory for RocksDB: shareScope=SLOT, managed=false
2024-07-09 23:21:49,340 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Obtained shared RocksDB cache of size 67108865 bytes
2024-07-09 23:21:49,340 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Obtained shared RocksDB cache of size 67108865 bytes
2024-07-09 23:21:49,355 INFO org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager [] - Creating RocksDBManualCompactionManager with settings: RocksDBManualCompactionConfig{minInterval=1, maxManualCompactions=5, maxFileSizeToCompact=50 kb, minFilesToCompact=5, maxFilesToCompact=30, maxOutputFileSize=64 mb, maxAutoCompactions=30}
2024-07-09 23:21:49,355 INFO org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager [] - Creating RocksDBManualCompactionManager with settings: RocksDBManualCompactionConfig{minInterval=1, maxManualCompactions=5, maxFileSizeToCompact=50 kb, minFilesToCompact=5, maxFilesToCompact=30, maxOutputFileSize=64 mb, maxAutoCompactions=30}
2024-07-09 23:21:49,359 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Finished building RocksDB keyed state-backend at /Users/samrat/Code/oss/flink-verify/flink/rocksdb_local_db/job_075fa36c24b0216fd674935642f7843c_op_WindowOperator_20ba6b65f97481d5570070de90e4e791__2_2__uuid_0276fe1b-3694-44d0-aa8c-53834eb21580.
2024-07-09 23:21:49,359 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Finished building RocksDB keyed state-backend at /Users/samrat/Code/oss/flink-verify/flink/rocksdb_local_db/job_075fa36c24b0216fd674935642f7843c_op_WindowOperator_20ba6b65f97481d5570070de90e4e791__1_2__uuid_e2ce66b8-2c02-4297-982a-4d26e48db1ed.{code}
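The {{RocksDBManualCompactionConfig}} values logged above read like a small-file merge policy. The Java snippet below is a toy model of that reading, *not* Flink's implementation: the class name {{SmallFileSelection}} and the selection rule (skip files larger than {{maxFileSizeToCompact}}, collect up to {{maxFilesToCompact}} small files within the {{maxOutputFileSize}} budget, and merge only if at least {{minFilesToCompact}} were found) are assumptions for illustration. It ignores RocksDB levels and key ranges, which the real manager may take into account.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (NOT Flink's RocksDBManualCompactionManager) of picking
 * small SST files to merge, using the thresholds printed in the log above.
 */
class SmallFileSelection {
    // Values copied from the logged RocksDBManualCompactionConfig.
    static final long MAX_FILE_SIZE_TO_COMPACT = 50L * 1024;      // 50 kb
    static final int MIN_FILES_TO_COMPACT = 5;
    static final int MAX_FILES_TO_COMPACT = 30;
    static final long MAX_OUTPUT_FILE_SIZE = 64L * 1024 * 1024;   // 64 mb

    /** Returns the sizes of the files chosen for one merge, or an empty list if none qualifies. */
    static List<Long> selectForMerge(List<Long> fileSizes) {
        List<Long> batch = new ArrayList<>();
        long total = 0;
        for (long size : fileSizes) {
            if (size > MAX_FILE_SIZE_TO_COMPACT) {
                continue; // only small files are merge candidates in this model
            }
            if (batch.size() == MAX_FILES_TO_COMPACT || total + size > MAX_OUTPUT_FILE_SIZE) {
                break; // the batch is full
            }
            batch.add(size);
            total += size;
        }
        // Too few small files: in this model a merge is not worth it yet.
        return batch.size() >= MIN_FILES_TO_COMPACT ? batch : new ArrayList<>();
    }

    public static void main(String[] args) {
        // Sizes in bytes; 200_000 exceeds the 50 kb limit and is skipped.
        List<Long> sizes = List.of(10_000L, 200_000L, 20_000L, 30_000L, 5_000L, 40_000L, 1_000L);
        System.out.println(selectForMerge(sizes)); // [10000, 20000, 30000, 5000, 40000, 1000]
    }
}
```

With these particular values the {{maxOutputFileSize}} cap is never reached (30 files of at most 50 kb is about 1.5 MB), so in this model the batch size is bounded by {{maxFilesToCompact}} alone.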
Stopping the job:
{code:java}
./bin/flink stop 075fa36c24b0216fd674935642f7843c
Suspending job "075fa36c24b0216fd674935642f7843c" with a CANONICAL savepoint.
Triggering stop-with-savepoint for job 075fa36c24b0216fd674935642f7843c.
Waiting for response...
Savepoint completed. Path:
file:/Users/samrat/Code/oss/savepoint/savepoint-075fa3-9f7389205954{code}
*Observations:*
- The number of files in the RocksDB path remains constant.
- Manual compaction trigger logs were observed.
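The first observation can also be checked without a screenshot by sampling the SST file count under the local RocksDB directory while the job runs. The following is a minimal sketch that uses only {{java.nio.file}}; the class name {{SstFileCounter}} and the one-minute sampling period are illustrative choices, not part of the original verification.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

/** Hypothetical helper: counts RocksDB SST files under a local RocksDB directory. */
class SstFileCounter {

    /** Recursively counts regular files whose name ends in ".sst". */
    static long countSstFiles(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".sst"))
                    .count();
        }
    }

    public static void main(String[] args) throws Exception {
        // Directory to inspect, e.g. the localRocksDbDirectories path from the logs.
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        // Sample once per minute; with the fix enabled the count is expected to stay roughly flat.
        for (int i = 0; i < 5; i++) {
            System.out.println("sst files: " + countSstFiles(root));
            Thread.sleep(60_000);
        }
    }
}
```

Because RocksDB deletes files while compacting, {{Files.walk}} can fail with an {{UncheckedIOException}} if a directory disappears mid-walk; a longer-running check would catch that and retry the sample.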
> Release Testing: Verify FLINK-26050 Too many small sst files in rocksdb state
> backend when using time window created in ascending order
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-35738
> URL: https://issues.apache.org/jira/browse/FLINK-35738
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / State Backends
> Affects Versions: 1.20.0
> Reporter: Rui Fan
> Assignee: Samrat Deb
> Priority: Major
> Labels: release-testing
> Fix For: 1.20.0
>
> Attachments: Screenshot 2024-07-09 at 11.27.03 PM.png
>
>
> The problem occurs when using RocksDB and specific queries/jobs (please see
> the ticket for the detailed description).
> To test the solution, run the following query with RocksDB as a state backend:
>
> {code:sql}
> INSERT INTO top_5_highest_view_time
> SELECT *
> FROM (
>     SELECT *,
>            ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY view_time DESC) AS rownum
>     FROM (
>         SELECT window_start,
>                window_end,
>                product_id,
>                SUM(view_time) AS view_time,
>                COUNT(*) AS cnt
>         FROM TABLE(TUMBLE(TABLE `shoe_clickstream`, DESCRIPTOR($rowtime), INTERVAL '10' MINUTES))
>         GROUP BY window_start, window_end, product_id))
> WHERE rownum <= 5;{code}
>
> With the feature disabled (the default), the number of files in the RocksDB working
> directory (as well as in the checkpoint) should grow indefinitely.
>
> With the feature enabled, the number of files should stay constant (as the files
> get merged with each other).
> To enable the feature, set
> {code:java}
> state.backend.rocksdb.manual-compaction.min-interval{code}
> to, for example, 1 minute.
>
> Please consult
> [https://github.com/apache/flink/blob/e7d7db3b6f87e53d9bace2a16cf95e5f7a79087a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java#L29]
> for other options if necessary.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)