This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 30994a1  [FLINK-25466][state] Enable state descriptor to handle StateTtlConfig#DISABLED
30994a1 is described below

commit 30994a1788085034ed1b467a5df6253ee44b1da6
Author: Yun Tang <[email protected]>
AuthorDate: Tue Dec 28 15:53:33 2021 +0800

    [FLINK-25466][state] Enable state descriptor to handle StateTtlConfig#DISABLED
---
 .../org/apache/flink/api/common/state/StateDescriptor.java  |  9 +++++----
 .../apache/flink/api/common/state/StateDescriptorTest.java  | 13 +++++++++++++
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 05495e9..c985ca3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -264,10 +264,11 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
      */
     public void enableTimeToLive(StateTtlConfig ttlConfig) {
         Preconditions.checkNotNull(ttlConfig);
-        Preconditions.checkArgument(
-                ttlConfig.getUpdateType() != StateTtlConfig.UpdateType.Disabled
-                        && queryableStateName == null,
-                "Queryable state is currently not supported with TTL");
+        if (ttlConfig.isEnabled()) {
+            Preconditions.checkArgument(
+                    queryableStateName == null,
+                    "Queryable state is currently not supported with TTL");
+        }
         this.ttlConfig = ttlConfig;
     }
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index b8c7162..07f9b85 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -19,8 +19,10 @@
 package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
@@ -251,6 +253,17 @@ public class StateDescriptorTest {
         threads.clear();
     }
 
+    @Test
+    public void testStateTTlConfig() {
+        ValueStateDescriptor<Integer> stateDescriptor =
+                new ValueStateDescriptor<>("test-state", IntSerializer.INSTANCE);
+        stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(60)).build());
+        assertTrue(stateDescriptor.getTtlConfig().isEnabled());
+
+        stateDescriptor.enableTimeToLive(StateTtlConfig.DISABLED);
+        assertFalse(stateDescriptor.getTtlConfig().isEnabled());
+    }
+
     // ------------------------------------------------------------------------
     //  Mock implementations and test types
     // ------------------------------------------------------------------------

Reply via email to