Repository: flink
Updated Branches:
  refs/heads/master 46334e2f3 -> 01cf808ee


[FLINK-9858][tests] State TTL End-to-End Test

This closes #6361.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01cf808e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01cf808e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01cf808e

Branch: refs/heads/master
Commit: 01cf808ee863f4b0f429f20ba05fd2e322842e48
Parents: 46334e2
Author: Andrey Zagrebin <[email protected]>
Authored: Fri Jul 13 19:27:35 2018 +0200
Committer: Stefan Richter <[email protected]>
Committed: Fri Jul 20 10:26:28 2018 +0200

----------------------------------------------------------------------
 .../flink-stream-state-ttl-test/pom.xml         | 104 +++++++++++
 .../tests/DataStreamStateTTLTestProgram.java    | 100 ++++++++++
 .../flink/streaming/tests/TtlStateUpdate.java   |  45 +++++
 .../streaming/tests/TtlStateUpdateSource.java   |  78 ++++++++
 .../tests/TtlVerifyUpdateFunction.java          | 187 +++++++++++++++++++
 .../tests/verify/AbstractTtlStateVerifier.java  | 105 +++++++++++
 .../verify/TtlAggregatingStateVerifier.java     | 107 +++++++++++
 .../tests/verify/TtlFoldingStateVerifier.java   |  87 +++++++++
 .../tests/verify/TtlListStateVerifier.java      |  78 ++++++++
 .../tests/verify/TtlMapStateVerifier.java       |  94 ++++++++++
 .../tests/verify/TtlReducingStateVerifier.java  |  86 +++++++++
 .../tests/verify/TtlStateVerifier.java          |  60 ++++++
 .../tests/verify/TtlUpdateContext.java          |  71 +++++++
 .../tests/verify/TtlValueStateVerifier.java     |  66 +++++++
 .../tests/verify/TtlVerificationContext.java    |  69 +++++++
 .../streaming/tests/verify/ValueWithTs.java     | 107 +++++++++++
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh     |   3 +
 flink-end-to-end-tests/test-scripts/common.sh   |  37 ++--
 .../test-scripts/test_stream_state_ttl.sh       |  83 ++++++++
 20 files changed, 1555 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml
new file mode 100644
index 0000000..ad04f3e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <version>1.7-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-stream-state-ttl-test</artifactId>
+       <name>flink-stream-state-ttl-test</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <!-- Scala suffix follows the build's scala.binary.version, like flink-streaming-java above. -->
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-datastream-allround-test</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>3.0.0</version>
+                               <executions>
+                                       <execution>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<finalName>DataStreamStateTTLTestProgram</finalName>
+                                                       <artifactSet>
+                                                               <excludes>
+                                                                       
<exclude>com.google.code.findbugs:jsr305</exclude>
+                                                                       
<exclude>org.slf4j:*</exclude>
+                                                                       
<exclude>log4j:*</exclude>
+                                                               </excludes>
+                                                       </artifactSet>
+                                                       <filters>
+                                                               <filter>
+                                                                       
<artifact>*:*</artifact>
+                                                                       
<excludes>
+                                                                               
<exclude>META-INF/*.SF</exclude>
+                                                                               
<exclude>META-INF/*.DSA</exclude>
+                                                                               
<exclude>META-INF/*.RSA</exclude>
+                                                                       
</excludes>
+                                                               </filter>
+                                                       </filters>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.streaming.tests.DataStreamStateTTLTestProgram</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
new file mode 100644
index 0000000..f4c9619
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+/**
+ * A test job for the State TTL feature.
+ *
+ * <p>The test pipeline does the following:
+ * <ul>
+ *     <li>generates random keyed state updates for each state TTL verifier (state type)</li>
+ *     <li>performs the update of the state created with TTL for each verifier</li>
+ *     <li>keeps previous updates in other state</li>
+ *     <li>verifies the expected result of the last update against the preserved history of updates</li>
+ * </ul>
+ *
+ * <p>Program parameters:
+ * <ul>
+ *     <li>update_generator_source.keyspace (int, default - 100): Number of different keys for updates emitted by the update generator.</li>
+ *     <li>update_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting updates in the update generator. Set to 0 to disable sleeping.</li>
+ *     <li>update_generator_source.sleep_after_elements (long, default - 0): Number of updates to emit before sleeping in the update generator. Set to 0 to disable sleeping.</li>
+ *     <li>state_ttl_verifier.ttl_milli (long, default - 1000): State time-to-live.</li>
+ *     <li>report_stat.after_updates_num (long, default - 200): Report state update statistics after certain number of updates (average update chain length and clashes).</li>
+ * </ul>
+ */
+public class DataStreamStateTTLTestProgram {
+       private static final ConfigOption<Integer> UPDATE_GENERATOR_SRC_KEYSPACE =
+               ConfigOptions.key("update_generator_source.keyspace").defaultValue(100);
+
+       private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_TIME =
+               ConfigOptions.key("update_generator_source.sleep_time").defaultValue(0L);
+
+       private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS =
+               ConfigOptions.key("update_generator_source.sleep_after_elements").defaultValue(0L);
+
+       private static final ConfigOption<Long> STATE_TTL_VERIFIER_TTL_MILLI =
+               ConfigOptions.key("state_ttl_verifier.ttl_milli").defaultValue(1000L);
+
+       private static final ConfigOption<Long> REPORT_STAT_AFTER_UPDATES_NUM =
+               ConfigOptions.key("report_stat.after_updates_num").defaultValue(200L);
+
+       /**
+        * Builds and runs the job: update source, keyed by update key, verifying flat map, print sink.
+        *
+        * @param args command line arguments, parsed with {@link ParameterTool#fromArgs}
+        */
+       public static void main(String[] args) throws Exception {
+               final ParameterTool params = ParameterTool.fromArgs(args);
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               setupEnvironment(env, params);
+
+               // Resolve all job options first; a missing option falls back to its declared default.
+               final int keySpace = params.getInt(
+                       UPDATE_GENERATOR_SRC_KEYSPACE.key(), UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue());
+               final long sleepAfterElements = readLong(params, UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS);
+               final long sleepTime = readLong(params, UPDATE_GENERATOR_SRC_SLEEP_TIME);
+               final long ttlMillis = readLong(params, STATE_TTL_VERIFIER_TTL_MILLI);
+               final long reportStatAfterUpdatesNum = readLong(params, REPORT_STAT_AFTER_UPDATES_NUM);
+
+               final StateTtlConfiguration ttlConfig =
+                       StateTtlConfiguration.newBuilder(Time.milliseconds(ttlMillis)).build();
+               final TtlStateUpdateSource updateSource =
+                       new TtlStateUpdateSource(keySpace, sleepAfterElements, sleepTime);
+               final TtlVerifyUpdateFunction verifyFunction =
+                       new TtlVerifyUpdateFunction(ttlConfig, reportStatAfterUpdatesNum);
+
+               env
+                       .addSource(updateSource)
+                       .name("TtlStateUpdateSource")
+                       .keyBy(TtlStateUpdate::getKey)
+                       .flatMap(verifyFunction)
+                       .name("TtlVerifyUpdateFunction")
+                       .addSink(new PrintSinkFunction<>())
+                       .name("PrintFailedVerifications");
+
+               env.execute("State TTL test job");
+       }
+
+       /** Reads a {@code long} option from {@code params}, using the option's default when it is absent. */
+       private static long readLong(ParameterTool params, ConfigOption<Long> option) {
+               return params.getLong(option.key(), option.defaultValue());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
new file mode 100644
index 0000000..e89b544
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdate.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Randomly generated keyed state updates per state type.
+ *
+ * <p>Holds the key the updates belong to and one update object per verifier id.
+ */
+class TtlStateUpdate implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       /** Key of the keyed stream this update is routed by (see {@link #getKey()}). */
+       private final int key;
+
+       /** Update payloads, looked up by verifier id. */
+       @Nonnull
+       private final Map<String, Object> updates;
+
+       /**
+        * @param key key of this update
+        * @param updates update payload per verifier id; the map is stored as is, not copied
+        */
+       TtlStateUpdate(int key, @Nonnull Map<String, Object> updates) {
+               this.key = key;
+               this.updates = updates;
+       }
+
+       /** Returns the key of this update. */
+       int getKey() {
+               return key;
+       }
+
+       /** Returns the payload stored for {@code verifierId}, as returned by {@link Map#get} ({@code null} if there is none). */
+       Object getUpdate(String verifierId) {
+               return updates.get(verifierId);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
new file mode 100644
index 0000000..6aff14e
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlStateUpdateSource.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.tests.verify.TtlStateVerifier;
+
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+/**
+ * Source of randomly generated keyed state updates.
+ *
+ * <p>Each emitted element carries a random key in {@code [0, maxKey)} and one random update
+ * for each verifier from {@link TtlStateVerifier#VERIFIERS}, produced by
+ * {@link TtlStateVerifier#generateRandomUpdate}.
+ *
+ * <p>After every {@code sleepAfterElements} emitted elements the source sleeps for
+ * {@code sleepTime} milliseconds plus a random jitter smaller than {@code sleepTime}.
+ * Sleeping is disabled when {@code sleepAfterElements} or {@code sleepTime} is not positive;
+ * the source then emits continuously.
+ */
+class TtlStateUpdateSource extends RichParallelSourceFunction<TtlStateUpdate> {
+       /** Exclusive upper bound of the generated keys. */
+       private final int maxKey;
+       /** Number of elements to emit between two sleeps. */
+       private final long sleepAfterElements;
+       /** Base sleep duration in milliseconds. */
+       private final long sleepTime;
+
+       /** Flag that determines if this source is running, i.e. generating events. */
+       private volatile boolean running = true;
+
+       TtlStateUpdateSource(int maxKey, long sleepAfterElements, long sleepTime) {
+               this.maxKey = maxKey;
+               this.sleepAfterElements = sleepAfterElements;
+               this.sleepTime = sleepTime;
+       }
+
+       /**
+        * Emits random updates until {@link #cancel()} is called.
+        *
+        * <p>One element is emitted per loop iteration, so {@code running} is re-checked after
+        * every element and the sleep counter advances once per emitted element.
+        */
+       @Override
+       public void run(SourceContext<TtlStateUpdate> ctx) throws Exception {
+               Random random = new Random();
+               // Throttling needs both a positive batch size and a positive pause.
+               final boolean throttled = sleepAfterElements > 0 && sleepTime > 0;
+               long emittedSinceSleep = 0;
+               while (running) {
+                       synchronized (ctx.getCheckpointLock()) {
+                               Map<String, Object> updates = TtlStateVerifier.VERIFIERS.stream()
+                                       .collect(Collectors.toMap(TtlStateVerifier::getId, TtlStateVerifier::generateRandomUpdate));
+                               ctx.collect(new TtlStateUpdate(random.nextInt(maxKey), updates));
+                       }
+
+                       if (throttled && ++emittedSinceSleep >= sleepAfterElements) {
+                               emittedSinceSleep = 0;
+                               // Jitter is only added when sleepTime fits into an int bound for nextInt.
+                               long rnd = sleepTime < Integer.MAX_VALUE ? random.nextInt((int) sleepTime) : 0L;
+                               Thread.sleep(rnd + sleepTime);
+                       }
+               }
+       }
+
+       @Override
+       public void cancel() {
+               running = false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
new file mode 100644
index 0000000..a99a45f
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.tests.verify.TtlStateVerifier;
+import org.apache.flink.streaming.tests.verify.TtlUpdateContext;
+import org.apache.flink.streaming.tests.verify.TtlVerificationContext;
+import org.apache.flink.streaming.tests.verify.ValueWithTs;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Update state with TTL for each verifier.
+ *
+ * <p>For each verifier from {@link TtlStateVerifier#VERIFIERS} this function:
+ * <ul>
+ *     <li>creates state with TTL</li>
+ *     <li>creates state of previous updates for further verification against it</li>
+ *     <li>receives random state update</li>
+ *     <li>gets state value before update</li>
+ *     <li>updates state with random value</li>
+ *     <li>gets state value after update</li>
+ *     <li>checks if this update clashes with any previous updates</li>
+ *     <li>if it clashes, clears the state and performs the update again</li>
+ *     <li>verifies last update against previous updates</li>
+ *     <li>emits verification context in case of failure</li>
+ * </ul>
+ */
+class TtlVerifyUpdateFunction
+       extends RichFlatMapFunction<TtlStateUpdate, String> implements CheckpointedFunction {
+       private static final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class);
+
+       @Nonnull
+       private final StateTtlConfiguration ttlConfig;
+       /** TTL in milliseconds, read from {@link #ttlConfig} in the constructor. */
+       private final long ttl;
+       private final UpdateStat stat;
+
+       /** State created with TTL, by verifier id; populated in {@link #initializeState}. */
+       private transient Map<String, State> states;
+       /** List state with the history of applied updates, by verifier id; populated in {@link #initializeState}. */
+       private transient Map<String, ListState<ValueWithTs<?>>> prevUpdatesByVerifierId;
+
+       /**
+        * @param ttlConfig TTL configuration passed to every verifier when it creates its state
+        * @param reportStatAfterUpdatesNum log update statistics every this many updates; 0 disables the report
+        */
+       TtlVerifyUpdateFunction(@Nonnull StateTtlConfiguration ttlConfig, long reportStatAfterUpdatesNum) {
+               this.ttlConfig = ttlConfig;
+               this.ttl = ttlConfig.getTtl().toMilliseconds();
+               this.stat = new UpdateStat(reportStatAfterUpdatesNum);
+       }
+
+       /**
+        * Applies the update to the state of every verifier and emits the string form of the
+        * verification context for each verifier whose verification fails.
+        */
+       @Override
+       public void flatMap(TtlStateUpdate updates, Collector<String> out) throws Exception {
+               for (TtlStateVerifier<?, ?> verifier : TtlStateVerifier.VERIFIERS) {
+                       TtlVerificationContext<?, ?> verificationContext = generateUpdateAndVerificationContext(updates, verifier);
+                       if (!verifier.verify(verificationContext)) {
+                               out.collect(verificationContext.toString());
+                       }
+               }
+       }
+
+       /**
+        * Performs the update for one verifier and builds the context used to verify it.
+        *
+        * <p>If the update clashes with the recorded history (or took too long), both the TTL state
+        * and the history are cleared and the update is performed once more against empty history.
+        * The update that was finally applied is appended to the history.
+        */
+       private TtlVerificationContext<?, ?> generateUpdateAndVerificationContext(
+               TtlStateUpdate updates, TtlStateVerifier<?, ?> verifier) throws Exception {
+               List<ValueWithTs<?>> prevUpdates = getPrevUpdates(verifier.getId());
+               Object update = updates.getUpdate(verifier.getId());
+               TtlUpdateContext<?, ?> updateContext = performUpdate(verifier, update);
+               boolean clashes = updateClashesWithPrevUpdates(updateContext.getUpdateWithTs(), prevUpdates);
+               if (clashes) {
+                       resetState(verifier.getId());
+                       prevUpdates = Collections.emptyList();
+                       // NOTE(review): the repeated update is not re-checked with tooSlow() - confirm this is intended.
+                       updateContext = performUpdate(verifier, update);
+               }
+               stat.update(clashes, prevUpdates.size());
+               prevUpdatesByVerifierId.get(verifier.getId()).add(updateContext.getUpdateWithTs());
+               return new TtlVerificationContext<>(updates.getKey(), verifier.getId(), prevUpdates, updateContext);
+       }
+
+       /** Copies the recorded update history of the given verifier into a list. */
+       private List<ValueWithTs<?>> getPrevUpdates(String verifierId) throws Exception {
+               return StreamSupport
+                       .stream(prevUpdatesByVerifierId.get(verifierId).get().spliterator(), false)
+                       .collect(Collectors.toList());
+       }
+
+       /**
+        * Reads the state, applies {@code update} and reads the state again.
+        *
+        * <p>Wall-clock timestamps are taken before the first read and after the last read, so the
+        * returned context brackets the whole read-update-read sequence.
+        */
+       private TtlUpdateContext<?, ?> performUpdate(
+               TtlStateVerifier<?, ?> verifier, Object update) throws Exception {
+               State state = states.get(verifier.getId());
+               long timestampBeforeUpdate = System.currentTimeMillis();
+               Object valueBeforeUpdate = verifier.get(state);
+               verifier.update(state, update);
+               Object updatedValue = verifier.get(state);
+               return new TtlUpdateContext<>(timestampBeforeUpdate,
+                       valueBeforeUpdate, update, updatedValue, System.currentTimeMillis());
+       }
+
+       /** An update clashes if it was too slow itself or clashes with at least one previous update. */
+       private boolean updateClashesWithPrevUpdates(ValueWithTs<?> update, List<ValueWithTs<?>> prevUpdates) {
+               return tooSlow(update) ||
+                       (!prevUpdates.isEmpty() && prevUpdates.stream().anyMatch(pu -> updatesClash(pu, update)));
+       }
+
+       /** Returns true if the time between the update's two timestamps is at least the TTL. */
+       private boolean tooSlow(ValueWithTs<?> update) {
+               return update.getTimestampAfterUpdate() - update.getTimestampBeforeUpdate() >= ttl;
+       }
+
+       /**
+        * Returns true if the interval {@code [prev.before + ttl, prev.after + ttl]} intersects
+        * the interval {@code [next.before, next.after]}.
+        *
+        * <p>Presumably in that case it cannot be decided whether the previous update had already
+        * expired when the next one was applied - confirm against the verifiers.
+        */
+       private boolean updatesClash(ValueWithTs<?> prevUpdate, ValueWithTs<?> nextUpdate) {
+               return prevUpdate.getTimestampAfterUpdate() + ttl >= nextUpdate.getTimestampBeforeUpdate() &&
+                       prevUpdate.getTimestampBeforeUpdate() + ttl <= nextUpdate.getTimestampAfterUpdate();
+       }
+
+       /** Clears both the TTL state and the update history of the given verifier. */
+       private void resetState(String verifierId) {
+               states.get(verifierId).clear();
+               prevUpdatesByVerifierId.get(verifierId).clear();
+       }
+
+       /** No-op. */
+       @Override
+       public void snapshotState(FunctionSnapshotContext context) {
+
+       }
+
+       /**
+        * Creates, for every verifier, the state with TTL and the list state named
+        * {@code "TtlPrevValueState_" + verifierId} that records the update history.
+        */
+       @Override
+       public void initializeState(FunctionInitializationContext context) {
+               states = TtlStateVerifier.VERIFIERS.stream()
+                       .collect(Collectors.toMap(TtlStateVerifier::getId, v -> v.createState(context, ttlConfig)));
+               prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream()
+                       .collect(Collectors.toMap(TtlStateVerifier::getId, v -> {
+                               Preconditions.checkNotNull(v);
+                               TypeSerializer<ValueWithTs<?>> typeSerializer = new ValueWithTs.Serializer(v.getUpdateSerializer());
+                               ListStateDescriptor<ValueWithTs<?>> stateDesc = new ListStateDescriptor<>(
+                                       "TtlPrevValueState_" + v.getId(), typeSerializer);
+                               KeyedStateStore store = context.getKeyedStateStore();
+                               return store.getListState(stateDesc);
+                       }));
+       }
+
+       /** Counters of performed updates and clashes, periodically written to the log. */
+       private static class UpdateStat implements Serializable {
+               /** Report interval in number of updates; 0 disables reporting. */
+               final long reportStatAfterUpdatesNum;
+               long updates = 0;
+               long clashes = 0;
+               /** Sum of the history sizes seen by all updates, used for the average chain length. */
+               long prevUpdatesNum = 0;
+
+               UpdateStat(long reportStatAfterUpdatesNum) {
+                       this.reportStatAfterUpdatesNum = reportStatAfterUpdatesNum;
+               }
+
+               /** Records one update and logs the statistics whenever the report interval is reached. */
+               void update(boolean clash, long prevUpdatesSize) {
+                       updates++;
+                       if (clash) {
+                               clashes++;
+                       }
+                       prevUpdatesNum += prevUpdatesSize;
+                       // A zero interval would make the modulo below throw ArithmeticException.
+                       if (reportStatAfterUpdatesNum != 0 && updates % reportStatAfterUpdatesNum == 0) {
+                               LOG.info("Avg update chain length: {}, clash stat: {}/{}",
+                                       prevUpdatesNum / updates, clashes, updates);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
new file mode 100644
index 0000000..c56ff19
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+/**
+ * Base class for State TTL verifiers.
+ *
+ * <p>Holds the state descriptor, adapts the untyped {@link TtlStateVerifier} methods to typed hooks
+ * implemented by subclasses, and compares observed state values with the values recomputed from
+ * the recorded update history.
+ *
+ * @param <D> type of the state descriptor
+ * @param <S> type of the state created from the descriptor
+ * @param <SV> value type declared by the state descriptor
+ * @param <UV> type of a single update applied to the state
+ * @param <GV> type of the value read back from the state
+ */
+abstract class AbstractTtlStateVerifier<D extends StateDescriptor<S, SV>, S extends State, SV, UV, GV>
+       implements TtlStateVerifier<UV, GV> {
+       static final Random RANDOM = new Random();
+
+       @Nonnull
+       final D stateDesc;
+
+       AbstractTtlStateVerifier(@Nonnull D stateDesc) {
+               this.stateDesc = stateDesc;
+       }
+
+       /** Returns the result of {@code StringUtils.getRandomString(RANDOM, 2, 20)}. */
+       @Nonnull
+       static String randomString() {
+               return StringUtils.getRandomString(RANDOM, 2, 20);
+       }
+
+       /** Enables TTL with the given configuration on the descriptor, then creates the state from it. */
+       @Override
+       @Nonnull
+       public State createState(
+               @Nonnull FunctionInitializationContext context,
+               @Nonnull StateTtlConfiguration ttlConfig) {
+               stateDesc.enableTimeToLive(ttlConfig);
+               return createState(context);
+       }
+
+       /** Creates the concrete state from {@link #stateDesc} using the given context. */
+       abstract State createState(FunctionInitializationContext context);
+
+       /**
+        * Returns the descriptor's serializer cast to the update type.
+        *
+        * <p>The unchecked cast is only valid when {@code UV} and {@code SV} coincide; subclasses
+        * with a different update type override this method.
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       @Nonnull
+       public TypeSerializer<UV> getUpdateSerializer() {
+               return (TypeSerializer<UV>) stateDesc.getSerializer();
+       }
+
+       /** Casts the state to {@code S} and delegates to {@link #getInternal(State)}. */
+       @SuppressWarnings("unchecked")
+       @Override
+       public GV get(@Nonnull State state) throws Exception {
+               return getInternal((S) state);
+       }
+
+       /** Reads the current value from the typed state. */
+       abstract GV getInternal(@Nonnull S state) throws Exception;
+
+       /** Casts state and update to their typed forms and delegates to {@code updateInternal}. */
+       @SuppressWarnings("unchecked")
+       @Override
+       public void update(@Nonnull State state, Object update) throws Exception {
+               updateInternal((S) state, (UV) update);
+       }
+
+       /** Applies one update to the typed state. */
+       abstract void updateInternal(@Nonnull S state, UV update) throws Exception;
+
+       /**
+        * Checks the values observed around an update against the recorded history.
+        *
+        * <p>The value seen before the update must equal the value expected from the previous
+        * updates, and the value seen after it must equal the value expected once the current update
+        * is appended. Both expectations are evaluated at the update context's timestamp before update.
+        *
+        * @param verificationContextRaw previous updates plus the context of the current update
+        * @return {@code true} if both observed values match their expectations
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public boolean verify(@Nonnull TtlVerificationContext<?, ?> verificationContextRaw) {
+               TtlVerificationContext<UV, GV> verificationContext =
+                       (TtlVerificationContext<UV, GV>) verificationContextRaw;
+               // Copy the history so that appending the current update leaves the context's list untouched.
+               List<ValueWithTs<UV>> updates = new ArrayList<>(verificationContext.getPrevUpdates());
+               long currentTimestamp = verificationContext.getUpdateContext().getTimestampBeforeUpdate();
+               GV prevValue = expected(updates, currentTimestamp);
+               GV valueBeforeUpdate = verificationContext.getUpdateContext().getValueBeforeUpdate();
+               ValueWithTs<UV> update = verificationContext.getUpdateContext().getUpdateWithTs();
+               GV updatedValue = verificationContext.getUpdateContext().getUpdatedValue();
+               updates.add(update);
+               GV expectedValue = expected(updates, currentTimestamp);
+               return Objects.equals(valueBeforeUpdate, prevValue) && Objects.equals(updatedValue, expectedValue);
+       }
+
+       /** Computes the value the state should hold at {@code currentTimestamp} given the updates. */
+       abstract GV expected(@Nonnull List<ValueWithTs<UV>> updates, long currentTimestamp);
+
+       /**
+        * Tells whether a value written at {@code lastTimestamp} has outlived the configured TTL.
+        *
+        * @param lastTimestamp timestamp of the last write
+        * @param currentTimestamp timestamp at which expiration is evaluated
+        * @return {@code true} if {@code lastTimestamp + ttl <= currentTimestamp}
+        */
+       boolean expired(long lastTimestamp, long currentTimestamp) {
+               long ttl = stateDesc.getTtlConfig().getTtl().toMilliseconds();
+               // Clamp the TTL so the sum cannot wrap around to a negative number for very large TTLs,
+               // which would otherwise make live state look expired.
+               long ttlWithoutOverflow = lastTimestamp > 0 ? Math.min(Long.MAX_VALUE - lastTimestamp, ttl) : ttl;
+               return lastTimestamp + ttlWithoutOverflow <= currentTimestamp;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
new file mode 100644
index 0000000..960bbe7
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlAggregatingStateVerifier.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * TTL verifier for {@link AggregatingState}.
+ *
+ * <p>Updates are random integers below 100. The aggregate is a {@code long} sum that starts at 3
+ * and is read back as its decimal string.
+ */
+class TtlAggregatingStateVerifier extends AbstractTtlStateVerifier<
+       AggregatingStateDescriptor<Integer, Long, String>, AggregatingState<Integer, String>, Long, Integer, String> {
+       TtlAggregatingStateVerifier() {
+               super(new AggregatingStateDescriptor<>(
+                       "TtlAggregatingStateVerifier", AGG_FUNC, LongSerializer.INSTANCE));
+       }
+
+       @Override
+       @Nonnull
+       State createState(@Nonnull FunctionInitializationContext context) {
+               return context.getKeyedStateStore().getAggregatingState(stateDesc);
+       }
+
+       /** Updates are integers, unlike the {@code Long} accumulator kept by the descriptor. */
+       @Override
+       @Nonnull
+       public TypeSerializer<Integer> getUpdateSerializer() {
+               return IntSerializer.INSTANCE;
+       }
+
+       @Override
+       @Nonnull
+       public Integer generateRandomUpdate() {
+               return RANDOM.nextInt(100);
+       }
+
+       @Override
+       String getInternal(@Nonnull AggregatingState<Integer, String> state) throws Exception {
+               return state.get();
+       }
+
+       @Override
+       void updateInternal(@Nonnull AggregatingState<Integer, String> state, Integer update) throws Exception {
+               state.add(update);
+       }
+
+       /**
+        * Replays the updates in order. The accumulator is restarted whenever the previous update had
+        * expired by the time of the next one; the result is {@code null} if there are no updates or
+        * the last one has expired at {@code currentTimestamp}.
+        */
+       @Override
+       String expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) {
+               if (updates.isEmpty()) {
+                       return null;
+               }
+               long sum = AGG_FUNC.createAccumulator();
+               long prevTs = updates.get(0).getTimestampAfterUpdate();
+               for (ValueWithTs<Integer> step : updates) {
+                       long base = expired(prevTs, step.getTimestampAfterUpdate()) ? AGG_FUNC.createAccumulator() : sum;
+                       sum = AGG_FUNC.add(step.getValue(), base);
+                       prevTs = step.getTimestampAfterUpdate();
+               }
+               if (expired(prevTs, currentTimestamp)) {
+                       return null;
+               }
+               return AGG_FUNC.getResult(sum);
+       }
+
+       /** Sums integers into a {@code Long} that starts at 3 and renders the sum as a string. */
+       private static final AggregateFunction<Integer, Long, String> AGG_FUNC =
+               new AggregateFunction<Integer, Long, String>() {
+                       @Override
+                       public Long createAccumulator() {
+                               return 3L;
+                       }
+
+                       @Override
+                       public Long add(Integer value, Long accumulator) {
+                               return accumulator + value;
+                       }
+
+                       @Override
+                       public String getResult(Long accumulator) {
+                               return Long.toString(accumulator);
+                       }
+
+                       @Override
+                       public Long merge(Long a, Long b) {
+                               return a + b;
+                       }
+               };
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
new file mode 100644
index 0000000..c1cc761
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlFoldingStateVerifier.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * TTL verifier for {@link FoldingState}.
+ *
+ * <p>Updates are random integers below 100 that are folded into a {@code long} sum starting at
+ * {@code INIT_VAL}.
+ */
+@SuppressWarnings("deprecation")
+class TtlFoldingStateVerifier extends AbstractTtlStateVerifier<
+       FoldingStateDescriptor<Integer, Long>, FoldingState<Integer, Long>, Long, Integer, Long> {
+       /** Initial accumulator value passed to the folding state descriptor. */
+       private static final long INIT_VAL = 5L;
+
+       TtlFoldingStateVerifier() {
+               super(new FoldingStateDescriptor<>(
+                       "TtlFoldingStateVerifier", INIT_VAL, (v, acc) -> acc + v, LongSerializer.INSTANCE));
+       }
+
+       @Override
+       @Nonnull
+       State createState(@Nonnull FunctionInitializationContext context) {
+               return context.getKeyedStateStore().getFoldingState(stateDesc);
+       }
+
+       /** Updates are integers, unlike the {@code Long} accumulator kept by the descriptor. */
+       @Override
+       @Nonnull
+       public TypeSerializer<Integer> getUpdateSerializer() {
+               return IntSerializer.INSTANCE;
+       }
+
+       @Override
+       @Nonnull
+       public Integer generateRandomUpdate() {
+               return RANDOM.nextInt(100);
+       }
+
+       @Override
+       Long getInternal(@Nonnull FoldingState<Integer, Long> state) throws Exception {
+               return state.get();
+       }
+
+       @Override
+       void updateInternal(@Nonnull FoldingState<Integer, Long> state, Integer update) throws Exception {
+               state.add(update);
+       }
+
+       /**
+        * Replays the updates in order. The sum restarts from {@code INIT_VAL} whenever the previous
+        * update had expired by the time of the next one; the result is {@code null} if there are no
+        * updates or the last one has expired at {@code currentTimestamp}.
+        */
+       @Override
+       Long expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) {
+               if (updates.isEmpty()) {
+                       return null;
+               }
+               long folded = INIT_VAL;
+               long prevTs = updates.get(0).getTimestampAfterUpdate();
+               for (ValueWithTs<Integer> step : updates) {
+                       boolean restart = expired(prevTs, step.getTimestampAfterUpdate());
+                       folded = (restart ? INIT_VAL : folded) + step.getValue();
+                       prevTs = step.getTimestampAfterUpdate();
+               }
+               if (expired(prevTs, currentTimestamp)) {
+                       return null;
+               }
+               return folded;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
new file mode 100644
index 0000000..b355aa9
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlListStateVerifier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * TTL verifier for {@link ListState} of strings.
+ *
+ * <p>Each update appends one random string; the expected content is the list of all updates that
+ * have not expired yet, in update order.
+ */
+class TtlListStateVerifier extends AbstractTtlStateVerifier<
+       ListStateDescriptor<String>, ListState<String>, List<String>, String, List<String>> {
+       TtlListStateVerifier() {
+               super(new ListStateDescriptor<>("TtlListStateVerifier", StringSerializer.INSTANCE));
+       }
+
+       @Override
+       @Nonnull
+       State createState(@Nonnull FunctionInitializationContext context) {
+               return context.getKeyedStateStore().getListState(stateDesc);
+       }
+
+       @Override
+       @Nonnull
+       public TypeSerializer<String> getUpdateSerializer() {
+               return StringSerializer.INSTANCE;
+       }
+
+       @Override
+       @Nonnull
+       public String generateRandomUpdate() {
+               return randomString();
+       }
+
+       /** Copies the elements returned by the state into a new list. */
+       @Override
+       @Nonnull
+       List<String> getInternal(@Nonnull ListState<String> state) throws Exception {
+               Iterable<String> elements = state.get();
+               return StreamSupport.stream(elements.spliterator(), false).collect(Collectors.toList());
+       }
+
+       @Override
+       void updateInternal(@Nonnull ListState<String> state, String update) throws Exception {
+               state.add(update);
+       }
+
+       /** Keeps the values of all updates that are not expired at {@code currentTimestamp}. */
+       @Override
+       @Nonnull
+       List<String> expected(@Nonnull List<ValueWithTs<String>> updates, long currentTimestamp) {
+               return updates.stream()
+                       .filter(entry -> isAlive(entry, currentTimestamp))
+                       .map(entry -> entry.getValue())
+                       .collect(Collectors.toList());
+       }
+
+       /** Returns whether the given update has not expired at {@code now}. */
+       private boolean isAlive(ValueWithTs<String> entry, long now) {
+               return !expired(entry.getTimestampAfterUpdate(), now);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
new file mode 100644
index 0000000..a9d6b36
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlMapStateVerifier.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+/**
+ * TTL verifier for {@link MapState} with string keys and values.
+ *
+ * <p>Each update is a key/value pair whose key is picked from a pool of random strings generated
+ * once when the class is initialized.
+ */
+class TtlMapStateVerifier extends AbstractTtlStateVerifier<
+       MapStateDescriptor<String, String>, MapState<String, String>,
+       Map<String, String>, Tuple2<String, String>, Map<String, String>> {
+       /** Pool of 5 to 9 random keys used by {@link #generateRandomUpdate()}. */
+       private static final List<String> KEYS = new ArrayList<>();
+       static {
+               int keyCount = RANDOM.nextInt(5) + 5;
+               IntStream.range(0, keyCount).mapToObj(i -> randomString()).forEach(KEYS::add);
+       }
+
+       TtlMapStateVerifier() {
+               super(new MapStateDescriptor<>(
+                       "TtlMapStateVerifier", StringSerializer.INSTANCE, StringSerializer.INSTANCE));
+       }
+
+       @Override
+       @Nonnull
+       State createState(@Nonnull FunctionInitializationContext context) {
+               return context.getKeyedStateStore().getMapState(stateDesc);
+       }
+
+       /** Builds a tuple serializer for the (key, value) pairs used as updates. */
+       @SuppressWarnings("unchecked")
+       @Override
+       @Nonnull
+       public TypeSerializer<Tuple2<String, String>> getUpdateSerializer() {
+               TypeSerializer[] fieldSerializers = new TypeSerializer[] {StringSerializer.INSTANCE, StringSerializer.INSTANCE};
+               return new TupleSerializer(Tuple2.class, fieldSerializers);
+       }
+
+       @Override
+       @Nonnull
+       public Tuple2<String, String> generateRandomUpdate() {
+               String key = KEYS.get(RANDOM.nextInt(KEYS.size()));
+               return Tuple2.of(key, randomString());
+       }
+
+       /** Copies all entries returned by the state into a new map. */
+       @Override
+       @Nonnull
+       Map<String, String> getInternal(@Nonnull MapState<String, String> state) throws Exception {
+               Iterable<Map.Entry<String, String>> entries = state.entries();
+               return StreamSupport.stream(entries.spliterator(), false)
+                       .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
+       }
+
+       @Override
+       void updateInternal(@Nonnull MapState<String, String> state, Tuple2<String, String> update) throws Exception {
+               state.put(update.f0, update.f1);
+       }
+
+       /**
+        * For every key takes its latest update and keeps it if it is not expired at
+        * {@code currentTimestamp}.
+        */
+       @Override
+       @Nonnull
+       Map<String, String> expected(@Nonnull List<ValueWithTs<Tuple2<String, String>>> updates, long currentTimestamp) {
+               Map<String, List<ValueWithTs<Tuple2<String, String>>>> updatesByKey = updates.stream()
+                       .collect(Collectors.groupingBy(u -> u.getValue().f0));
+               return updatesByKey.values().stream()
+                       // within a group the updates keep their original order, so the last one is the latest
+                       .map(perKey -> perKey.get(perKey.size() - 1))
+                       .filter(latest -> !expired(latest.getTimestampAfterUpdate(), currentTimestamp))
+                       .map(latest -> latest.getValue())
+                       .collect(Collectors.toMap(pair -> pair.f0, pair -> pair.f1));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
new file mode 100644
index 0000000..773be05
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlReducingStateVerifier.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * TTL verifier for {@link ReducingState}.
+ *
+ * <p>Updates are random integers below 100 that are reduced by addition.
+ */
+class TtlReducingStateVerifier extends AbstractTtlStateVerifier<
+       ReducingStateDescriptor<Integer>, ReducingState<Integer>, Integer, Integer, Integer> {
+       TtlReducingStateVerifier() {
+               super(new ReducingStateDescriptor<>(
+                       "TtlReducingStateVerifier",
+                       (ReduceFunction<Integer>) (left, right) -> left + right,
+                       IntSerializer.INSTANCE));
+       }
+
+       @Override
+       @Nonnull
+       State createState(@Nonnull FunctionInitializationContext context) {
+               return context.getKeyedStateStore().getReducingState(stateDesc);
+       }
+
+       @Override
+       @Nonnull
+       public TypeSerializer<Integer> getUpdateSerializer() {
+               return IntSerializer.INSTANCE;
+       }
+
+       @Override
+       @Nonnull
+       public Integer generateRandomUpdate() {
+               return RANDOM.nextInt(100);
+       }
+
+       @Override
+       Integer getInternal(@Nonnull ReducingState<Integer> state) throws Exception {
+               return state.get();
+       }
+
+       @Override
+       void updateInternal(@Nonnull ReducingState<Integer> state, Integer update) throws Exception {
+               state.add(update);
+       }
+
+       /**
+        * Replays the updates in order. The sum restarts from zero whenever the previous update had
+        * expired by the time of the next one; the result is {@code null} if there are no updates or
+        * the last one has expired at {@code currentTimestamp}.
+        */
+       @Override
+       Integer expected(@Nonnull List<ValueWithTs<Integer>> updates, long currentTimestamp) {
+               if (updates.isEmpty()) {
+                       return null;
+               }
+               int total = 0;
+               long prevTs = updates.get(0).getTimestampAfterUpdate();
+               for (ValueWithTs<Integer> step : updates) {
+                       boolean restart = expired(prevTs, step.getTimestampAfterUpdate());
+                       total = (restart ? 0 : total) + step.getValue();
+                       prevTs = step.getTimestampAfterUpdate();
+               }
+               if (expired(prevTs, currentTimestamp)) {
+                       return null;
+               }
+               return total;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java
new file mode 100644
index 0000000..e1c2e07
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlStateVerifier.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * TTL state verifier interface.
+ *
+ * @param <UV> type of a single update applied to the state
+ * @param <GV> type of the value read back from the state
+ */
+public interface TtlStateVerifier<UV, GV> {
+       /** One verifier instance per state kind covered by the test. */
+       List<TtlStateVerifier<?, ?>> VERIFIERS = Arrays.asList(
+               new TtlValueStateVerifier(),
+               new TtlListStateVerifier(),
+               new TtlMapStateVerifier(),
+               new TtlAggregatingStateVerifier(),
+               new TtlReducingStateVerifier(),
+               new TtlFoldingStateVerifier()
+       );
+
+       /** Identifier of this verifier: the simple name of its runtime class. */
+       @Nonnull
+       default String getId() {
+               return getClass().getSimpleName();
+       }
+
+       /** Creates the state to verify, using the given TTL configuration. */
+       @Nonnull
+       State createState(
+               @Nonnull FunctionInitializationContext context,
+               @Nonnull StateTtlConfiguration ttlConfig);
+
+       /** Serializer for the update values of this verifier. */
+       @Nonnull
+       TypeSerializer<UV> getUpdateSerializer();
+
+       /** Produces a random update value. */
+       UV generateRandomUpdate();
+
+       /** Reads the current value of the given state. */
+       GV get(@Nonnull State state) throws Exception;
+
+       /** Applies the given update to the given state. */
+       void update(@Nonnull State state, Object update) throws Exception;
+
+       /** Returns whether the values recorded in the verification context are the expected ones. */
+       boolean verify(@Nonnull TtlVerificationContext<?, ?> verificationContext);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
new file mode 100644
index 0000000..959340b
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+/**
+ * Contains context relevant for state update with TTL.
+ *
+ * <p>Immutable holder of the update itself, the values read before and after it, and the
+ * timestamps taken before and after it.
+ *
+ * @param <UV> type of the update
+ * @param <GV> type of the value read from the state
+ */
+public class TtlUpdateContext<UV, GV> implements Serializable {
+       private final long timestampBeforeUpdate;
+       private final GV valueBeforeUpdate;
+       private final UV update;
+       private final GV updatedValue;
+       private final long timestampAfterUpdate;
+
+       public TtlUpdateContext(
+               long timestampBeforeUpdate,
+               GV valueBeforeUpdate, UV update, GV updatedValue,
+               long timestampAfterUpdate) {
+               this.timestampBeforeUpdate = timestampBeforeUpdate;
+               this.valueBeforeUpdate = valueBeforeUpdate;
+               this.update = update;
+               this.updatedValue = updatedValue;
+               this.timestampAfterUpdate = timestampAfterUpdate;
+       }
+
+       long getTimestampBeforeUpdate() {
+               return timestampBeforeUpdate;
+       }
+
+       GV getValueBeforeUpdate() {
+               return valueBeforeUpdate;
+       }
+
+       /** Wraps the update together with both timestamps into a new {@link ValueWithTs}. */
+       @Nonnull
+       public ValueWithTs<UV> getUpdateWithTs() {
+               return new ValueWithTs<>(update, timestampBeforeUpdate, timestampAfterUpdate);
+       }
+
+       GV getUpdatedValue() {
+               return updatedValue;
+       }
+
+       @Override
+       public String toString() {
+               return new StringBuilder("TtlUpdateContext{")
+                       .append("timestampBeforeUpdate=").append(timestampBeforeUpdate)
+                       .append(", valueBeforeUpdate=").append(valueBeforeUpdate)
+                       .append(", update=").append(update)
+                       .append(", updatedValue=").append(updatedValue)
+                       .append(", timestampAfterUpdate=").append(timestampAfterUpdate)
+                       .append('}')
+                       .toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
new file mode 100644
index 0000000..fa4929b
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlValueStateVerifier.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+class TtlValueStateVerifier
+       extends AbstractTtlStateVerifier<ValueStateDescriptor<String>, 
ValueState<String>, String, String, String> {
+       TtlValueStateVerifier() {
+               super(new ValueStateDescriptor<>("TtlValueStateVerifier", 
StringSerializer.INSTANCE));
+       }
+
+       @Override
+       @Nonnull
+       State createState(FunctionInitializationContext context) {
+               return context.getKeyedStateStore().getState(stateDesc);
+       }
+
+       @Nonnull
+       public String generateRandomUpdate() {
+               return randomString();
+       }
+
+       @Override
+       String getInternal(@Nonnull ValueState<String> state) throws Exception {
+               return state.value();
+       }
+
+       @Override
+       void updateInternal(@Nonnull ValueState<String> state, String update) 
throws Exception {
+               state.update(update);
+       }
+
+       @Override
+       String expected(@Nonnull List<ValueWithTs<String>> updates, long 
currentTimestamp) {
+               if (updates.isEmpty()) {
+                       return null;
+               }
+               ValueWithTs<String> lastUpdate = updates.get(updates.size() - 
1);
+               return expired(lastUpdate.getTimestampAfterUpdate(), 
currentTimestamp) ? null : lastUpdate.getValue();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
new file mode 100644
index 0000000..4c985cd
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlVerificationContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Data to verify state update with TTL. */
+public class TtlVerificationContext<UV, GV> implements Serializable {
+       private final int key;
+       @Nonnull
+       private final String  verifierId;
+       @Nonnull
+       private final List<ValueWithTs<UV>> prevUpdates;
+       @Nonnull
+       private final TtlUpdateContext<UV, GV> updateContext;
+
+       @SuppressWarnings("unchecked")
+       public TtlVerificationContext(
+               int key,
+               @Nonnull String verifierId,
+               @Nonnull List<ValueWithTs<?>> prevUpdates,
+               @Nonnull TtlUpdateContext<?, ?> updateContext) {
+               this.key = key;
+               this.verifierId = verifierId;
+               this.prevUpdates = new ArrayList<>();
+               prevUpdates.forEach(pu -> 
this.prevUpdates.add((ValueWithTs<UV>) pu));
+               this.updateContext = (TtlUpdateContext<UV, GV>) updateContext;
+       }
+
+       @Nonnull
+       List<ValueWithTs<UV>> getPrevUpdates() {
+               return prevUpdates;
+       }
+
+       @Nonnull
+       TtlUpdateContext<UV, GV> getUpdateContext() {
+               return updateContext;
+       }
+
+       @Override
+       public String toString() {
+               return "TtlVerificationContext{" +
+                       "key=" + key +
+                       ", verifierId='" + verifierId + '\'' +
+                       ", prevUpdates=" + prevUpdates +
+                       ", updateContext=" + updateContext +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
----------------------------------------------------------------------
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
new file mode 100644
index 0000000..9302377
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+/** User state value with timestamps before and after update. */
+public class ValueWithTs<V> implements Serializable {
+       private final V value;
+       private final long timestampBeforeUpdate;
+       private final long timestampAfterUpdate;
+
+       public ValueWithTs(V value, long timestampBeforeUpdate, long 
timestampAfterUpdate) {
+               this.value = value;
+               this.timestampBeforeUpdate = timestampBeforeUpdate;
+               this.timestampAfterUpdate = timestampAfterUpdate;
+       }
+
+       V getValue() {
+               return value;
+       }
+
+       public long getTimestampBeforeUpdate() {
+               return timestampBeforeUpdate;
+       }
+
+       public long getTimestampAfterUpdate() {
+               return timestampAfterUpdate;
+       }
+
+       @Override
+       public String toString() {
+               return "ValueWithTs{" +
+                       "value=" + value +
+                       ", timestampBeforeUpdate=" + timestampBeforeUpdate +
+                       ", timestampAfterUpdate=" + timestampAfterUpdate +
+                       '}';
+       }
+
+       /** Serializer for {@link ValueWithTs}. */
+       public static class Serializer extends 
CompositeSerializer<ValueWithTs<?>> {
+
+               public Serializer(TypeSerializer<?> userValueSerializer) {
+                       super(true, userValueSerializer, 
LongSerializer.INSTANCE, LongSerializer.INSTANCE);
+               }
+
+               @SuppressWarnings("unchecked")
+               Serializer(PrecomputedParameters precomputed, 
TypeSerializer<?>... fieldSerializers) {
+                       super(precomputed, fieldSerializers);
+               }
+
+               @Override
+               public ValueWithTs<?> createInstance(@Nonnull Object ... 
values) {
+                       return new ValueWithTs<>(values[0], (Long) values[1], 
(Long) values[2]);
+               }
+
+               @Override
+               protected void setField(@Nonnull ValueWithTs<?> value, int 
index, Object fieldValue) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               protected Object getField(@Nonnull ValueWithTs<?> value, int 
index) {
+                       switch (index) {
+                               case 0:
+                                       return value.getValue();
+                               case 1:
+                                       return value.getTimestampBeforeUpdate();
+                               case 2:
+                                       return value.getTimestampAfterUpdate();
+                               default:
+                                       throw new 
FlinkRuntimeException("Unexpected field index for ValueWithTs");
+                       }
+               }
+
+               // Forward ALL field serializers (value, ts before, ts after), not only the first.
+               @Override
+               protected CompositeSerializer<ValueWithTs<?>> createSerializerInstance(
+                       PrecomputedParameters precomputed,
+                       TypeSerializer<?>... originalSerializers) {
+                       return new Serializer(precomputed, originalSerializers);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 86dd3db..4abf595 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -50,6 +50,7 @@ under the License.
                <module>flink-elasticsearch5-test</module>
                <module>flink-quickstart-test</module>
                <module>flink-confluent-schema-registry</module>
+               <module>flink-stream-state-ttl-test</module>
        </modules>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 15c73d5..dc8424f 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -102,5 +102,8 @@ run_test "Quickstarts Scala nightly end-to-end test" 
"$END_TO_END_DIR/test-scrip
 
 run_test "Avro Confluent Schema Registry nightly end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
 
+run_test "State TTL Heap backend end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file"
+run_test "State TTL RocksDb backend end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks"
+
 printf "\n[PASS] All tests passed\n"
 exit 0

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index c78afe7..f4563cc 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -240,6 +240,15 @@ function start_cluster {
   done
 }
 
+function start_taskmanagers {
+    tmnum=$1
+    echo "Start ${tmnum} more task managers"
+    for (( c=0; c<tmnum; c++ ))
+    do
+        $FLINK_DIR/bin/taskmanager.sh start
+    done
+}
+
 function start_and_wait_for_tm {
 
   tm_query_result=$(curl -s "http://localhost:8081/taskmanagers")
@@ -456,18 +465,17 @@ function s3_delete {
     https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
-# This function starts the given number of task managers and monitors their 
processes. If a task manager process goes
-# away a replacement is started.
+# This function starts the given number of task managers and monitors their 
processes.
+# If a task manager process goes away a replacement is started.
 function tm_watchdog {
   local expectedTm=$1
   while true;
   do
     runningTm=`jps | grep -Eo 'TaskManagerRunner|TaskManager' | wc -l`;
     count=$((expectedTm-runningTm))
-    for (( c=0; c<count; c++ ))
-    do
-      $FLINK_DIR/bin/taskmanager.sh start > /dev/null
-    done
+    if (( count != 0 )); then
+        start_taskmanagers ${count} > /dev/null
+    fi
     sleep 5;
   done
 }
@@ -508,7 +516,8 @@ function rollback_flink_slf4j_metric_reporter() {
 
 function get_metric_processed_records {
   OPERATOR=$1
-  N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" 
$FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1)
+  JOB_NAME="${2:-General purpose test job}"
+  N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" 
$FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1)
   if [ -z $N ]; then
     N=0
   fi
@@ -517,7 +526,8 @@ function get_metric_processed_records {
 
 function get_num_metric_samples {
   OPERATOR=$1
-  N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" 
$FLINK_DIR/log/*taskexecutor*.log | wc -l)
+  JOB_NAME="${2:-General purpose test job}"
+  N=$(grep ".${JOB_NAME}.$OPERATOR.numRecordsIn:" 
$FLINK_DIR/log/*taskexecutor*.log | wc -l)
   if [ -z $N ]; then
     N=0
   fi
@@ -527,13 +537,14 @@ function get_num_metric_samples {
 function wait_oper_metric_num_in_records {
     OPERATOR=$1
     MAX_NUM_METRICS="${2:-200}"
-    NUM_METRICS=$(get_num_metric_samples ${OPERATOR})
-    OLD_NUM_METRICS=${3:-${NUM_METRICS}}
+    JOB_NAME="${3:-General purpose test job}"
+    NUM_METRICS=$(get_num_metric_samples ${OPERATOR} "${JOB_NAME}")
+    OLD_NUM_METRICS=${4:-${NUM_METRICS}}
     # monitor the numRecordsIn metric of the state machine operator in the 
second execution
     # we let the test finish once the second restore execution has processed 
200 records
     while : ; do
-      NUM_METRICS=$(get_num_metric_samples ${OPERATOR})
-      NUM_RECORDS=$(get_metric_processed_records ${OPERATOR})
+      NUM_METRICS=$(get_num_metric_samples ${OPERATOR} "${JOB_NAME}")
+      NUM_RECORDS=$(get_metric_processed_records ${OPERATOR} "${JOB_NAME}")
 
       # only account for metrics that appeared in the second execution
       if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then
@@ -541,7 +552,7 @@ function wait_oper_metric_num_in_records {
       fi
 
       if (( $NUM_RECORDS < $MAX_NUM_METRICS )); then
-        echo "Waiting for job to process up to 200 records, current progress: 
$NUM_RECORDS records ..."
+        echo "Waiting for job to process up to ${MAX_NUM_METRICS} records, 
current progress: ${NUM_RECORDS} records ..."
         sleep 1
       else
         break

http://git-wip-us.apache.org/repos/asf/flink/blob/01cf808e/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh 
b/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
new file mode 100755
index 0000000..fb911f3
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_stream_state_ttl.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+STATE_BACKEND_TYPE="${1:-file}"
+STATE_BACKEND_FILE_ASYNC="${2:-false}"
+TTL="${3:-1000}"
+PRECISION="${4:-5}"
+PARALLELISM="${5:-3}"
+UPDATE_NUM="${6:-1000}"
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-stream-state-ttl-test
+TEST_PROGRAM_NAME=DataStreamStateTTLTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+setup_flink_slf4j_metric_reporter
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  # revert our modifications to the Flink distribution
+  rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+start_cluster
+start_taskmanagers $PARALLELISM
+
+function job_id() {
+    CMD="${FLINK_DIR}/bin/flink run -d -p ${PARALLELISM} ${TEST_PROGRAM_JAR} \
+      --test.semantics exactly-once \
+      --environment.parallelism ${PARALLELISM} \
+      --state_backend ${STATE_BACKEND_TYPE} \
+      --state_ttl_verifier.ttl_milli ${TTL} \
+      --state_ttl_verifier.precision_milli ${PRECISION} \
+      --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \
+      --state_backend.file.async ${STATE_BACKEND_FILE_ASYNC} \
+      --update_generator_source.sleep_time 10 \
+      --update_generator_source.sleep_after_elements 1"
+    echo "${CMD}"
+}
+
+JOB_CMD=$(job_id)
+echo ${JOB_CMD}
+JOB=$(${JOB_CMD} | grep 'Job has been submitted with JobID' | sed 's/.* //g')
+wait_job_running ${JOB}
+wait_oper_metric_num_in_records TtlVerifyUpdateFunction.0 ${UPDATE_NUM} 'State 
TTL test job'
+
+SAVEPOINT_PATH=$(take_savepoint ${JOB} ${TEST_DATA_DIR} \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job ${JOB}
+
+JOB_CMD=$(job_id)
+echo ${JOB_CMD}
+JOB=$(${JOB_CMD} | grep 'Job has been submitted with JobID' | sed 's/.* //g')
+wait_job_running ${JOB}
+wait_oper_metric_num_in_records TtlVerifyUpdateFunction.0 ${UPDATE_NUM} "State 
TTL test job"
+
+# if verification fails job produces failed TTL'ed state updates,
+# output would be non-empty and the test will not pass

Reply via email to