This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 161defe0bb2dc8136133e07699b6ac433d52dc65
Author: Rui Fan <[email protected]>
AuthorDate: Tue Feb 27 13:17:25 2024 +0800

    [FLINK-34522][core] Changing the Time to Duration for StateTtlConfig.Builder.cleanupInRocksdbCompactFilter
---
 docs/content.zh/docs/dev/datastream/fault-tolerance/state.md |  9 +++++----
 docs/content/docs/dev/datastream/fault-tolerance/state.md    |  9 +++++----
 .../org/apache/flink/api/common/state/StateTtlConfig.java    | 12 ++++++------
 .../apache/flink/api/common/state/StateTtlConfigTest.java    |  3 ++-
 flink-python/pyflink/datastream/state.py                     |  8 ++++----
 .../org/apache/flink/streaming/api/utils/ProtoUtilsTest.java |  3 ++-
 .../streaming/state/ttl/RocksDbTtlCompactFiltersManager.java |  2 +-
 7 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
index c7aaaf0c27f..80f444b13c5 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
@@ -538,7 +538,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;
 
 StateTtlConfig ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+    .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
     .build();
 ```
 {{< /tab >}}
@@ -548,18 +548,19 @@ import org.apache.flink.api.common.state.StateTtlConfig
 
 val ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+    .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
     .build
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
+from pyflink.common import Duration
 from pyflink.common.time import Time
 from pyflink.datastream.state import StateTtlConfig
 
 ttl_config = StateTtlConfig \
   .new_builder(Time.seconds(1)) \
-  .cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \
+  .cleanup_in_rocksdb_compact_filter(1000, Duration.of_hours(1)) \
   .build()
 ```
 {{< /tab >}}
@@ -573,7 +574,7 @@ RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一
 定期压缩可以加速过期状态条目的清理,特别是对于很少访问的状态条目。
 比这个值早的文件将被选取进行压缩,并重新写入与之前相同的 Level 中。 
 该功能可以确保文件定期通过压缩过滤器压缩。
-您可以通过`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime)` 
+您可以通过`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)` 
 方法设定定期压缩的时间。
 定期压缩的时间的默认值是 30 天。
 您可以将其设置为 0 以关闭定期压缩或设置一个较小的值以加速过期状态条目的清理,但它将会触发更多压缩。
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/state.md b/docs/content/docs/dev/datastream/fault-tolerance/state.md
index 24f14953b1c..0d5705ee196 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/state.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/state.md
@@ -601,7 +601,7 @@ import org.apache.flink.api.common.state.StateTtlConfig;
 
 StateTtlConfig ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+    .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
     .build();
 ```
 {{< /tab >}}
@@ -611,18 +611,19 @@ import org.apache.flink.api.common.state.StateTtlConfig
 
 val ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(1))
-    .cleanupInRocksdbCompactFilter(1000, Time.hours(1))
+    .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1))
     .build
 ```
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
+from pyflink.common import Duration
 from pyflink.common.time import Time
 from pyflink.datastream.state import StateTtlConfig
 
 ttl_config = StateTtlConfig \
   .new_builder(Time.seconds(1)) \
-  .cleanup_in_rocksdb_compact_filter(1000, Time.hours(1)) \
+  .cleanup_in_rocksdb_compact_filter(1000, Duration.of_hours(1)) \
   .build()
 ```
 {{< /tab >}}
@@ -640,7 +641,7 @@ Periodic compaction could speed up expired state entries cleanup, especially for
 Files older than this value will be picked up for compaction, and re-written to the same level as they were before. 
 It makes sure a file goes through compaction filters periodically.
 You can change it and pass a custom value to
-`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Time periodicCompactionTime)` method.
+`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime)` method.
 The default value of Periodic compaction seconds is 30 days.
 You could set it to 0 to turn off periodic compaction or set a small value to speed up expired state entries cleanup, but it
 would trigger more compactions.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index 26ef85f49e1..d8cc99a5735 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -341,7 +341,7 @@ public class StateTtlConfig implements Serializable {
          */
         @Nonnull
         public Builder cleanupInRocksdbCompactFilter(
-                long queryTimeAfterNumEntries, Time periodicCompactionTime) {
+                long queryTimeAfterNumEntries, Duration periodicCompactionTime) {
             strategies.put(
                     CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER,
                     new RocksdbCompactFilterCleanupStrategy(
@@ -354,7 +354,7 @@ public class StateTtlConfig implements Serializable {
          *
          * <p>If some specific cleanup is configured, e.g. {@link #cleanupIncrementally(int,
          * boolean)} or {@link #cleanupInRocksdbCompactFilter(long)} or {@link
-         * #cleanupInRocksdbCompactFilter(long, Time)} , this setting does not disable it.
+         * #cleanupInRocksdbCompactFilter(long, Duration)} , this setting does not disable it.
          */
         @Nonnull
         public Builder disableCleanupInBackground() {
@@ -497,7 +497,7 @@ public class StateTtlConfig implements Serializable {
          * Default value is 30 days so that every file goes through the compaction process at least
          * once every 30 days if not compacted sooner.
          */
-        static final Time DEFAULT_PERIODIC_COMPACTION_TIME = Time.days(30);
+        static final Duration DEFAULT_PERIODIC_COMPACTION_TIME = Duration.ofDays(30);
 
         static final RocksdbCompactFilterCleanupStrategy
                 DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY =
@@ -515,14 +515,14 @@ public class StateTtlConfig implements Serializable {
          * and re-written to the same level as they were before. It makes sure a file goes through
          * compaction filters periodically. 0 means turning off periodic compaction.
          */
-        private final Time periodicCompactionTime;
+        private final Duration periodicCompactionTime;
 
         private RocksdbCompactFilterCleanupStrategy(long queryTimeAfterNumEntries) {
             this(queryTimeAfterNumEntries, DEFAULT_PERIODIC_COMPACTION_TIME);
         }
 
         private RocksdbCompactFilterCleanupStrategy(
-                long queryTimeAfterNumEntries, Time periodicCompactionTime) {
+                long queryTimeAfterNumEntries, Duration periodicCompactionTime) {
             this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
             this.periodicCompactionTime = periodicCompactionTime;
         }
@@ -531,7 +531,7 @@ public class StateTtlConfig implements Serializable {
             return queryTimeAfterNumEntries;
         }
 
-        public Time getPeriodicCompactionTime() {
+        public Duration getPeriodicCompactionTime() {
             return periodicCompactionTime;
         }
     }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java
index a90040c4629..8050ffb7eba 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateTtlConfigTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.time.Time;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 
@@ -74,7 +75,7 @@ public class StateTtlConfigTest {
         assertThat(incrementalCleanupStrategy.getCleanupSize(), is(5));
         assertThat(incrementalCleanupStrategy.runCleanupForEveryRecord(), is(false));
         assertThat(rocksdbCleanupStrategy.getQueryTimeAfterNumEntries(), is(1000L));
-        assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime(), is(Time.days(30)));
+        assertThat(rocksdbCleanupStrategy.getPeriodicCompactionTime(), is(Duration.ofDays(30)));
     }
 
     @Test
diff --git a/flink-python/pyflink/datastream/state.py b/flink-python/pyflink/datastream/state.py
index 8450e41df66..a3dc9f6df84 100644
--- a/flink-python/pyflink/datastream/state.py
+++ b/flink-python/pyflink/datastream/state.py
@@ -19,7 +19,7 @@ from abc import ABC, abstractmethod
 from enum import Enum
 from typing import TypeVar, Generic, Iterable, List, Iterator, Dict, Tuple, Optional
 
-from pyflink.common.time import Time
+from pyflink.common.time import Duration, Time
 from pyflink.common.typeinfo import TypeInformation, Types
 
 __all__ = [
@@ -809,7 +809,7 @@ class StateTtlConfig(object):
         def cleanup_in_rocksdb_compact_filter(
                 self,
                 query_time_after_num_entries,
-                periodic_compaction_time=Time.days(30)) -> \
+                periodic_compaction_time=Duration.of_days(30)) -> \
                 'StateTtlConfig.Builder':
             """
             Cleanup expired state while Rocksdb compaction is running.
@@ -925,14 +925,14 @@ class StateTtlConfig(object):
 
             def __init__(self,
                          query_time_after_num_entries: int,
-                         periodic_compaction_time=Time.days(30)):
+                         periodic_compaction_time=Duration.of_days(30)):
                 self._query_time_after_num_entries = query_time_after_num_entries
                 self._periodic_compaction_time = periodic_compaction_time
 
             def get_query_time_after_num_entries(self) -> int:
                 return self._query_time_after_num_entries
 
-            def get_periodic_compaction_time(self) -> Time:
+            def get_periodic_compaction_time(self) -> Duration:
                 return self._periodic_compaction_time
 
         EMPTY_STRATEGY = EmptyCleanupStrategy()
diff --git a/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java b/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java
index 0907896d801..5e81a4b9fef 100644
--- a/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.python.util.ProtoUtils;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -118,6 +119,6 @@ class ProtoUtilsTest {
         assertThat(rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries())
                 .isEqualTo(1000);
         assertThat(rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime())
-                .isEqualTo(Time.days(30));
+                .isEqualTo(Duration.ofDays(30));
     }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
index e89fe738a89..168da2f2af1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
@@ -126,7 +126,7 @@ public class RocksDbTtlCompactFiltersManager {
                     columnFamilyOptionsMap.get(stateDesc.getName());
             Preconditions.checkNotNull(columnFamilyOptions);
             columnFamilyOptions.setPeriodicCompactionSeconds(
-                    rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().toSeconds());
+                    rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime().getSeconds());
 
             long queryTimeAfterNumEntries =
                     rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();

Reply via email to