[FLINK-9701] Introduce TTL configuration in state descriptors

This closes #6313.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f45b7f7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f45b7f7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f45b7f7f

Branch: refs/heads/master
Commit: f45b7f7ff27df019e9045895e718fa112d12139c
Parents: b407ba7
Author: Andrey Zagrebin <azagre...@gmail.com>
Authored: Tue Jul 3 19:23:41 2018 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Jul 12 20:18:23 2018 +0200

----------------------------------------------------------------------
 .../flink/api/common/state/StateDescriptor.java |  28 ++
 .../api/common/state/StateTtlConfiguration.java |   3 -
 .../common/typeutils/CompositeSerializer.java   |  56 ++-
 .../KVStateRequestSerializerRocksDBTest.java    |   9 +-
 .../network/KvStateRequestSerializerTest.java   |   9 +-
 .../network/KvStateServerHandlerTest.java       |   4 +-
 .../state/AbstractKeyedStateBackend.java        |  47 ++-
 .../runtime/state/AbstractStateBackend.java     |  16 +-
 .../flink/runtime/state/KeyedStateFactory.java  |   2 +-
 .../flink/runtime/state/StateBackend.java       |  39 ++-
 .../state/filesystem/FsStateBackend.java        |  21 +-
 .../state/heap/HeapKeyedStateBackend.java       |   9 +-
 .../state/memory/MemoryStateBackend.java        |   7 +-
 .../runtime/state/ttl/AbstractTtlDecorator.java |   2 -
 .../flink/runtime/state/ttl/TtlMapState.java    |  97 ++++--
 .../runtime/state/ttl/TtlReducingState.java     |   2 +-
 .../runtime/state/ttl/TtlStateFactory.java      |  22 +-
 .../runtime/state/ttl/TtlTimeProvider.java      |   4 +-
 .../CheckpointSettingsSerializableTest.java     |   4 +-
 ...HeapKeyedStateBackendAsyncByDefaultTest.java |   4 +-
 .../runtime/state/StateBackendTestBase.java     |  15 +-
 .../state/StateSnapshotCompressionTest.java     |  17 +-
 ...pKeyedStateBackendSnapshotMigrationTest.java |   4 +-
 .../state/heap/HeapStateBackendTestBase.java    |   4 +-
 .../runtime/state/ttl/HeapTtlStateTest.java     |  35 ++
 .../runtime/state/ttl/MockTimeProvider.java     |  28 --
 .../runtime/state/ttl/MockTtlStateTest.java     |  35 ++
 .../runtime/state/ttl/MockTtlTimeProvider.java  |  28 ++
 .../state/ttl/StateBackendTestContext.java      | 125 +++++++
 .../state/ttl/TtlAggregatingStateTest.java      |  90 -----
 .../ttl/TtlAggregatingStateTestContext.java     | 103 ++++++
 .../runtime/state/ttl/TtlFoldingStateTest.java  |  54 ---
 .../state/ttl/TtlFoldingStateTestContext.java   |  67 ++++
 .../runtime/state/ttl/TtlListStateTest.java     |  75 ----
 .../state/ttl/TtlListStateTestContext.java      |  87 +++++
 .../ttl/TtlMapStateAllEntriesTestContext.java   |  66 ++++
 .../state/ttl/TtlMapStatePerElementTest.java    |  42 ---
 .../ttl/TtlMapStatePerElementTestContext.java   |  53 +++
 .../runtime/state/ttl/TtlMapStateTest.java      |  55 ---
 .../runtime/state/ttl/TtlMapStateTestBase.java  |  33 --
 .../state/ttl/TtlMapStateTestContext.java       |  35 ++
 .../runtime/state/ttl/TtlMergingStateBase.java  | 126 -------
 .../state/ttl/TtlMergingStateTestContext.java   |  99 ++++++
 .../runtime/state/ttl/TtlReducingStateTest.java |  71 ----
 .../state/ttl/TtlReducingStateTestContext.java  |  84 +++++
 .../runtime/state/ttl/TtlStateTestBase.java     | 340 +++++++++++++++----
 .../state/ttl/TtlStateTestContextBase.java      |  52 +++
 .../runtime/state/ttl/TtlValueStateTest.java    |  51 ---
 .../state/ttl/TtlValueStateTestContext.java     |  64 ++++
 .../state/ttl/mock/MockInternalKvState.java     |  29 +-
 .../state/ttl/mock/MockInternalMapState.java    |  10 +-
 .../state/ttl/mock/MockKeyedStateBackend.java   | 218 ++++++++++++
 .../state/ttl/mock/MockKeyedStateFactory.java   |  64 ----
 .../state/ttl/mock/MockStateBackend.java        | 111 ++++++
 .../state/RocksDBKeyedStateBackend.java         |   9 +-
 .../streaming/state/RocksDBStateBackend.java    |   7 +-
 .../state/RocksDBStateBackendTest.java          |   4 +-
 .../streaming/state/RocksDBTtlStateTest.java    |  62 ++++
 .../StreamTaskStateInitializerImpl.java         |   4 +-
 .../StreamTaskStateInitializerImplTest.java     |   4 +-
 .../tasks/StreamTaskTerminationTest.java        |   4 +-
 .../tasks/TestSpyWrapperStateBackend.java       |   7 +-
 .../streaming/runtime/StateBackendITCase.java   |  16 +-
 63 files changed, 1933 insertions(+), 939 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 6c54e71..956fd05 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -92,6 +93,10 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
        @Nullable
        private String queryableStateName;
 
+       /** Name for queries against state created from this StateDescriptor. */
+       @Nullable
+       private StateTtlConfiguration ttlConfig;
+
        /** The default value returned by the state when no other value is 
bound to a key. */
        @Nullable
        protected transient T defaultValue;
@@ -203,6 +208,8 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
         * @throws IllegalStateException If queryable state name already set
         */
        public void setQueryable(String queryableStateName) {
+               Preconditions.checkArgument(ttlConfig == null,
+                       "Queryable state is currently not supported with TTL");
                if (this.queryableStateName == null) {
                        this.queryableStateName = 
Preconditions.checkNotNull(queryableStateName, "Registration name");
                } else {
@@ -230,6 +237,27 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
                return queryableStateName != null;
        }
 
+       /**
+        * Configures optional activation of state time-to-live (TTL).
+        *
+        * <p>State user value will expire, become unavailable and be cleaned 
up in storage
+        * depending on configured {@link StateTtlConfiguration}.
+        *
+        * @param ttlConfig configuration of state TTL
+        */
+       public void enableTimeToLive(StateTtlConfiguration ttlConfig) {
+               Preconditions.checkNotNull(ttlConfig);
+               Preconditions.checkArgument(queryableStateName == null,
+                       "Queryable state is currently not supported with TTL");
+               this.ttlConfig = ttlConfig;
+       }
+
+       @Nullable
+       @Internal
+       public StateTtlConfiguration getTtlConfig() {
+               return ttlConfig;
+       }
+
        // 
------------------------------------------------------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
index 8ef2046..9bd8b15 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
@@ -27,15 +27,12 @@ import static 
org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateT
 
 /**
  * Configuration of state TTL logic.
- * TODO: builder
  */
 public class StateTtlConfiguration {
        /**
         * This option value configures when to update last access timestamp 
which prolongs state TTL.
         */
        public enum TtlUpdateType {
-               /** TTL is disabled. State does not expire. */
-               Disabled,
                /** Last access timestamp is initialised when state is created 
and updated on every write operation. */
                OnCreateAndWrite,
                /** The same as <code>OnCreateAndWrite</code> but also updated 
on read. */

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
index 43c5533..2db7a30 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
@@ -50,7 +50,7 @@ public abstract class CompositeSerializer<T> extends 
TypeSerializer<T> {
        @SuppressWarnings("unchecked")
        protected CompositeSerializer(boolean immutableTargetType, 
TypeSerializer<?> ... fieldSerializers) {
                this(
-                       new PrecomputedParameters(immutableTargetType, 
(TypeSerializer<Object>[]) fieldSerializers),
+                       PrecomputedParameters.precompute(immutableTargetType, 
(TypeSerializer<Object>[]) fieldSerializers),
                        fieldSerializers);
        }
 
@@ -187,6 +187,7 @@ public abstract class CompositeSerializer<T> extends 
TypeSerializer<T> {
                return 31 * Boolean.hashCode(precomputed.immutableTargetType) + 
Arrays.hashCode(fieldSerializers);
        }
 
+       @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
        @Override
        public boolean equals(Object obj) {
                if (canEqual(obj)) {
@@ -205,17 +206,12 @@ public abstract class CompositeSerializer<T> extends 
TypeSerializer<T> {
 
        @Override
        public TypeSerializerConfigSnapshot snapshotConfiguration() {
-               return new 
CompositeTypeSerializerConfigSnapshot(fieldSerializers) {
-                       @Override
-                       public int getVersion() {
-                               return 0;
-                       }
-               };
+               return new ConfigSnapshot(fieldSerializers);
        }
 
        @Override
        public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-               if (configSnapshot instanceof 
CompositeTypeSerializerConfigSnapshot) {
+               if (configSnapshot instanceof ConfigSnapshot) {
                        List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
                                ((CompositeTypeSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
                        if (previousSerializersAndConfigs.size() == 
fieldSerializers.length) {
@@ -242,11 +238,7 @@ public abstract class CompositeSerializer<T> extends 
TypeSerializer<T> {
                                }
                        }
                }
-               PrecomputedParameters precomputed =
-                       new 
PrecomputedParameters(this.precomputed.immutableTargetType, convertSerializers);
-               return requiresMigration ?
-                       
CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, 
convertSerializers)) :
-                       CompatibilityResult.compatible();
+               return requiresMigration ? 
createMigrationCompatResult(convertSerializers) : 
CompatibilityResult.compatible();
        }
 
        private CompatibilityResult<Object> resolveFieldCompatibility(
@@ -256,6 +248,12 @@ public abstract class CompositeSerializer<T> extends 
TypeSerializer<T> {
                        previousSerializersAndConfigs.get(index).f1, 
fieldSerializers[index]);
        }
 
+       private CompatibilityResult<T> 
createMigrationCompatResult(TypeSerializer<Object>[] convertSerializers) {
+               PrecomputedParameters precomputed =
+                       
PrecomputedParameters.precompute(this.precomputed.immutableTargetType, 
convertSerializers);
+               return 
CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, 
convertSerializers));
+       }
+
        /** This class holds composite serializer parameters which can be 
precomputed in advanced for better performance. */
        protected static class PrecomputedParameters implements Serializable {
                private static final long serialVersionUID = 1L;
@@ -272,7 +270,14 @@ public abstract class CompositeSerializer<T> extends 
TypeSerializer<T> {
                /** Whether any field serializer is stateful. */
                final boolean stateful;
 
-               PrecomputedParameters(
+               private PrecomputedParameters(boolean immutableTargetType, 
boolean immutable, int length, boolean stateful) {
+                       this.immutableTargetType = immutableTargetType;
+                       this.immutable = immutable;
+                       this.length = length;
+                       this.stateful = stateful;
+               }
+
+               static PrecomputedParameters precompute(
                        boolean immutableTargetType,
                        TypeSerializer<Object>[] fieldSerializers) {
                        Preconditions.checkNotNull(fieldSerializers);
@@ -292,11 +297,26 @@ public abstract class CompositeSerializer<T> extends 
TypeSerializer<T> {
                                }
                                totalLength = totalLength >= 0 ? totalLength + 
fieldSerializer.getLength() : totalLength;
                        }
+                       return new PrecomputedParameters(immutableTargetType, 
fieldsImmutable, totalLength, stateful);
+               }
+       }
 
-                       this.immutableTargetType = immutableTargetType;
-                       this.immutable = immutableTargetType && fieldsImmutable;
-                       this.length = totalLength;
-                       this.stateful = stateful;
+       /** Snapshot field serializers of composite type. */
+       public static class ConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+               private static final int VERSION = 0;
+
+               /** This empty nullary constructor is required for 
deserializing the configuration. */
+               @SuppressWarnings("unused")
+               public ConfigSnapshot() {
+               }
+
+               ConfigSnapshot(@Nonnull TypeSerializer<?>... nestedSerializers) 
{
+                       super(nestedSerializers);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index 9ea3198..176e55a 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -80,12 +81,13 @@ public final class KVStateRequestSerializerRocksDBTest {
                                new ExecutionConfig(),
                                false,
                                TestLocalRecoveryConfig.disabled(),
-                               RocksDBStateBackend.PriorityQueueStateType.HEAP
+                               RocksDBStateBackend.PriorityQueueStateType.HEAP,
+                               TtlTimeProvider.DEFAULT
                        );
                longHeapKeyedStateBackend.restore(null);
                longHeapKeyedStateBackend.setCurrentKey(key);
 
-               final InternalListState<Long, VoidNamespace, Long> listState = 
longHeapKeyedStateBackend.createState(VoidNamespaceSerializer.INSTANCE,
+               final InternalListState<Long, VoidNamespace, Long> listState = 
longHeapKeyedStateBackend.createInternalState(VoidNamespaceSerializer.INSTANCE,
                                new ListStateDescriptor<>("test", 
LongSerializer.INSTANCE));
 
                KvStateRequestSerializerTest.testListSerialization(key, 
listState);
@@ -121,7 +123,8 @@ public final class KVStateRequestSerializerRocksDBTest {
                                new ExecutionConfig(),
                                false,
                                TestLocalRecoveryConfig.disabled(),
-                               
RocksDBStateBackend.PriorityQueueStateType.HEAP);
+                               RocksDBStateBackend.PriorityQueueStateType.HEAP,
+                               TtlTimeProvider.DEFAULT);
                longHeapKeyedStateBackend.restore(null);
                longHeapKeyedStateBackend.setCurrentKey(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 73f8831..d539066 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -198,11 +199,12 @@ public class KvStateRequestSerializerTest {
                                async,
                                new ExecutionConfig(),
                                TestLocalRecoveryConfig.disabled(),
-                               new HeapPriorityQueueSetFactory(keyGroupRange, 
keyGroupRange.getNumberOfKeyGroups(), 128)
+                               new HeapPriorityQueueSetFactory(keyGroupRange, 
keyGroupRange.getNumberOfKeyGroups(), 128),
+                               TtlTimeProvider.DEFAULT
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 
-               final InternalListState<Long, VoidNamespace, Long> listState = 
longHeapKeyedStateBackend.createState(
+               final InternalListState<Long, VoidNamespace, Long> listState = 
longHeapKeyedStateBackend.createInternalState(
                        VoidNamespaceSerializer.INSTANCE,
                        new ListStateDescriptor<>("test", 
LongSerializer.INSTANCE));
 
@@ -306,7 +308,8 @@ public class KvStateRequestSerializerTest {
                                async,
                                new ExecutionConfig(),
                                TestLocalRecoveryConfig.disabled(),
-                               new HeapPriorityQueueSetFactory(keyGroupRange, 
keyGroupRange.getNumberOfKeyGroups(), 128)
+                               new HeapPriorityQueueSetFactory(keyGroupRange, 
keyGroupRange.getNumberOfKeyGroups(), 128),
+                               TtlTimeProvider.DEFAULT
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 9947dac..adcf3ae 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -761,6 +762,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                        IntSerializer.INSTANCE,
                        numKeyGroups,
                        new KeyGroupRange(0, 0),
-                       registry.createTaskRegistry(dummyEnv.getJobID(), 
dummyEnv.getJobVertexId()));
+                       registry.createTaskRegistry(dummyEnv.getJobID(), 
dummyEnv.getJobVertexId()),
+                       TtlTimeProvider.DEFAULT);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 8ce25b6..c7f1bd9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -27,6 +27,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -84,6 +86,8 @@ public abstract class AbstractKeyedStateBackend<K> implements
 
        private final ExecutionConfig executionConfig;
 
+       private final TtlTimeProvider ttlTimeProvider;
+
        /** Decorates the input and output streams to write key-groups 
compressed. */
        protected final StreamCompressionDecorator keyGroupCompressionDecorator;
 
@@ -93,7 +97,8 @@ public abstract class AbstractKeyedStateBackend<K> implements
                ClassLoader userCodeClassLoader,
                int numberOfKeyGroups,
                KeyGroupRange keyGroupRange,
-               ExecutionConfig executionConfig) {
+               ExecutionConfig executionConfig,
+               TtlTimeProvider ttlTimeProvider) {
 
                this.kvStateRegistry = kvStateRegistry;
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -104,6 +109,7 @@ public abstract class AbstractKeyedStateBackend<K> 
implements
                this.keyValueStatesByName = new HashMap<>();
                this.executionConfig = executionConfig;
                this.keyGroupCompressionDecorator = 
determineStreamCompression(executionConfig);
+               this.ttlTimeProvider = 
Preconditions.checkNotNull(ttlTimeProvider);
        }
 
        private StreamCompressionDecorator 
determineStreamCompression(ExecutionConfig executionConfig) {
@@ -220,40 +226,33 @@ public abstract class AbstractKeyedStateBackend<K> 
implements
        public <N, S extends State, V> S getOrCreateKeyedState(
                        final TypeSerializer<N> namespaceSerializer,
                        StateDescriptor<S, V> stateDescriptor) throws Exception 
{
-
                checkNotNull(namespaceSerializer, "Namespace serializer");
+               checkNotNull(keySerializer, "State key serializer has not been 
configured in the config. " +
+                               "This operation cannot use partitioned state.");
 
-               if (keySerializer == null) {
-                       throw new UnsupportedOperationException(
-                                       "State key serializer has not been 
configured in the config. " +
-                                       "This operation cannot use partitioned 
state.");
-               }
-
-               if (!stateDescriptor.isSerializerInitialized()) {
-                       
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
-               }
-
-               InternalKvState<K, ?, ?> existing = 
keyValueStatesByName.get(stateDescriptor.getName());
-               if (existing != null) {
-                       @SuppressWarnings("unchecked")
-                       S typedState = (S) existing;
-                       return typedState;
+               InternalKvState<K, ?, ?> kvState = 
keyValueStatesByName.get(stateDescriptor.getName());
+               if (kvState == null) {
+                       if (!stateDescriptor.isSerializerInitialized()) {
+                               
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
+                       }
+                       kvState = 
TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
+                               namespaceSerializer, stateDescriptor, this, 
ttlTimeProvider);
+                       keyValueStatesByName.put(stateDescriptor.getName(), 
kvState);
+                       publishQueryableStateIfEnabled(stateDescriptor, 
kvState);
                }
+               return (S) kvState;
+       }
 
-               InternalKvState<K, N, ?> kvState = 
createState(namespaceSerializer, stateDescriptor);
-               keyValueStatesByName.put(stateDescriptor.getName(), kvState);
-
-               // Publish queryable state
+       private void publishQueryableStateIfEnabled(
+               StateDescriptor<?, ?> stateDescriptor,
+               InternalKvState<?, ?, ?> kvState) {
                if (stateDescriptor.isQueryable()) {
                        if (kvStateRegistry == null) {
                                throw new IllegalStateException("State backend 
has not been initialized for job.");
                        }
-
                        String name = stateDescriptor.getQueryableStateName();
                        kvStateRegistry.registerKvState(keyGroupRange, name, 
kvState);
                }
-
-               return (S) kvState;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 7e9c357..d397a88 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import java.io.IOException;
 
@@ -42,13 +43,14 @@ public abstract class AbstractStateBackend implements 
StateBackend, java.io.Seri
 
        @Override
        public abstract <K> AbstractKeyedStateBackend<K> 
createKeyedStateBackend(
-                       Environment env,
-                       JobID jobID,
-                       String operatorIdentifier,
-                       TypeSerializer<K> keySerializer,
-                       int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws IOException;
+               Environment env,
+               JobID jobID,
+               String operatorIdentifier,
+               TypeSerializer<K> keySerializer,
+               int numberOfKeyGroups,
+               KeyGroupRange keyGroupRange,
+               TaskKvStateRegistry kvStateRegistry,
+               TtlTimeProvider ttlTimeProvider) throws IOException;
 
        @Override
        public abstract OperatorStateBackend createOperatorStateBackend(

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
index dd251bd..de35979 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
@@ -36,7 +36,7 @@ public interface KeyedStateFactory {
         * @param <S> The type of the public API state.
         * @param <IS> The type of internal state.
         */
-       <N, SV, S extends State, IS extends S> IS createState(
+       <N, SV, S extends State, IS extends S> IS createInternalState(
                TypeSerializer<N> namespaceSerializer,
                StateDescriptor<S, SV> stateDesc) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index f34cd9b..2775b71 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import java.io.IOException;
 
@@ -117,7 +118,7 @@ public interface StateBackend extends java.io.Serializable {
 
        /**
         * Creates a new {@link AbstractKeyedStateBackend} that is responsible 
for holding <b>keyed state</b>
-        * and checkpointing it.
+        * and checkpointing it. Uses default TTL time provider.
         *
         * <p><i>Keyed State</i> is state where each value is bound to a key.
         *
@@ -127,14 +128,46 @@ public interface StateBackend extends 
java.io.Serializable {
         *
         * @throws Exception This method may forward all exceptions that occur 
while instantiating the backend.
         */
-       <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+       default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                        Environment env,
                        JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws Exception;
+                       TaskKvStateRegistry kvStateRegistry) throws Exception {
+               return createKeyedStateBackend(
+                       env,
+                       jobID,
+                       operatorIdentifier,
+                       keySerializer,
+                       numberOfKeyGroups,
+                       keyGroupRange,
+                       kvStateRegistry,
+                       TtlTimeProvider.DEFAULT);
+       }
+
+       /**
+        * Creates a new {@link AbstractKeyedStateBackend} that is responsible 
for holding <b>keyed state</b>
+        * and checkpointing it.
+        *
+        * <p><i>Keyed State</i> is state where each value is bound to a key.
+        *
+        * @param <K> The type of the keys by which the state is organized.
+        *
+        * @return The Keyed State Backend for the given job, operator, and key 
group range.
+        *
+        * @throws Exception This method may forward all exceptions that occur 
while instantiating the backend.
+        */
+       <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+               Environment env,
+               JobID jobID,
+               String operatorIdentifier,
+               TypeSerializer<K> keySerializer,
+               int numberOfKeyGroups,
+               KeyGroupRange keyGroupRange,
+               TaskKvStateRegistry kvStateRegistry,
+               TtlTimeProvider ttlTimeProvider) throws Exception;
        
        /**
         * Creates a new {@link OperatorStateBackend} that can be used for 
storing operator state.

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index ad1581b..f5a86e1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.TernaryBoolean;
 
 import org.slf4j.LoggerFactory;
@@ -93,7 +94,7 @@ public class FsStateBackend extends AbstractFileStateBackend 
implements Configur
        private static final long serialVersionUID = -8191916350224044011L;
 
        /** Maximum size of state that is stored with the metadata, rather than 
in files (1 MiByte). */
-       public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+       private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
 
        // 
------------------------------------------------------------------------
 
@@ -448,13 +449,14 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
 
        @Override
        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-                       Environment env,
-                       JobID jobID,
-                       String operatorIdentifier,
-                       TypeSerializer<K> keySerializer,
-                       int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) {
+               Environment env,
+               JobID jobID,
+               String operatorIdentifier,
+               TypeSerializer<K> keySerializer,
+               int numberOfKeyGroups,
+               KeyGroupRange keyGroupRange,
+               TaskKvStateRegistry kvStateRegistry,
+               TtlTimeProvider ttlTimeProvider) {
 
                TaskStateManager taskStateManager = env.getTaskStateManager();
                LocalRecoveryConfig localRecoveryConfig = 
taskStateManager.createLocalRecoveryConfig();
@@ -470,7 +472,8 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
                                isUsingAsynchronousSnapshots(),
                                env.getExecutionConfig(),
                                localRecoveryConfig,
-                               priorityQueueSetFactory);
+                               priorityQueueSetFactory,
+                               ttlTimeProvider);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 562c93d..495dfe0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -171,9 +172,11 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        boolean asynchronousSnapshots,
                        ExecutionConfig executionConfig,
                        LocalRecoveryConfig localRecoveryConfig,
-                       PriorityQueueSetFactory priorityQueueSetFactory) {
+                       PriorityQueueSetFactory priorityQueueSetFactory,
+                       TtlTimeProvider ttlTimeProvider) {
 
-               super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
+               super(kvStateRegistry, keySerializer, userCodeClassLoader,
+                       numberOfKeyGroups, keyGroupRange, executionConfig, 
ttlTimeProvider);
                this.localRecoveryConfig = 
Preconditions.checkNotNull(localRecoveryConfig);
 
                SnapshotStrategySynchronicityBehavior<K> synchronicityTrait = 
asynchronousSnapshots ?
@@ -241,7 +244,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        @Override
-       public <N, SV, S extends State, IS extends S> IS createState(
+       public <N, SV, S extends State, IS extends S> IS createInternalState(
                TypeSerializer<N> namespaceSerializer,
                StateDescriptor<S, SV> stateDesc) throws Exception {
                StateFactory stateFactory = 
STATE_FACTORIES.get(stateDesc.getClass());

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index d78944c..1c464d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.TernaryBoolean;
 
 import javax.annotation.Nullable;
@@ -307,7 +308,8 @@ public class MemoryStateBackend extends 
AbstractFileStateBackend implements Conf
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) {
+                       TaskKvStateRegistry kvStateRegistry,
+                       TtlTimeProvider ttlTimeProvider) {
 
                TaskStateManager taskStateManager = env.getTaskStateManager();
                HeapPriorityQueueSetFactory priorityQueueSetFactory =
@@ -321,7 +323,8 @@ public class MemoryStateBackend extends 
AbstractFileStateBackend implements Conf
                                isUsingAsynchronousSnapshots(),
                                env.getExecutionConfig(),
                                taskStateManager.createLocalRecoveryConfig(),
-                               priorityQueueSetFactory);
+                               priorityQueueSetFactory,
+                               ttlTimeProvider);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
index 29a575a..1b72c54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
@@ -55,8 +55,6 @@ abstract class AbstractTtlDecorator<T> {
                Preconditions.checkNotNull(original);
                Preconditions.checkNotNull(config);
                Preconditions.checkNotNull(timeProvider);
-               Preconditions.checkArgument(config.getTtlUpdateType() != 
StateTtlConfiguration.TtlUpdateType.Disabled,
-                       "State does not need to be wrapped with TTL if it is 
configured as disabled.");
                this.original = original;
                this.config = config;
                this.timeProvider = timeProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index 98d7c52..21145e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -23,13 +23,15 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.FlinkRuntimeException;
 
+import javax.annotation.Nonnull;
+
 import java.util.AbstractMap;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
 
 /**
  * This class wraps map state with TTL logic.
@@ -84,52 +86,89 @@ class TtlMapState<K, N, UK, UV>
 
        @Override
        public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-               return entriesStream()::iterator;
+               return entries(e -> e);
        }
 
-       private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
+       private <R> Iterable<R> entries(
+               Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception {
                Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = 
original.entries();
-               withTs = withTs == null ? Collections.emptyList() : withTs;
-               return StreamSupport
-                       .stream(withTs.spliterator(), false)
-                       .filter(this::unexpiredAndUpdateOrCleanup)
-                       .map(TtlMapState::unwrapWithoutTs);
-       }
-
-       private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> 
e) {
-               UV unexpiredValue;
-               try {
-                       unexpiredValue = getWithTtlCheckAndUpdate(
-                               e::getValue,
-                               v -> original.put(e.getKey(), v),
-                               () -> original.remove(e.getKey()));
-               } catch (Exception ex) {
-                       throw new FlinkRuntimeException(ex);
-               }
-               return unexpiredValue != null;
-       }
-
-       private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK, 
TtlValue<UV>> e) {
-               return new AbstractMap.SimpleEntry<>(e.getKey(), 
e.getValue().getUserValue());
+               return () -> new EntriesIterator<>(withTs == null ? 
Collections.emptyList() : withTs, resultMapper);
        }
 
        @Override
        public Iterable<UK> keys() throws Exception {
-               return entriesStream().map(Map.Entry::getKey)::iterator;
+               return entries(Map.Entry::getKey);
        }
 
        @Override
        public Iterable<UV> values() throws Exception {
-               return entriesStream().map(Map.Entry::getValue)::iterator;
+               return entries(Map.Entry::getValue);
        }
 
        @Override
        public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
-               return entriesStream().iterator();
+               return entries().iterator();
        }
 
        @Override
        public void clear() {
                original.clear();
        }
+
+       private class EntriesIterator<R> implements Iterator<R> {
+               private final Iterator<Map.Entry<UK, TtlValue<UV>>> 
originalIterator;
+               private final Function<Map.Entry<UK, UV>, R> resultMapper;
+               private Map.Entry<UK, UV> nextUnexpired = null;
+               private boolean rightAfterNextIsCalled = false;
+
+               private EntriesIterator(
+                       @Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs,
+                       @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) {
+                       this.originalIterator = withTs.iterator();
+                       this.resultMapper = resultMapper;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       rightAfterNextIsCalled = false;
+                       while (nextUnexpired == null && 
originalIterator.hasNext()) {
+                               nextUnexpired = 
getUnexpiredAndUpdateOrCleanup(originalIterator.next());
+                       }
+                       return nextUnexpired != null;
+               }
+
+               @Override
+               public R next() {
+                       if (hasNext()) {
+                               rightAfterNextIsCalled = true;
+                               R result = resultMapper.apply(nextUnexpired);
+                               nextUnexpired = null;
+                               return result;
+                       }
+                       throw new NoSuchElementException();
+               }
+
+               @Override
+               public void remove() {
+                       if (rightAfterNextIsCalled) {
+                               originalIterator.remove();
+                       } else {
+                               throw new IllegalStateException("next() has not 
been called or hasNext() has been called afterwards," +
+                                       " remove() is supported only right 
after calling next()");
+                       }
+               }
+
+               private Map.Entry<UK, UV> 
getUnexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
+                       UV unexpiredValue;
+                       try {
+                               unexpiredValue = getWithTtlCheckAndUpdate(
+                                       e::getValue,
+                                       v -> original.put(e.getKey(), v),
+                                       originalIterator::remove);
+                       } catch (Exception ex) {
+                               throw new FlinkRuntimeException(ex);
+                       }
+                       return unexpiredValue == null ? null : new 
AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
index 01e4be9..c0aa465 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
@@ -49,7 +49,7 @@ class TtlReducingState<K, N, T>
 
        @Override
        public void add(T value) throws Exception {
-               original.add(wrapWithTs(value, Long.MAX_VALUE));
+               original.add(wrapWithTs(value));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 82096a6..5909ac7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -49,16 +49,14 @@ public class TtlStateFactory {
                TypeSerializer<N> namespaceSerializer,
                StateDescriptor<S, SV> stateDesc,
                KeyedStateFactory originalStateFactory,
-               StateTtlConfiguration ttlConfig,
                TtlTimeProvider timeProvider) throws Exception {
                Preconditions.checkNotNull(namespaceSerializer);
                Preconditions.checkNotNull(stateDesc);
                Preconditions.checkNotNull(originalStateFactory);
-               Preconditions.checkNotNull(ttlConfig);
                Preconditions.checkNotNull(timeProvider);
-               return ttlConfig.getTtlUpdateType() == 
StateTtlConfiguration.TtlUpdateType.Disabled ?
-                       originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
-                       new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+               return stateDesc.getTtlConfig() == null ?
+                       
originalStateFactory.createInternalState(namespaceSerializer, stateDesc) :
+                       new TtlStateFactory(originalStateFactory, 
stateDesc.getTtlConfig(), timeProvider)
                                .createState(namespaceSerializer, stateDesc);
        }
 
@@ -96,7 +94,7 @@ public class TtlStateFactory {
                                stateDesc.getClass(), TtlStateFactory.class);
                        throw new FlinkRuntimeException(message);
                }
-               return stateFactory.createState(namespaceSerializer, stateDesc);
+               return stateFactory.createInternalState(namespaceSerializer, 
stateDesc);
        }
 
        @SuppressWarnings("unchecked")
@@ -106,7 +104,7 @@ public class TtlStateFactory {
                ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new 
ValueStateDescriptor<>(
                        stateDesc.getName(), new 
TtlSerializer<>(stateDesc.getSerializer()));
                return (IS) new TtlValueState<>(
-                       originalStateFactory.createState(namespaceSerializer, 
ttlDescriptor),
+                       
originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
                        ttlConfig, timeProvider, stateDesc.getSerializer());
        }
 
@@ -118,7 +116,7 @@ public class TtlStateFactory {
                ListStateDescriptor<TtlValue<T>> ttlDescriptor = new 
ListStateDescriptor<>(
                        stateDesc.getName(), new 
TtlSerializer<>(listStateDesc.getElementSerializer()));
                return (IS) new TtlListState<>(
-                       originalStateFactory.createState(namespaceSerializer, 
ttlDescriptor),
+                       
originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
                        ttlConfig, timeProvider, listStateDesc.getSerializer());
        }
 
@@ -132,7 +130,7 @@ public class TtlStateFactory {
                        mapStateDesc.getKeySerializer(),
                        new TtlSerializer<>(mapStateDesc.getValueSerializer()));
                return (IS) new TtlMapState<>(
-                       originalStateFactory.createState(namespaceSerializer, 
ttlDescriptor),
+                       
originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
                        ttlConfig, timeProvider, mapStateDesc.getSerializer());
        }
 
@@ -146,7 +144,7 @@ public class TtlStateFactory {
                        new 
TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, 
timeProvider),
                        new TtlSerializer<>(stateDesc.getSerializer()));
                return (IS) new TtlReducingState<>(
-                       originalStateFactory.createState(namespaceSerializer, 
ttlDescriptor),
+                       
originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
                        ttlConfig, timeProvider, stateDesc.getSerializer());
        }
 
@@ -161,7 +159,7 @@ public class TtlStateFactory {
                AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor 
= new AggregatingStateDescriptor<>(
                        stateDesc.getName(), ttlAggregateFunction, new 
TtlSerializer<>(stateDesc.getSerializer()));
                return (IS) new TtlAggregatingState<>(
-                       originalStateFactory.createState(namespaceSerializer, 
ttlDescriptor),
+                       
originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
                        ttlConfig, timeProvider, stateDesc.getSerializer(), 
ttlAggregateFunction);
        }
 
@@ -178,7 +176,7 @@ public class TtlStateFactory {
                        new 
TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, 
timeProvider, initAcc),
                        new TtlSerializer<>(stateDesc.getSerializer()));
                return (IS) new TtlFoldingState<>(
-                       originalStateFactory.createState(namespaceSerializer, 
ttlDescriptor),
+                       
originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
                        ttlConfig, timeProvider, stateDesc.getSerializer());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
index bac9d36..84809cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.state.ttl;
 /**
  * Provides time to TTL logic to judge about state expiration.
  */
-interface TtlTimeProvider {
+public interface TtlTimeProvider {
+       TtlTimeProvider DEFAULT = System::currentTimeMillis;
+
        long currentTimestamp();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 9456f10..ffebc52 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -41,6 +41,7 @@ import 
org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -168,7 +169,8 @@ public class CheckpointSettingsSerializableTest extends 
TestLogger {
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws Exception {
+                       TaskKvStateRegistry kvStateRegistry,
+                       TtlTimeProvider ttlTimeProvider) throws Exception {
                        throw new UnsupportedOperationException();
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
index 1325431..6424a7a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.IOUtils;
 
 import org.junit.Rule;
@@ -70,7 +71,8 @@ public class HeapKeyedStateBackendAsyncByDefaultTest {
                        IntSerializer.INSTANCE,
                        1,
                        new KeyGroupRange(0, 0),
-                       null
+                       null,
+                       TtlTimeProvider.DEFAULT
                );
 
                assertTrue(keyedStateBackend.supportsAsynchronousSnapshots());

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 3c5756b..bfdc05d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -73,6 +73,7 @@ import 
org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 import org.apache.flink.types.IntValue;
@@ -182,7 +183,8 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                keySerializer,
                                numberOfKeyGroups,
                                keyGroupRange,
-                               env.getTaskKvStateRegistry());
+                               env.getTaskKvStateRegistry(),
+                           TtlTimeProvider.DEFAULT);
 
                backend.restore(null);
 
@@ -219,7 +221,8 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        keySerializer,
                        numberOfKeyGroups,
                        keyGroupRange,
-                       env.getTaskKvStateRegistry());
+                       env.getTaskKvStateRegistry(),
+                       TtlTimeProvider.DEFAULT);
 
                backend.restore(new StateObjectCollection<>(state));
 
@@ -3545,7 +3548,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        }
 
                        // insert some data to the backend.
-                       InternalValueState<Integer, VoidNamespace, Integer> 
valueState = backend.createState(
+                       InternalValueState<Integer, VoidNamespace, Integer> 
valueState = backend.createInternalState(
                                VoidNamespaceSerializer.INSTANCE,
                                new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
 
@@ -3602,7 +3605,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                try {
                        backend = createKeyedBackend(IntSerializer.INSTANCE);
-                       InternalValueState<Integer, VoidNamespace, Integer> 
valueState = backend.createState(
+                       InternalValueState<Integer, VoidNamespace, Integer> 
valueState = backend.createInternalState(
                                        VoidNamespaceSerializer.INSTANCE,
                                        new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
 
@@ -3649,7 +3652,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                try {
                        backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
stateHandle);
 
-                       InternalValueState<Integer, VoidNamespace, Integer> 
valueState = backend.createState(
+                       InternalValueState<Integer, VoidNamespace, Integer> 
valueState = backend.createInternalState(
                                        VoidNamespaceSerializer.INSTANCE,
                                        new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
 
@@ -3791,7 +3794,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                return;
                        }
 
-                       InternalValueState<Integer, VoidNamespace, Integer> 
valueState = backend.createState(
+                       InternalValueState<Integer, VoidNamespace, Integer> 
valueState = backend.createInternalState(
                                        VoidNamespaceSerializer.INSTANCE,
                                        new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index dfcdffc..558f629 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.commons.io.IOUtils;
@@ -54,7 +55,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
                        true,
                        executionConfig,
                        TestLocalRecoveryConfig.disabled(),
-                       mock(PriorityQueueSetFactory.class));
+                       mock(PriorityQueueSetFactory.class),
+                       TtlTimeProvider.DEFAULT);
 
                try {
                        Assert.assertTrue(
@@ -77,7 +79,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
                        true,
                        executionConfig,
                        TestLocalRecoveryConfig.disabled(),
-                       mock(PriorityQueueSetFactory.class));
+                       mock(PriorityQueueSetFactory.class),
+                       TtlTimeProvider.DEFAULT);
 
                try {
                        Assert.assertTrue(
@@ -118,12 +121,13 @@ public class StateSnapshotCompressionTest extends 
TestLogger {
                        true,
                        executionConfig,
                        TestLocalRecoveryConfig.disabled(),
-                       mock(PriorityQueueSetFactory.class));
+                       mock(PriorityQueueSetFactory.class),
+                       TtlTimeProvider.DEFAULT);
 
                try {
 
                        InternalValueState<String, VoidNamespace, String> state 
=
-                               stateBackend.createState(new 
VoidNamespaceSerializer(), stateDescriptor);
+                               stateBackend.createInternalState(new 
VoidNamespaceSerializer(), stateDescriptor);
 
                        stateBackend.setCurrentKey("A");
                        state.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -160,12 +164,13 @@ public class StateSnapshotCompressionTest extends 
TestLogger {
                        true,
                        executionConfig,
                        TestLocalRecoveryConfig.disabled(),
-                       mock(PriorityQueueSetFactory.class));
+                       mock(PriorityQueueSetFactory.class),
+                       TtlTimeProvider.DEFAULT);
                try {
 
                        
stateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
-                       InternalValueState<String, VoidNamespace, String> state 
= stateBackend.createState(
+                       InternalValueState<String, VoidNamespace, String> state 
= stateBackend.createInternalState(
                                new VoidNamespaceSerializer(),
                                stateDescriptor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index 249d0c3..7b8d69f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -73,7 +73,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest 
extends HeapStateBackend
 
                        
keyedBackend.restore(StateObjectCollection.singleton(stateHandles.getJobManagerOwnedSnapshot()));
 
-                       InternalMapState<String, Integer, Long, Long> state = 
keyedBackend.createState(IntSerializer.INSTANCE, stateDescr);
+                       InternalMapState<String, Integer, Long, Long> state = 
keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
 
                        keyedBackend.setCurrentKey("abc");
                        state.setCurrentNamespace(namespace1);
@@ -233,7 +233,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest 
extends HeapStateBackend
                        final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
                        stateDescr.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
-                       InternalListState<String, Integer, Long> state = 
keyedBackend.createState(IntSerializer.INSTANCE, stateDescr);
+                       InternalListState<String, Integer, Long> state = 
keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
 
                        assertEquals(7, keyedBackend.numStateEntries());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index cf6aef4..0eddf3c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -61,6 +62,7 @@ public abstract class HeapStateBackendTestBase {
                        async,
                        new ExecutionConfig(),
                        TestLocalRecoveryConfig.disabled(),
-                       new HeapPriorityQueueSetFactory(keyGroupRange, 
numKeyGroups, 128));
+                       new HeapPriorityQueueSetFactory(keyGroupRange, 
numKeyGroups, 128),
+                       TtlTimeProvider.DEFAULT);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java
new file mode 100644
index 0000000..06de4be
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
+/** Test suite for heap state TTL. */
+public class HeapTtlStateTest extends TtlStateTestBase {
+       @Override
+       protected StateBackendTestContext 
createStateBackendTestContext(TtlTimeProvider timeProvider) {
+               return new StateBackendTestContext(timeProvider) {
+                       @Override
+                       protected StateBackend createStateBackend() {
+                               return new MemoryStateBackend(false);
+                       }
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
deleted file mode 100644
index e14c3f8..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-class MockTimeProvider implements TtlTimeProvider {
-       long time = 0;
-
-       @Override
-       public long currentTimestamp() {
-               return time;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java
new file mode 100644
index 0000000..392bdba
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
+
+/** Test suite for mock state TTL. */
+public class MockTtlStateTest extends TtlStateTestBase {
+       @Override
+       protected StateBackendTestContext 
createStateBackendTestContext(TtlTimeProvider timeProvider) {
+               return new StateBackendTestContext(timeProvider) {
+                       @Override
+                       protected StateBackend createStateBackend() {
+                               return new MockStateBackend();
+                       }
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java
new file mode 100644
index 0000000..f980043
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+class MockTtlTimeProvider implements TtlTimeProvider {
+       long time = 0;
+
+       @Override
+       public long currentTimestamp() {
+               return time;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
new file mode 100644
index 0000000..eaec234
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+/** Base class for state backend test context. */
+public abstract class StateBackendTestContext {
+       private final StateBackend stateBackend;
+       private final CheckpointStorageLocation checkpointStorageLocation;
+       private final TtlTimeProvider timeProvider;
+
+       private AbstractKeyedStateBackend<String> keyedStateBackend;
+
+       protected StateBackendTestContext(TtlTimeProvider timeProvider) {
+               this.timeProvider = Preconditions.checkNotNull(timeProvider);
+               this.stateBackend = 
Preconditions.checkNotNull(createStateBackend());
+               this.checkpointStorageLocation = 
createCheckpointStorageLocation();
+       }
+
+       protected abstract StateBackend createStateBackend();
+
+       private CheckpointStorageLocation createCheckpointStorageLocation() {
+               try {
+                       return stateBackend
+                               .createCheckpointStorage(new JobID())
+                               .initializeLocationForCheckpoint(2L);
+               } catch (IOException e) {
+                       throw new RuntimeException("unexpected");
+               }
+       }
+
+       void createAndRestoreKeyedStateBackend() {
+               Environment env = new DummyEnvironment();
+               try {
+                       disposeKeyedStateBackend();
+                       keyedStateBackend = 
stateBackend.createKeyedStateBackend(
+                               env, new JobID(), "test", 
StringSerializer.INSTANCE, 10,
+                               new KeyGroupRange(0, 9), 
env.getTaskKvStateRegistry(), timeProvider);
+                       keyedStateBackend.setCurrentKey("defaultKey");
+               } catch (Exception e) {
+                       throw new RuntimeException("unexpected");
+               }
+       }
+
+       void disposeKeyedStateBackend() {
+               if (keyedStateBackend != null) {
+                       keyedStateBackend.dispose();
+                       keyedStateBackend = null;
+               }
+       }
+
+       @Nonnull
+       KeyedStateHandle takeSnapshot() throws Exception {
+               RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshotRunnableFuture =
+                       keyedStateBackend.snapshot(682375462392L, 10L,
+                               checkpointStorageLocation, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               if (!snapshotRunnableFuture.isDone()) {
+                       snapshotRunnableFuture.run();
+               }
+               return 
snapshotRunnableFuture.get().getJobManagerOwnedSnapshot();
+       }
+
+       void restoreSnapshot(@Nullable KeyedStateHandle snapshot) throws 
Exception {
+               Collection<KeyedStateHandle> restoreState =
+                       snapshot == null ? null : new 
StateObjectCollection<>(Collections.singleton(snapshot));
+               keyedStateBackend.restore(restoreState);
+               if (snapshot != null) {
+                       snapshot.discardState();
+               }
+       }
+
+       void setCurrentKey(String key) {
+               Preconditions.checkNotNull(keyedStateBackend, "keyed backend is 
not initialised");
+               keyedStateBackend.setCurrentKey(key);
+       }
+
+       @SuppressWarnings("unchecked")
+       <N, S extends State, V> S createState(
+               StateDescriptor<S, V> stateDescriptor,
+               @SuppressWarnings("SameParameterValue") N defaultNamespace) 
throws Exception {
+               S state = 
keyedStateBackend.getOrCreateKeyedState(StringSerializer.INSTANCE, 
stateDescriptor);
+               ((InternalKvState<?, N, ?>) 
state).setCurrentNamespace(defaultNamespace);
+               return state;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
deleted file mode 100644
index 5d9c682..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/** Test suite for {@link TtlAggregatingState}. */
-public class TtlAggregatingStateTest
-       extends 
TtlMergingStateBase.TtlIntegerMergingStateBase<TtlAggregatingState<?, String, 
Integer, Long, String>, Integer, String> {
-       private static final long DEFAULT_ACCUMULATOR = 3L;
-
-       @Override
-       void initTestValues() {
-               updater = v -> ttlState.add(v);
-               getter = () -> ttlState.get();
-               originalGetter = () -> ttlState.original.get();
-
-               updateEmpty = 5;
-               updateUnexpired = 7;
-               updateExpired = 6;
-
-               getUpdateEmpty = "8";
-               getUnexpired = "15";
-               getUpdateExpired = "9";
-       }
-
-       @Override
-       TtlAggregatingState<?, String, Integer, Long, String> createState() {
-               AggregatingStateDescriptor<Integer, Long, String> 
aggregatingStateDes =
-                       new 
AggregatingStateDescriptor<>("TtlTestAggregatingState", AGGREGATE, 
LongSerializer.INSTANCE);
-               return (TtlAggregatingState<?, String, Integer, Long, String>) 
wrapMockState(aggregatingStateDes);
-       }
-
-       @Override
-       String getMergeResult(
-               List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
-               List<Tuple2<String, Integer>> finalUpdatesToMerge) {
-               Set<String> namespaces = new HashSet<>();
-               unexpiredUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
-               finalUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
-               return 
Integer.toString(getIntegerMergeResult(unexpiredUpdatesToMerge, 
finalUpdatesToMerge) +
-                       namespaces.size() * (int) DEFAULT_ACCUMULATOR);
-       }
-
-       private static final AggregateFunction<Integer, Long, String> AGGREGATE 
=
-               new AggregateFunction<Integer, Long, String>() {
-                       @Override
-                       public Long createAccumulator() {
-                               return DEFAULT_ACCUMULATOR;
-                       }
-
-                       @Override
-                       public Long add(Integer value, Long accumulator) {
-                               return accumulator + value;
-                       }
-
-                       @Override
-                       public String getResult(Long accumulator) {
-                               return accumulator.toString();
-                       }
-
-                       @Override
-                       public Long merge(Long a, Long b) {
-                               return a + b;
-                       }
-               };
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java
new file mode 100644
index 0000000..b19391a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Test suite for {@link TtlAggregatingState}. */
+class TtlAggregatingStateTestContext
+       extends 
TtlMergingStateTestContext.TtlIntegerMergingStateTestContext<TtlAggregatingState<?,
 String, Integer, Long, String>, Integer, String> {
+       private static final long DEFAULT_ACCUMULATOR = 3L;
+
+       @Override
+       void initTestValues() {
+               updateEmpty = 5;
+               updateUnexpired = 7;
+               updateExpired = 6;
+
+               getUpdateEmpty = "8";
+               getUnexpired = "15";
+               getUpdateExpired = "9";
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+               return (StateDescriptor<US, SV>) new 
AggregatingStateDescriptor<>(
+                       "TtlTestAggregatingState", AGGREGATE, 
LongSerializer.INSTANCE);
+       }
+
+       @Override
+       void update(Integer value) throws Exception {
+               ttlState.add(value);
+       }
+
+       @Override
+       String get() throws Exception {
+               return ttlState.get();
+       }
+
+       @Override
+       Object getOriginal() throws Exception {
+               return ttlState.original.get();
+       }
+
+       @Override
+       String getMergeResult(
+               List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+               List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+               Set<String> namespaces = new HashSet<>();
+               unexpiredUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
+               finalUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
+               return 
Integer.toString(getIntegerMergeResult(unexpiredUpdatesToMerge, 
finalUpdatesToMerge) +
+                       namespaces.size() * (int) DEFAULT_ACCUMULATOR);
+       }
+
+       private static final AggregateFunction<Integer, Long, String> AGGREGATE 
=
+               new AggregateFunction<Integer, Long, String>() {
+                       @Override
+                       public Long createAccumulator() {
+                               return DEFAULT_ACCUMULATOR;
+                       }
+
+                       @Override
+                       public Long add(Integer value, Long accumulator) {
+                               return accumulator + value;
+                       }
+
+                       @Override
+                       public String getResult(Long accumulator) {
+                               return accumulator.toString();
+                       }
+
+                       @Override
+                       public Long merge(Long a, Long b) {
+                               return a + b;
+                       }
+               };
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
deleted file mode 100644
index 8dac8ca..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
-/** Test suite for {@link TtlFoldingState}. */
-@SuppressWarnings("deprecation")
-public class TtlFoldingStateTest extends TtlStateTestBase<TtlFoldingState<?, 
String, Long, String>, Long, String> {
-       @Override
-       void initTestValues() {
-               updater = v -> ttlState.add(v);
-               getter = () -> ttlState.get();
-               originalGetter = () -> ttlState.original.get();
-
-               updateEmpty = 5L;
-               updateUnexpired = 7L;
-               updateExpired = 6L;
-
-               getUpdateEmpty = "6";
-               getUnexpired = "13";
-               getUpdateExpired = "7";
-       }
-
-       @Override
-       TtlFoldingState<?, String, Long, String> createState() {
-               FoldingStateDescriptor<Long, String> foldingStateDesc =
-                       new FoldingStateDescriptor<>("TtlTestFoldingState", 
"1",  FOLD, StringSerializer.INSTANCE);
-               return (TtlFoldingState<?, String, Long, String>) 
wrapMockState(foldingStateDesc);
-       }
-
-       private static final FoldFunction<Long, String> FOLD = (acc, val) -> {
-               long lacc = acc == null ? 0 : Long.parseLong(acc);
-               return Long.toString(val == null ? lacc : lacc + val);
-       };
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java
new file mode 100644
index 0000000..2b072b9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+/** Test suite for {@link TtlFoldingState}. */
+@SuppressWarnings("deprecation")
+class TtlFoldingStateTestContext extends 
TtlStateTestContextBase<TtlFoldingState<?, String, Long, String>, Long, String> 
{
+       @Override
+       void initTestValues() {
+               updateEmpty = 5L;
+               updateUnexpired = 7L;
+               updateExpired = 6L;
+
+               getUpdateEmpty = "6";
+               getUnexpired = "13";
+               getUpdateExpired = "7";
+       }
+
+       @Override
+       void update(Long value) throws Exception {
+               ttlState.add(value);
+       }
+
+       @Override
+       String get() throws Exception {
+               return ttlState.get();
+       }
+
+       @Override
+       Object getOriginal() throws Exception {
+               return ttlState.original.get();
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+               return (StateDescriptor<US, SV>) new FoldingStateDescriptor<>(
+                       "TtlTestFoldingState", "1",  FOLD, 
StringSerializer.INSTANCE);
+       }
+
+       private static final FoldFunction<Long, String> FOLD = (acc, val) -> {
+               long lacc = acc == null ? 0 : Long.parseLong(acc);
+               return Long.toString(val == null ? lacc : lacc + val);
+       };
+}

Reply via email to