[FLINK-9701] [state] (follow up) Use StateTtlConfiguration.DISABLED instead of 
null, make it Serializable and add
convenience methods to its builder

This closes #6331.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1632681e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1632681e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1632681e

Branch: refs/heads/master
Commit: 1632681e41cbc1092a6b4d47a58adfffba6af5d4
Parents: 57872d5
Author: Andrey Zagrebin <azagre...@gmail.com>
Authored: Thu Jul 12 17:12:18 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Jul 13 18:32:57 2018 +0200

----------------------------------------------------------------------
 .../flink/api/common/state/StateDescriptor.java | 14 +++++---
 .../api/common/state/StateTtlConfiguration.java | 36 +++++++++++++++++++-
 .../runtime/state/ttl/TtlStateFactory.java      |  6 ++--
 3 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1632681e/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 956fd05..191eb6f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.ByteArrayInputStream;
@@ -94,8 +95,8 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
        private String queryableStateName;
 
        /** Name for queries against state created from this StateDescriptor. */
-       @Nullable
-       private StateTtlConfiguration ttlConfig;
+       @Nonnull
+       private StateTtlConfiguration ttlConfig = 
StateTtlConfiguration.DISABLED;
 
        /** The default value returned by the state when no other value is 
bound to a key. */
        @Nullable
@@ -208,7 +209,8 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
         * @throws IllegalStateException If queryable state name already set
         */
        public void setQueryable(String queryableStateName) {
-               Preconditions.checkArgument(ttlConfig == null,
+               Preconditions.checkArgument(
+                       ttlConfig.getTtlUpdateType() == 
StateTtlConfiguration.TtlUpdateType.Disabled,
                        "Queryable state is currently not supported with TTL");
                if (this.queryableStateName == null) {
                        this.queryableStateName = 
Preconditions.checkNotNull(queryableStateName, "Registration name");
@@ -247,12 +249,14 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
         */
        public void enableTimeToLive(StateTtlConfiguration ttlConfig) {
                Preconditions.checkNotNull(ttlConfig);
-               Preconditions.checkArgument(queryableStateName == null,
+               Preconditions.checkArgument(
+                       ttlConfig.getTtlUpdateType() != 
StateTtlConfiguration.TtlUpdateType.Disabled &&
+                               queryableStateName == null,
                        "Queryable state is currently not supported with TTL");
                this.ttlConfig = ttlConfig;
        }
 
-       @Nullable
+       @Nonnull
        @Internal
        public StateTtlConfiguration getTtlConfig() {
                return ttlConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/1632681e/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
index 9bd8b15..55ec29c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.Preconditions;
 
+import java.io.Serializable;
+
 import static 
org.apache.flink.api.common.state.StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired;
 import static 
org.apache.flink.api.common.state.StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime;
 import static 
org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite;
@@ -28,11 +30,19 @@ import static 
org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateT
 /**
  * Configuration of state TTL logic.
  */
-public class StateTtlConfiguration {
+public class StateTtlConfiguration implements Serializable {
+
+       private static final long serialVersionUID = -7592693245044289793L;
+
+       public static final StateTtlConfiguration DISABLED =
+               
newBuilder(Time.milliseconds(Long.MAX_VALUE)).setTtlUpdateType(TtlUpdateType.Disabled).build();
+
        /**
         * This option value configures when to update last access timestamp 
which prolongs state TTL.
         */
        public enum TtlUpdateType {
+               /** TTL is disabled. State does not expire. */
+               Disabled,
                /** Last access timestamp is initialised when state is created 
and updated on every write operation. */
                OnCreateAndWrite,
                /** The same as <code>OnCreateAndWrite</code> but also updated 
on read. */
@@ -91,6 +101,10 @@ public class StateTtlConfiguration {
                return timeCharacteristic;
        }
 
+       public boolean isEnabled() {
+               return ttlUpdateType != TtlUpdateType.Disabled;
+       }
+
        @Override
        public String toString() {
                return "StateTtlConfiguration{" +
@@ -129,6 +143,14 @@ public class StateTtlConfiguration {
                        return this;
                }
 
+               public Builder updateTtlOnCreateAndWrite() {
+                       return setTtlUpdateType(TtlUpdateType.OnCreateAndWrite);
+               }
+
+               public Builder updateTtlOnReadAndWrite() {
+                       return setTtlUpdateType(TtlUpdateType.OnReadAndWrite);
+               }
+
                /**
                 * Sets the state visibility.
                 *
@@ -139,6 +161,14 @@ public class StateTtlConfiguration {
                        return this;
                }
 
+               public Builder returnExpiredIfNotCleanedUp() {
+                       return 
setStateVisibility(TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
+               }
+
+               public Builder neverReturnExpired() {
+                       return 
setStateVisibility(TtlStateVisibility.NeverReturnExpired);
+               }
+
                /**
                 * Sets the time characteristic.
                 *
@@ -149,6 +179,10 @@ public class StateTtlConfiguration {
                        return this;
                }
 
+               public Builder useProcessingTime() {
+                       return 
setTimeCharacteristic(TtlTimeCharacteristic.ProcessingTime);
+               }
+
                /**
                 * Sets the ttl time.
                 * @param ttl The ttl time.

http://git-wip-us.apache.org/repos/asf/flink/blob/1632681e/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 5909ac7..e12ba90 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -54,10 +54,10 @@ public class TtlStateFactory {
                Preconditions.checkNotNull(stateDesc);
                Preconditions.checkNotNull(originalStateFactory);
                Preconditions.checkNotNull(timeProvider);
-               return stateDesc.getTtlConfig() == null ?
-                       
originalStateFactory.createInternalState(namespaceSerializer, stateDesc) :
+               return  stateDesc.getTtlConfig().isEnabled() ?
                        new TtlStateFactory(originalStateFactory, 
stateDesc.getTtlConfig(), timeProvider)
-                               .createState(namespaceSerializer, stateDesc);
+                               .createState(namespaceSerializer, stateDesc) :
+                       
originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
        }
 
        private final Map<Class<? extends StateDescriptor>, KeyedStateFactory> 
stateFactories;

Reply via email to