This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 161defe0bb2dc8136133e07699b6ac433d52dc65
Author: Rui Fan <[email protected]>
AuthorDate: Tue Feb 27 13:17:25 2024 +0800

    [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter
---
 docs/content.zh/docs/dev/datastream/fault-tolerance/state.md |  9 +++++----
 docs/content/docs/dev/datastream/fault-tolerance/state.md    |  9 +++++----
 .../org/apache/flink/api/common/state/StateTtlConfig.java    | 12 ++++++------
 .../apache/flink/api/common/state/StateTtlConfigTest.java    |  3 ++-
 flink-python/pyflink/datastream/state.py                     |  8 ++++----
 .../org/apache/flink/streaming/api/utils/ProtoUtilsTest.java |  3 ++-
 .../streaming/state/ttl/RocksDbTtlCompactFiltersManager.java |  2 +-
 7 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
index c7aaaf0c27f..80f444b13c5 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
@@ -538,7 +538,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;
 
 StateTtlConfig ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+    .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
     .build();
 ```
 {{< /tab >}}
@@ -548,18 +548,19 @@ import org.apache.flink.api.common.state.StateTtlConfig
 
 val ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+    .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
     .build
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
+from pyflink.common import Duration
 from pyflink.common.time import Time
 from pyflink.datastream.state import StateTtlConfig
 
 ttl_config = StateTtlConfig \
     .new_builder(Time.seconds(1)) \
-    .cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \
+    .cleanup_in_rocksdb_compact_filter(1000, Duration.of_hours(1)) \
     .build()
 ```
 {{< /tab >}}
@@ -573,7 +574,7 @@ RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一
 定期压缩可以加速过期状态条目的清理，特别是对于很少访问的状态条目。
 比这个值早的文件将被选取进行压缩，并重新写入与之前相同的 Level 中。
 该功能可以确保文件定期通过压缩过滤器压缩。
-您可以通过`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime)`
+您可以通过`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)`
 方法设定定期压缩的时间。
 定期压缩的时间的默认值是 30 天。
 您可以将其设置为 0 以关闭定期压缩或设置一个较小的值以加速过期状态条目的清理，但它将会触发更多压缩。
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/state.md b/docs/content/docs/dev/datastream/fault-tolerance/state.md
index 24f14953b1c..0d5705ee196 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/state.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/state.md
@@ -601,7 +601,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;
 
 StateTtlConfig ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+    .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
     .build();
 ```
 {{< /tab >}}
@@ -611,18 +611,19 @@ import org.apache.flink.api.common.state.StateTtlConfig
 
 val ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+    .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
     .build
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
+from pyflink.common import Duration
 from pyflink.common.time import Time
 from pyflink.datastream.state import StateTtlConfig
 
 ttl_config = StateTtlConfig \
     .new_builder(Time.seconds(1)) \
-    .cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \
+    .cleanup_in_rocksdb_compact_filter(1000, Duration.of_hours(1)) \
     .build()
 ```
 {{< /tab >}}
@@ -640,7 +641,7 @@ Periodic compaction could speed up expired state entries cleanup, especially for
 
 Files older than this value will be picked up for compaction, and re-written to the same level as they were before.
 It makes sure a file goes through compaction filters periodically.
 You can change it and pass a custom value to
-`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime)` method.
+`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)` method.
 The default value of Periodic compaction seconds is 30 days.
 You could set it to 0 to turn off periodic compaction or set a small value to speed up expired state entries cleanup, but it would trigger more compactions.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index 26ef85f49e1..d8cc99a5735 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -341,7 +341,7 @@ public class StateTtlConfig implements Serializable {
          */
         @Nonnull
         public Builder cleanupInRocksdbCompactFilter(
-                long queryTimeAfterNumEntries, Time periodicCompactionTime) {
+                long queryTimeAfterNumEntries, Duration periodicCompactionTime) {
             strategies.put(
                     CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER,
                     new RocksdbCompactFilterCleanupStrategy(
@@ -354,7 +354,7 @@ public class StateTtlConfig implements Serializable {
          *
          * <p>If some specific cleanup is configured, e.g. {@link #cleanupIncrementally(int,
          * boolean)} or {@link #cleanupInRocksdbCompactFilter(long)} or {@link
-         * #cleanupInRocksdbCompactFilter(long, Time)} , this setting does not disable it.
+         * #cleanupInRocksdbCompactFilter(long, Duration)} , this setting does not disable it.
          */
         @Nonnull
         public Builder disableCleanupInBackground() {
@@ -497,7 +497,7 @@ public class StateTtlConfig implements Serializable {
          * Default value is 30 days so that every file goes through the compaction process at least
          * once every 30 days if not compacted sooner.
          */
-        static final Time DEFAULT_PERIODIC_COMPACTION_TIME = Time.days(30);
+        static final Duration DEFAULT_PERIODIC_COMPACTION_TIME = Duration.ofDays(30);
 
         static final RocksdbCompactFilterCleanupStrategy
                 DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY =
@@ -515,14 +515,14 @@ public class StateTtlConfig implements Serializable {
          * and re-written to the same level as they were before. It makes sure a file goes through
         * compaction filters periodically. 0 means turning off periodic compaction.
         */
-        private final Time periodicCompactionTime;
+        private final Duration periodicCompactionTime;
 
         private RocksdbCompactFilterCleanupStrategy(long queryTimeAfterNumEntries) {
             this(queryTimeAfterNumEntries, DEFAULT_PERIODIC_COMPACTION_TIME);
         }
 
         private RocksdbCompactFilterCleanupStrategy(
-                long queryTimeAfterNumEntries, Time periodicCompactionTime) {
+                long queryTimeAfterNumEntries, Duration periodicCompactionTime) {
             this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
             this.periodicCompactionTime = periodicCompactionTime;
         }
@@ -531,7 +531,7 @@ public class StateTtlConfig implements Serializable {
             return queryTimeAfterNumEntries;
         }
 
-        public Time getPeriodicCompactionTime() {
+        public Duration getPeriodicCompactionTime() {
             return periodicCompactionTime;
         }
     }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java
index a90040c4629..8050ffb7eba 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.time.Time;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 
@@ -74,7 +75,7 @@ public class StateTtlConfigTest {
         assertThat(incrementalCleanupStrategy.getCleanupSize(), is(5));
         assertThat(incrementalCleanupStrategy.runCleanupForEveryRecord(), is(false));
         assertThat(rocksdbCleanupStrategy.getQueryTimeAfterNumEntries(), is(1000L));
-        assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime(), is(Time.days(30)));
+        assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime(), is(Duration.ofDays(30)));
     }
 
     @Test
diff --git a/flink-python/pyflink/datastream/state.py b/flink-python/pyflink/datastream/state.py
index 8450e41df66..a3dc9f6df84 100644
--- a/flink-python/pyflink/datastream/state.py
+++ b/flink-python/pyflink/datastream/state.py
@@ -19,7 +19,7 @@ from abc import ABC, abstractmethod
 from enum import Enum
 from typing import TypeVar, Generic, Iterable, List, Iterator, Dict, Tuple, Optional
 
-from pyflink.common.time import Time
+from pyflink.common.time import Duration, Time
 from pyflink.common.typeinfo import TypeInformation, Types
 
 __all__ = [
@@ -809,7 +809,7 @@ class StateTtlConfig(object):
         def cleanup_in_rocksdb_compact_filter(
                 self,
                 query_time_after_num_entries,
-                periodic_compaction_time=Time.days(30)) -> \
+                periodic_compaction_time=Duration.of_days(30)) -> \
                 'StateTtlConfig.Builder':
             """
             Cleanup expired state while Rocksdb compaction is running.
@@ -925,14 +925,14 @@ class StateTtlConfig(object):
 
         def __init__(self,
                      query_time_after_num_entries: int,
-                     periodic_compaction_time=Time.days(30)):
+                     periodic_compaction_time=Duration.of_days(30)):
             self._query_time_after_num_entries = query_time_after_num_entries
             self._periodic_compaction_time = periodic_compaction_time
 
         def get_query_time_after_num_entries(self) -> int:
             return self._query_time_after_num_entries
 
-        def get_periodic_compaction_time(self) -> Time:
+        def get_periodic_compaction_time(self) -> Duration:
             return self._periodic_compaction_time
 
     EMPTY_STRATEGY = EmptyCleanupStrategy()
diff --git a/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java b/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java
index 0907896d801..5e81a4b9fef 100644
--- a/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.python.util.ProtoUtils;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -118,6 +119,6 @@ class ProtoUtilsTest {
         assertThat(rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries())
                 .isEqualTo(1000);
         assertThat(rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime())
-                .isEqualTo(Time.days(30));
+                .isEqualTo(Duration.ofDays(30));
     }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
index e89fe738a89..168da2f2af1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
@@ -126,7 +126,7 @@ public class RocksDbTtlCompactFiltersManager {
                 columnFamilyOptionsMap.get(stateDesc.getName());
         Preconditions.checkNotNull(columnFamilyOptions);
         columnFamilyOptions.setPeriodicCompactionSeconds(
-                rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().toSeconds());
+                rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().getSeconds());
 
         long queryTimeAfterNumEntries =
                 rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();
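For reference, after this change a call site passes a `java.time.Duration` where it previously passed a `Time`. Below is a minimal sketch of the migrated builder usage; the class name `TtlMigrationSketch` is illustrative and not part of the commit, while the builder calls mirror the documentation hunks above:

```java
import java.time.Duration;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TtlMigrationSketch {
    public static void main(String[] args) {
        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Time.seconds(1)) // state entries expire after 1s
                        // Before: .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
                        // After: the periodic-compaction interval is a java.time.Duration;
                        // the compaction filter re-queries the current timestamp every
                        // 1000 processed state entries.
                        .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
                        .build();
        System.out.println(ttlConfig);
    }
}
```

On the state-backend side (last hunk), the strategy's `Duration` is converted to whole seconds with `getSeconds()` before being handed to RocksDB's `setPeriodicCompactionSeconds(...)`.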
