This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 30994a1 [FLINK-25466][state] Enable state descriptor to handle StateTtlConfig#DISABLED
30994a1 is described below
commit 30994a1788085034ed1b467a5df6253ee44b1da6
Author: Yun Tang <[email protected]>
AuthorDate: Tue Dec 28 15:53:33 2021 +0800
[FLINK-25466][state] Enable state descriptor to handle StateTtlConfig#DISABLED
---
.../org/apache/flink/api/common/state/StateDescriptor.java | 9 +++++----
.../apache/flink/api/common/state/StateDescriptorTest.java | 13 +++++++++++++
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 05495e9..c985ca3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -264,10 +264,11 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
*/
public void enableTimeToLive(StateTtlConfig ttlConfig) {
Preconditions.checkNotNull(ttlConfig);
- Preconditions.checkArgument(
- ttlConfig.getUpdateType() != StateTtlConfig.UpdateType.Disabled
- && queryableStateName == null,
- "Queryable state is currently not supported with TTL");
+ if (ttlConfig.isEnabled()) {
+ Preconditions.checkArgument(
+ queryableStateName == null,
+ "Queryable state is currently not supported with TTL");
+ }
this.ttlConfig = ttlConfig;
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index b8c7162..07f9b85 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -19,8 +19,10 @@
package org.apache.flink.api.common.state;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
@@ -251,6 +253,17 @@ public class StateDescriptorTest {
threads.clear();
}
+ @Test
+ public void testStateTTlConfig() {
+ ValueStateDescriptor<Integer> stateDescriptor =
+ new ValueStateDescriptor<>("test-state",
IntSerializer.INSTANCE);
+
stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(60)).build());
+ assertTrue(stateDescriptor.getTtlConfig().isEnabled());
+
+ stateDescriptor.enableTimeToLive(StateTtlConfig.DISABLED);
+ assertFalse(stateDescriptor.getTtlConfig().isEnabled());
+ }
+
// ------------------------------------------------------------------------
// Mock implementations and test types
// ------------------------------------------------------------------------