[
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402322#comment-17402322
]
Yun Tang commented on FLINK-23721:
----------------------------------
[~lmagics] Whether incremental checkpoints are enabled has no relationship to
RocksDB state TTL. Could you share the configuration of your state backends?
> Flink SQL state TTL has no effect when using non-incremental
> RocksDBStateBackend
> --------------------------------------------------------------------------------
>
> Key: FLINK-23721
> URL: https://issues.apache.org/jira/browse/FLINK-23721
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends, Table SQL / Runtime
> Affects Versions: 1.13.0
> Reporter: Q Kang
> Priority: Major
>
> Take the following deduplication SQL program as an example:
> {code:java}
> SET table.exec.state.ttl=30s;
> INSERT INTO tmp.blackhole_order_done_log
> SELECT t.* FROM (
> SELECT *,ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime ASC)
> AS rn
> FROM rtdw_ods.kafka_order_done_log
> ) AS t WHERE rn = 1;
> {code}
> When using RocksDBStateBackend with incremental checkpoints enabled, the size
> of the deduplication state seems OK.
> FlinkCompactionFilter is also working, according to the logs below:
> {code:java}
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter
> [] - RocksDB filter native code log: Call
> FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3481026D01, Value
> type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter
> [] - RocksDB filter native code log: Last access timestamp:
> 1628673475181 ms, ttlWithoutOverflow: 30000 ms, Current timestamp:
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter
> [] - RocksDB filter native code log: Decision: 1
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter
> [] - RocksDB filter native code log: Call
> FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3484064901, Value
> type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter
> [] - RocksDB filter native code log: Last access timestamp:
> 1628673672777 ms, ttlWithoutOverflow: 30000 ms, Current timestamp:
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter
> [] - RocksDB filter native code log: Decision: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter
> [] - RocksDB filter native code log: Call
> FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3483341D01, Value
> type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter
> [] - RocksDB filter native code log: Last access timestamp:
> 1628673618973 ms, ttlWithoutOverflow: 30000 ms, Current timestamp:
> 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter
> [] - RocksDB filter native code log: Decision: 1
> {code}
> However, after turning off incremental checkpoints, the state TTL seems to
> have no effect at all: FlinkCompactionFilter logs are not printed, and the
> size of the deduplication state grows steadily up to several GB (Kafka
> traffic is somewhat heavy, at about 1K records per second).
> In contrast, FsStateBackend always works well.
>
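As a reading aid for the FlinkCompactionFilter log lines quoted above, here is a minimal Java sketch of the expiry arithmetic they appear to reflect. It is not Flink's or RocksDB's actual code. The class and method names are hypothetical. It assumes that the first 8 bytes of the logged "Data" field are the big-endian last-access timestamp in milliseconds, and that "Decision: 1" means the entry is dropped. The exact comparison operator used by the native filter is also an assumption.

```java
public class TtlLogCheck {

    // Assumption: the first 8 bytes (16 hex digits) of the logged "Data"
    // field hold the last-access timestamp in ms, big-endian.
    static long decodeTimestamp(String hexData) {
        return Long.parseUnsignedLong(hexData.substring(0, 16), 16);
    }

    // Assumption: an entry counts as expired once lastAccess + ttl has
    // passed; whether the native filter uses <= or < is not known here.
    static boolean isExpired(long lastAccessMs, long ttlMs, long nowMs) {
        return lastAccessMs + ttlMs <= nowMs;
    }

    public static void main(String[] args) {
        long nowMs = 1628673701905L; // "Current timestamp" from the log
        long ttlMs = 30000L;         // "TTL: 30000 ms" from the log
        String[] loggedData = {
            "0000017B3481026D01",
            "0000017B3484064901",
            "0000017B3483341D01"
        };
        for (String data : loggedData) {
            long lastAccessMs = decodeTimestamp(data);
            System.out.println(data + " lastAccess=" + lastAccessMs
                    + " expired=" + isExpired(lastAccessMs, ttlMs, nowMs));
        }
    }
}
```

Under these assumptions, the decoded timestamps equal the "Last access timestamp" values in the log, and the expired/not-expired results line up with the logged decisions 1, 0, 1.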