Myasuka commented on code in PR #21835:
URL: https://github.com/apache/flink/pull/21835#discussion_r1241076668
##########
docs/content/docs/dev/datastream/fault-tolerance/state.md:
##########
@@ -633,6 +633,15 @@ Updating the timestamp more often can improve cleanup speed
but it decreases compaction performance because it uses JNI call from native
code.
The default background cleanup for RocksDB backend queries the current
timestamp each time 1000 entries have been processed.
+Periodic compaction could speed up expired state entries cleanup, especially for state entries rarely accessed.
Review Comment:
`Periodic compaction` only works when TTL is enabled; I think we should state this clearly in the doc.
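For illustration, a minimal sketch of how this could be made explicit, assuming the `Time`-based overload added in this PR (the state name, TTL value, and interval below are placeholders, not the PR's code):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// The compaction-filter cleanup (and thus periodic compaction) only takes
// effect once TTL is enabled, i.e. the config is attached to a state descriptor.
StateTtlConfig ttlConfig =
        StateTtlConfig.newBuilder(Time.days(7)) // placeholder TTL
                .cleanupInRocksdbCompactFilter(
                        1000,           // query timestamp every 1000 processed entries
                        Time.days(30))  // placeholder periodic compaction interval
                .build();

ValueStateDescriptor<Long> lastAccess =
        new ValueStateDescriptor<>("last-access", Long.class); // hypothetical state
lastAccess.enableTimeToLive(ttlConfig);
```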
##########
docs/content/docs/dev/datastream/fault-tolerance/state.md:
##########
@@ -633,6 +633,15 @@ Updating the timestamp more often can improve cleanup speed
but it decreases compaction performance because it uses JNI call from native
code.
The default background cleanup for RocksDB backend queries the current
timestamp each time 1000 entries have been processed.
+Periodic compaction could speed up expired state entries cleanup, especially for state entries rarely accessed.
+Files older than this value will be picked up for compaction, and re-written to the same level as they were before.
+It makes sure a file goes through compaction filters periodically.
+You can change it and pass a custom value to
+`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime)` method.
+The default value of Periodic compaction seconds is 0xfffffffffffffffeL which lets RocksDB control this feature as needed.
Review Comment:
I think `The default value is 30 days` is better than `0xfffffffffffffffeL`
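As a companion to the default-value comment, a short sketch of setting the interval explicitly instead of relying on the sentinel; per the javadoc added in this PR, a zero interval turns periodic compaction off (same imports as the sketch above, values are placeholders):

```java
// Explicit 30-day periodic compaction interval (placeholder values).
StateTtlConfig.newBuilder(Time.days(7))
        .cleanupInRocksdbCompactFilter(1000, Time.days(30))
        .build();

// A zero interval disables periodic compaction, per the new javadoc.
StateTtlConfig.newBuilder(Time.days(7))
        .cleanupInRocksdbCompactFilter(1000, Time.milliseconds(0))
        .build();
```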
##########
docs/content.zh/docs/dev/datastream/fault-tolerance/state.md:
##########
@@ -568,6 +568,14 @@ After Flink has processed a certain number of state entries, it uses the current timestamp to check whether R
The more frequently the timestamp is updated, the more promptly state is cleaned up, but since compaction incurs the overhead of JNI calls, it affects overall compaction performance.
The default background cleanup strategy of the RocksDB backend runs once for every 1000 entries processed.
+Periodic compaction can speed up the cleanup of expired state entries, especially for state entries that are rarely accessed.
+Files older than this value will be picked up for compaction and re-written to the same level they were in before.
+This makes sure a file goes through the compaction filter periodically.
+You can set the periodic compaction interval via the `StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime)` method.
+The default value of the periodic compaction interval is Time.seconds(0xfffffffffffffffeL), which lets RocksDB control this feature as needed.
Review Comment:
Please update the Chinese docs in line with my comments above.
##########
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java:
##########
@@ -298,12 +298,40 @@ public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) {
            return this;
        }
+        /**
+         * Cleanup expired state while Rocksdb compaction is running.
+         *
+         * <p>RocksDB compaction filter will query current timestamp, used to check expiration, from
+         * Flink every time after processing {@code queryTimeAfterNumEntries} number of state
+         * entries. Updating the timestamp more often can improve cleanup speed but it decreases
+         * compaction performance because it uses JNI call from native code.
+         *
+         * <p>Periodic compaction could speed up expired state entries cleanup, especially for state
+         * entries rarely accessed. Files older than this value will be picked up for compaction,
+         * and re-written to the same level as they were before. It makes sure a file goes through
+         * compaction filters periodically.
+         *
+         * @param queryTimeAfterNumEntries number of state entries to process by compaction filter
+         *     before updating current timestamp
+         * @param periodicCompactionTime periodic compaction which could speed up expired state
+         *     cleanup. 0 means turning off periodic compaction.
+         */
+        @Nonnull
+        public Builder cleanupInRocksdbCompactFilter(
+                long queryTimeAfterNumEntries, Time periodicCompactionTime) {
+            strategies.put(
+                    CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER,
+                    new RocksdbCompactFilterCleanupStrategy(
+                            queryTimeAfterNumEntries, periodicCompactionTime));
+            return this;
+        }
+
        /**
         * Disable default cleanup of expired state in background (enabled by default).
         *
         * <p>If some specific cleanup is configured, e.g. {@link #cleanupIncrementally(int,
-         * boolean)} or {@link #cleanupInRocksdbCompactFilter(long)}, this setting does not disable
-         * it.
+         * boolean)} or {@link #cleanupInRocksdbCompactFilter(long)} or {@link
+         * #cleanupInRocksdbCompactFilter(long, long)} , this setting does not disable it.
Review Comment:
Just a reminder, the docs here are not correct; it should be `#cleanupInRocksdbCompactFilter(long, Time)}`.
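Concretely, the corrected `{@link}` reference could read roughly like this (a sketch of the javadoc fix only; the final wording is up to the author):

```java
         * <p>If some specific cleanup is configured, e.g. {@link #cleanupIncrementally(int,
         * boolean)} or {@link #cleanupInRocksdbCompactFilter(long)} or {@link
         * #cleanupInRocksdbCompactFilter(long, Time)}, this setting does not disable it.
```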
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]