Repository: flink
Updated Branches:
  refs/heads/master 01cf808ee -> d309e61e2


[FLINK-9902][tests] Improve and refactor window checkpointing IT cases

This closes #6376.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d309e61e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d309e61e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d309e61e

Branch: refs/heads/master
Commit: d309e61e2bae170872b43cf60bd2fd9fef77814c
Parents: 01cf808
Author: Stefan Richter <[email protected]>
Authored: Wed Jul 18 23:29:56 2018 +0200
Committer: Stefan Richter <[email protected]>
Committed: Fri Jul 20 15:57:49 2018 +0200

----------------------------------------------------------------------
 .../EventTimeAllWindowCheckpointingITCase.java  | 296 ++-------------
 .../EventTimeWindowCheckpointingITCase.java     | 375 +++++--------------
 .../WindowCheckpointingITCase.java              | 229 +++--------
 .../test/checkpointing/utils/FailingSource.java | 155 ++++++++
 .../flink/test/checkpointing/utils/IntType.java |  38 ++
 .../checkpointing/utils/ValidatingSink.java     | 128 +++++++
 .../flink/test/util/SuccessException.java       |   2 +-
 7 files changed, 505 insertions(+), 718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 9e14b26..5dc2aa0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -26,31 +26,23 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.checkpointing.utils.FailingSource;
+import org.apache.flink.test.checkpointing.utils.IntType;
+import org.apache.flink.test.checkpointing.utils.ValidatingSink;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -88,20 +80,17 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                final int numElementsPerKey = 3000;
                final int windowSize = 100;
                final int numKeys = 1;
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
 
                        env
-                                       .addSource(new FailingSource(numKeys,
-                                                       numElementsPerKey,
-                                                       numElementsPerKey / 3))
+                                       .addSource(new FailingSource(new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, 
windowSize), numElementsPerKey))
                                        .rebalance()
                                        .timeWindowAll(Time.of(windowSize, 
MILLISECONDS))
                                        .apply(new 
RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
TimeWindow>() {
@@ -133,9 +122,12 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                                                        out.collect(new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
                                                }
                                        })
-                                       .addSink(new ValidatingSink(numKeys, 
numElementsPerKey / windowSize)).setParallelism(1);
+                               .addSink(new ValidatingSink<>(
+                                       new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+                                       new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSize)))
+                               .setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       env.execute("Tumbling Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -149,7 +141,6 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                final int windowSize = 1000;
                final int windowSlide = 100;
                final int numKeys = 1;
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -160,7 +151,7 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                        env.getConfig().disableSysoutLogging();
 
                        env
-                                       .addSource(new FailingSource(numKeys, 
numElementsPerKey, numElementsPerKey / 3))
+                               .addSource(new FailingSource(new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, 
windowSlide), numElementsPerKey))
                                        .rebalance()
                                        .timeWindowAll(Time.of(windowSize, 
MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
                                        .apply(new 
RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
TimeWindow>() {
@@ -192,9 +183,12 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                                                        out.collect(new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
                                                }
                                        })
-                                       .addSink(new ValidatingSink(numKeys, 
numElementsPerKey / windowSlide)).setParallelism(1);
+                               .addSink(new ValidatingSink<>(
+                                       new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+                                       new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSlide)))
+                               .setParallelism(1);
 
-                       tryExecute(env, "Sliding Window Test");
+                       env.execute("Sliding Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -207,20 +201,17 @@ public class EventTimeAllWindowCheckpointingITCase 
extends TestLogger {
                final int numElementsPerKey = 3000;
                final int windowSize = 100;
                final int numKeys = 1;
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
 
                        env
-                                       .addSource(new FailingSource(numKeys,
-                                                       numElementsPerKey,
-                                                       numElementsPerKey / 3))
+                               .addSource(new FailingSource(new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, 
windowSize), numElementsPerKey))
                                        .rebalance()
                                        .timeWindowAll(Time.of(windowSize, 
MILLISECONDS))
                                        .reduce(
@@ -261,9 +252,12 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                                                        }
                                                }
                                        })
-                                       .addSink(new ValidatingSink(numKeys, 
numElementsPerKey / windowSize)).setParallelism(1);
+                               .addSink(new ValidatingSink<>(
+                                       new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+                                       new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSize)))
+                               .setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       env.execute("Tumbling Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -276,20 +270,17 @@ public class EventTimeAllWindowCheckpointingITCase 
extends TestLogger {
                final int numElementsPerKey = 3000;
                final int windowSize = 100;
                final int numKeys = 1;
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
 
                        env
-                                       .addSource(new FailingSource(numKeys,
-                                                       numElementsPerKey,
-                                                       numElementsPerKey / 3))
+                               .addSource(new FailingSource(new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, 
windowSize), numElementsPerKey))
                                        .rebalance()
                                        .timeWindowAll(Time.of(windowSize, 
MILLISECONDS))
                                        .fold(new Tuple4<>(0L, 0L, 0L, new 
IntType(0)),
@@ -329,9 +320,12 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                                                                        }
                                                                }
                                                        })
-                                       .addSink(new ValidatingSink(numKeys, 
numElementsPerKey / windowSize)).setParallelism(1);
+                               .addSink(new ValidatingSink<>(
+                                       new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+                                       new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSize)))
+                               .setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       env.execute("Tumbling Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -345,20 +339,17 @@ public class EventTimeAllWindowCheckpointingITCase 
extends TestLogger {
                final int windowSize = 1000;
                final int windowSlide = 100;
                final int numKeys = 1;
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
 
                        env
-                                       .addSource(new FailingSource(numKeys,
-                                                       numElementsPerKey,
-                                                       numElementsPerKey / 3))
+                               .addSource(new FailingSource(new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, 
windowSlide), numElementsPerKey))
                                        .rebalance()
                                        .timeWindowAll(Time.of(windowSize, 
MILLISECONDS),
                                                        Time.of(windowSlide, 
MILLISECONDS))
@@ -400,229 +391,16 @@ public class EventTimeAllWindowCheckpointingITCase 
extends TestLogger {
                                                        }
                                                }
                                        })
-                                       .addSink(new ValidatingSink(numKeys, 
numElementsPerKey / windowSlide)).setParallelism(1);
+                               .addSink(new ValidatingSink<>(
+                                       new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+                                       new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSlide)))
+                               .setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       env.execute("Tumbling Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
        }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       private static class FailingSource extends 
RichSourceFunction<Tuple2<Long, IntType>>
-                       implements ListCheckpointed<Integer>, 
CheckpointListener {
-               private static volatile boolean failedBefore = false;
-
-               private final int numKeys;
-               private final int numElementsToEmit;
-               private final int failureAfterNumElements;
-
-               private volatile int numElementsEmitted;
-               private volatile int numSuccessfulCheckpoints;
-               private volatile boolean running = true;
-
-               private FailingSource(int numKeys, int numElementsToEmitPerKey, 
int failureAfterNumElements) {
-                       this.numKeys = numKeys;
-                       this.numElementsToEmit = numElementsToEmitPerKey;
-                       this.failureAfterNumElements = failureAfterNumElements;
-               }
-
-               @Override
-               public void open(Configuration parameters) {
-                       // non-parallel source
-                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
-               }
-
-               @Override
-               public void run(SourceContext<Tuple2<Long, IntType>> ctx) 
throws Exception {
-                       // we loop longer than we have elements, to permit 
delayed checkpoints
-                       // to still cause a failure
-                       while (running) {
-
-                               if (!failedBefore) {
-                                       // delay a bit, if we have not failed 
before
-                                       Thread.sleep(1);
-                                       if (numSuccessfulCheckpoints >= 2 && 
numElementsEmitted >= failureAfterNumElements) {
-                                               // cause a failure if we have 
not failed before and have reached
-                                               // enough completed checkpoints 
and elements
-                                               failedBefore = true;
-                                               throw new Exception("Artificial 
Failure");
-                                       }
-                               }
-
-                               if (numElementsEmitted < numElementsToEmit &&
-                                               (failedBefore || 
numElementsEmitted <= failureAfterNumElements)) {
-                                       // the function failed before, or we 
are in the elements before the failure
-                                       synchronized (ctx.getCheckpointLock()) {
-                                               int next = numElementsEmitted++;
-                                               for (long i = 0; i < numKeys; 
i++) {
-                                                       
ctx.collectWithTimestamp(new Tuple2<>(i, new IntType(next)), next);
-                                               }
-                                               ctx.emitWatermark(new 
Watermark(next));
-                                       }
-                               }
-                               else {
-                                       // if our work is done, delay a bit to 
prevent busy waiting
-                                       Thread.sleep(1);
-                               }
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       running = false;
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) {
-                       numSuccessfulCheckpoints++;
-               }
-
-               public static void reset() {
-                       failedBefore = false;
-               }
-
-               @Override
-               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
-                       return 
Collections.singletonList(this.numElementsEmitted);
-               }
-
-               @Override
-               public void restoreState(List<Integer> state) throws Exception {
-                       if (state.isEmpty() || state.size() > 1) {
-                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
-                       }
-                       this.numElementsEmitted = state.get(0);
-               }
-       }
-
-       private static class ValidatingSink extends 
RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-                       implements ListCheckpointed<HashMap<Long, Integer>> {
-
-               private final HashMap<Long, Integer> windowCounts = new 
HashMap<>();
-
-               private final int numKeys;
-               private final int numWindowsExpected;
-
-               private ValidatingSink(int numKeys, int numWindowsExpected) {
-                       this.numKeys = numKeys;
-                       this.numWindowsExpected = numWindowsExpected;
-               }
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       // this sink can only work with DOP 1
-                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
-
-                       // it can happen that a checkpoint happens when the 
complete success state is
-                       // already set. In that case we restart with the final 
state and would never
-                       // finish because no more elements arrive.
-                       if (windowCounts.size() == numKeys) {
-                               boolean seenAll = true;
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount != numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       }
-                               }
-                               if (seenAll) {
-                                       throw new SuccessException();
-                               }
-                       }
-               }
-
-               @Override
-               public void close() throws Exception {
-                       boolean seenAll = true;
-                       if (windowCounts.size() == numKeys) {
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount != numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       }
-                               }
-                       }
-                       assertTrue("The source must see all expected windows.", 
seenAll);
-               }
-
-               @Override
-               public void invoke(Tuple4<Long, Long, Long, IntType> value) 
throws Exception {
-
-                       // verify the contents of that window, Tuple4.f1 and 
.f2 are the window start/end
-                       // the sum should be "sum (start .. end-1)"
-
-                       int expectedSum = 0;
-                       for (long i = value.f1; i < value.f2; i++) {
-                               // only sum up positive vals, to filter out the 
negative start of the
-                               // first sliding windows
-                               if (i > 0) {
-                                       expectedSum += i;
-                               }
-                       }
-
-                       assertEquals("Window start: " + value.f1 + " end: " + 
value.f2, expectedSum, value.f3.value);
-
-                       Integer curr = windowCounts.get(value.f0);
-                       if (curr != null) {
-                               windowCounts.put(value.f0, curr + 1);
-                       }
-                       else {
-                               windowCounts.put(value.f0, 1);
-                       }
-
-                       boolean seenAll = true;
-                       if (windowCounts.size() == numKeys) {
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount < numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       } else if (windowCount > 
numWindowsExpected) {
-                                               fail("Window count to high: " + 
windowCount);
-                                       }
-                               }
-
-                               if (seenAll) {
-                                       // exit
-                                       throw new SuccessException();
-                               }
-
-                       }
-               }
-
-               @Override
-               public List<HashMap<Long, Integer>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
-                       return Collections.singletonList(this.windowCounts);
-               }
-
-               @Override
-               public void restoreState(List<HashMap<Long, Integer>> state) 
throws Exception {
-                       if (state.isEmpty() || state.size() > 1) {
-                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
-                       }
-                       this.windowCounts.putAll(state.get(0));
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Custom boxed integer type.
-        */
-       public static class IntType {
-
-               public int value;
-
-               public IntType() {}
-
-               public IntType(int value) {
-                       this.value = value;
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index c3d93d7..e9a2e45 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -32,21 +32,20 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.checkpointing.utils.FailingSource;
+import org.apache.flink.test.checkpointing.utils.IntType;
+import org.apache.flink.test.checkpointing.utils.ValidatingSink;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -65,19 +64,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE;
-import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
-import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM;
-import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM_ASYNC;
-import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
-import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL;
 import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -255,20 +245,19 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                final int numElementsPerKey = numElementsPerKey();
                final int windowSize = windowSize();
                final int numKeys = numKeys();
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
                        env.setStateBackend(this.stateBackend);
                        env.getConfig().setUseSnapshotCompression(true);
 
                        env
-                                       .addSource(new FailingSource(numKeys, 
numElementsPerKey, numElementsPerKey / 3))
+                                       .addSource(new FailingSource(new 
KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(windowSize, 
MILLISECONDS))
@@ -299,12 +288,17 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                                                sum += 
value.f1.value;
                                                                key = value.f0;
                                                        }
-                                                       out.collect(new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+
+                                                       final Tuple4<Long, 
Long, Long, IntType> result =
+                                                               new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum));
+                                                       out.collect(result);
                                                }
                                        })
-                                       .addSink(new ValidatingSink(numKeys, 
numElementsPerKey / windowSize)).setParallelism(1);
+                               .addSink(new ValidatingSink<>(
+                                       new 
SinkValidatorUpdateFun(numElementsPerKey),
+                                       new SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSize))).setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       env.execute("Tumbling Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -326,7 +320,6 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                final int numElementsPerKey = numElementsPerKey();
                final int windowSize = windowSize();
                final int numKeys = numKeys();
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -334,13 +327,13 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                        env.setMaxParallelism(maxParallelism);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
                        env.setStateBackend(this.stateBackend);
                        env.getConfig().setUseSnapshotCompression(true);
 
                        env
-                                       .addSource(new FailingSource(numKeys, 
numElementsPerKey, numElementsPerKey / 3))
+                                       .addSource(new FailingSource(new 
KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(windowSize, 
MILLISECONDS))
@@ -378,9 +371,11 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                                        out.collect(new 
Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new 
IntType(count.value())));
                                                }
                                        })
-                                       .addSink(new 
CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+                               .addSink(new ValidatingSink<>(
+                                       new CountingSinkValidatorUpdateFun(),
+                                       new SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSize))).setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       env.execute("Tumbling Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -394,7 +389,6 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                final int windowSize = windowSize();
                final int windowSlide = windowSlide();
                final int numKeys = numKeys();
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -402,13 +396,13 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
                        env.setStateBackend(this.stateBackend);
                        env.getConfig().setUseSnapshotCompression(true);
 
                        env
-                                       .addSource(new FailingSource(numKeys, 
numElementsPerKey, numElementsPerKey / 3))
+                                       .addSource(new FailingSource(new 
KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey))
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(windowSize, 
MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
@@ -439,12 +433,16 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                                                sum += 
value.f1.value;
                                                                key = value.f0;
                                                        }
-                                                       out.collect(new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+                                                       final Tuple4<Long, 
Long, Long, IntType> output =
+                                                               new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum));
+                                                       out.collect(output);
                                                }
                                        })
-                                       .addSink(new ValidatingSink(numKeys, 
numElementsPerKey / windowSlide)).setParallelism(1);
+                               .addSink(new ValidatingSink<>(
+                                       new 
SinkValidatorUpdateFun(numElementsPerKey),
+                                       new SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSlide))).setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       env.execute("Tumbling Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -457,20 +455,19 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                final int numElementsPerKey = numElementsPerKey();
                final int windowSize = windowSize();
                final int numKeys = numKeys();
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
                        env.setStateBackend(this.stateBackend);
                        env.getConfig().setUseSnapshotCompression(true);
 
                        env
-                                       .addSource(new FailingSource(numKeys, 
numElementsPerKey, numElementsPerKey / 3))
+                                       .addSource(new FailingSource(new 
KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(windowSize, 
MILLISECONDS))
@@ -505,16 +502,19 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                                        assertTrue(open);
 
                                                        for (Tuple2<Long, 
IntType> in: input) {
-                                                               out.collect(new 
Tuple4<>(in.f0,
-                                                                               
window.getStart(),
-                                                                               
window.getEnd(),
-                                                                               
in.f1));
+                                                               final 
Tuple4<Long, Long, Long, IntType> output = new Tuple4<>(in.f0,
+                                                                       
window.getStart(),
+                                                                       
window.getEnd(),
+                                                                       in.f1);
+                                                               
out.collect(output);
                                                        }
                                                }
                                        })
-                                       .addSink(new ValidatingSink(numKeys, 
numElementsPerKey / windowSize)).setParallelism(1);
+                               .addSink(new ValidatingSink<>(
+                                       new 
SinkValidatorUpdateFun(numElementsPerKey),
+                                       new SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSize))).setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       env.execute("Tumbling Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -528,20 +528,19 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                final int windowSize = windowSize();
                final int windowSlide = windowSlide();
                final int numKeys = numKeys();
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
                        env.setStateBackend(this.stateBackend);
                        env.getConfig().setUseSnapshotCompression(true);
 
                        env
-                                       .addSource(new FailingSource(numKeys, 
numElementsPerKey, numElementsPerKey / 3))
+                                       .addSource(new FailingSource(new 
KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey))
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(windowSize, 
MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
@@ -585,9 +584,11 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                                        }
                                                }
                                        })
-                                       .addSink(new ValidatingSink(numKeys, 
numElementsPerKey / windowSlide)).setParallelism(1);
+                                       .addSink(new ValidatingSink<>(
+                                               new 
SinkValidatorUpdateFun(numElementsPerKey),
+                                               new 
SinkValidatorCheckFun(numKeys, numElementsPerKey, 
windowSlide))).setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       env.execute("Tumbling Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -599,151 +600,42 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private static class FailingSource extends 
RichSourceFunction<Tuple2<Long, IntType>>
-                       implements ListCheckpointed<Integer>, 
CheckpointListener {
-               private static volatile boolean failedBefore = false;
-
-               private final int numKeys;
-               private final int numElementsToEmit;
-               private final int failureAfterNumElements;
-
-               private volatile int numElementsEmitted;
-               private volatile int numSuccessfulCheckpoints;
-               private volatile boolean running = true;
-
-               private FailingSource(int numKeys, int numElementsToEmitPerKey, 
int failureAfterNumElements) {
-                       this.numKeys = numKeys;
-                       this.numElementsToEmit = numElementsToEmitPerKey;
-                       this.failureAfterNumElements = failureAfterNumElements;
-               }
-
-               @Override
-               public void open(Configuration parameters) {
-                       // non-parallel source
-                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
-               }
-
-               @Override
-               public void run(SourceContext<Tuple2<Long, IntType>> ctx) 
throws Exception {
-                       // we loop longer than we have elements, to permit 
delayed checkpoints
-                       // to still cause a failure
-                       while (running) {
-
-                               if (!failedBefore) {
-                                       // delay a bit, if we have not failed 
before
-                                       Thread.sleep(1);
-                                       if (numSuccessfulCheckpoints >= 2 && 
numElementsEmitted >= failureAfterNumElements) {
-                                               // cause a failure if we have 
not failed before and have reached
-                                               // enough completed checkpoints 
and elements
-                                               failedBefore = true;
-                                               throw new Exception("Artificial 
Failure");
-                                       }
-                               }
-
-                               if (numElementsEmitted < numElementsToEmit &&
-                                               (failedBefore || 
numElementsEmitted <= failureAfterNumElements)) {
-                                       // the function failed before, or we 
are in the elements before the failure
-                                       synchronized (ctx.getCheckpointLock()) {
-                                               int next = numElementsEmitted++;
-                                               for (long i = 0; i < numKeys; 
i++) {
-                                                       
ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
-                                               }
-                                               ctx.emitWatermark(new 
Watermark(next));
-                                       }
-                               }
-                               else {
-
-                                       // if our work is done, delay a bit to 
prevent busy waiting
-                                       Thread.sleep(1);
-                               }
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       running = false;
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) {
-                       numSuccessfulCheckpoints++;
-               }
+       /**
+        * For validating the stateful window counts.
+        */
+       static class CountingSinkValidatorUpdateFun
+               implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, 
IntType>> {
 
                @Override
-               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
-                       return 
Collections.singletonList(this.numElementsEmitted);
-               }
+               public void updateCount(Tuple4<Long, Long, Long, IntType> 
value, Map<Long, Integer> windowCounts) {
 
-               @Override
-               public void restoreState(List<Integer> state) throws Exception {
-                       if (state.isEmpty() || state.size() > 1) {
-                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
-                       }
-                       this.numElementsEmitted = state.get(0);
-               }
+                       windowCounts.merge(value.f0, 1, (a, b) -> a + b);
 
-               public static void reset() {
-                       failedBefore = false;
+                       // verify the contents of that window, the contents 
should be:
+                       // (key + num windows so far)
+                       assertEquals("Window counts don't match for key " + 
value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), 
value.f3.value);
                }
        }
 
-       private static class ValidatingSink extends 
RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-                       implements ListCheckpointed<HashMap<Long, Integer>> {
-
-               private final HashMap<Long, Integer> windowCounts = new 
HashMap<>();
-
-               private final int numKeys;
-               private final int numWindowsExpected;
+       //------------------------------------
 
-               private ValidatingSink(int numKeys, int numWindowsExpected) {
-                       this.numKeys = numKeys;
-                       this.numWindowsExpected = numWindowsExpected;
-               }
+       static class SinkValidatorUpdateFun implements 
ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> {
 
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       // this sink can only work with DOP 1
-                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
+               private final int elementsPerKey;
 
-                       // it can happen that a checkpoint happens when the 
complete success state is
-                       // already set. In that case we restart with the final 
state and would never
-                       // finish because no more elements arrive.
-                       if (windowCounts.size() == numKeys) {
-                               boolean seenAll = true;
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount != numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       }
-                               }
-                               if (seenAll) {
-                                       throw new SuccessException();
-                               }
-                       }
+               SinkValidatorUpdateFun(int elementsPerKey) {
+                       this.elementsPerKey = elementsPerKey;
                }
 
                @Override
-               public void close() throws Exception {
-                       boolean seenAll = true;
-                       if (windowCounts.size() == numKeys) {
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount < numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       }
-                               }
-                       }
-                       assertTrue("The sink must see all expected windows.", 
seenAll);
-               }
-
-               @Override
-               public void invoke(Tuple4<Long, Long, Long, IntType> value) 
throws Exception {
-
+               public void updateCount(Tuple4<Long, Long, Long, IntType> 
value, Map<Long, Integer> windowCounts) {
                        // verify the contents of that window, Tuple4.f1 and 
.f2 are the window start/end
                        // the sum should be "sum (start .. end-1)"
 
                        int expectedSum = 0;
-                       for (long i = value.f1; i < value.f2; i++) {
+                       // we shorten the range if it goes beyond 
elementsPerKey, because those are "incomplete" sliding windows
+                       long countUntil = Math.min(value.f2, elementsPerKey);
+                       for (long i = value.f1; i < countUntil; i++) {
                                // only sum up positive vals, to filter out the 
negative start of the
                                // first sliding windows
                                if (i > 0) {
@@ -753,142 +645,55 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
 
                        assertEquals("Window start: " + value.f1 + " end: " + 
value.f2, expectedSum, value.f3.value);
 
-                       Integer curr = windowCounts.get(value.f0);
-                       if (curr != null) {
-                               windowCounts.put(value.f0, curr + 1);
-                       }
-                       else {
-                               windowCounts.put(value.f0, 1);
-                       }
-
-                       if (windowCounts.size() == numKeys) {
-                               boolean seenAll = true;
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount < numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       } else if (windowCount > 
numWindowsExpected) {
-                                               fail("Window count to high: " + 
windowCount);
-                                       }
-                               }
-
-                               if (seenAll) {
-                                       // exit
-                                       throw new SuccessException();
-                               }
-
-                       }
-               }
-
-               @Override
-               public List<HashMap<Long, Integer>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
-                       return Collections.singletonList(this.windowCounts);
-               }
-
-               @Override
-               public void restoreState(List<HashMap<Long, Integer>> state) 
throws Exception {
-                       if (state.isEmpty() || state.size() > 1) {
-                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
-                       }
-                       windowCounts.putAll(state.get(0));
+                       windowCounts.merge(value.f0, 1, (val, increment) -> val 
+ increment);
                }
        }
 
-       // Sink for validating the stateful window counts
-       private static class CountValidatingSink extends 
RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-                       implements ListCheckpointed<HashMap<Long, Integer>> {
-
-               private final HashMap<Long, Integer> windowCounts = new 
HashMap<>();
+       static class SinkValidatorCheckFun implements 
ValidatingSink.ResultChecker {
 
                private final int numKeys;
                private final int numWindowsExpected;
 
-               private CountValidatingSink(int numKeys, int 
numWindowsExpected) {
+               SinkValidatorCheckFun(int numKeys, int elementsPerKey, int 
elementsPerWindow) {
                        this.numKeys = numKeys;
-                       this.numWindowsExpected = numWindowsExpected;
+                       this.numWindowsExpected = elementsPerKey / 
elementsPerWindow;
                }
 
                @Override
-               public void open(Configuration parameters) throws Exception {
-                       // this sink can only work with DOP 1
-                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
-               }
-
-               @Override
-               public void close() throws Exception {
-                       boolean seenAll = true;
+               public boolean checkResult(Map<Long, Integer> windowCounts) {
                        if (windowCounts.size() == numKeys) {
-                               for (Integer windowCount: 
windowCounts.values()) {
+                               for (Integer windowCount : 
windowCounts.values()) {
                                        if (windowCount < numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
+                                               return false;
                                        }
                                }
+                               return true;
                        }
-                       assertTrue("The source must see all expected windows.", 
seenAll);
+                       return false;
                }
+       }
 
-               @Override
-               public void invoke(Tuple4<Long, Long, Long, IntType> value) 
throws Exception {
-
-                       Integer curr = windowCounts.get(value.f0);
-                       if (curr != null) {
-                               windowCounts.put(value.f0, curr + 1);
-                       }
-                       else {
-                               windowCounts.put(value.f0, 1);
-                       }
-
-                       // verify the contents of that window, the contents 
should be:
-                       // (key + num windows so far)
-
-                       assertEquals("Window counts don't match for key " + 
value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), 
value.f3.value);
-
-                       boolean seenAll = true;
-                       if (windowCounts.size() == numKeys) {
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount < numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       } else if (windowCount > 
numWindowsExpected) {
-                                               fail("Window count to high: " + 
windowCount);
-                                       }
-                               }
+       static class KeyedEventTimeGenerator implements 
FailingSource.EventEmittingGenerator {
 
-                               if (seenAll) {
-                                       // exit
-                                       throw new SuccessException();
-                               }
+               private final int keyUniverseSize;
+               private final int watermarkTrailing;
 
-                       }
-               }
-
-               @Override
-               public List<HashMap<Long, Integer>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
-                       return Collections.singletonList(this.windowCounts);
+               public KeyedEventTimeGenerator(int keyUniverseSize, int 
numElementsPerWindow) {
+                       this.keyUniverseSize = keyUniverseSize;
+                       // we let the watermark trail a bit behind, so that there can 
be in-flight timers that require checkpointing
+                       // to include correct timer snapshots in our testing.
+                       this.watermarkTrailing = 4 * numElementsPerWindow / 3;
                }
 
                @Override
-               public void restoreState(List<HashMap<Long, Integer>> state) 
throws Exception {
-                       if (state.isEmpty() || state.size() > 1) {
-                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+               public void emitEvent(SourceFunction.SourceContext<Tuple2<Long, 
IntType>> ctx, int eventSequenceNo) {
+                       final IntType intTypeNext = new 
IntType(eventSequenceNo);
+                       for (long i = 0; i < keyUniverseSize; i++) {
+                               final Tuple2<Long, IntType> generatedEvent = 
new Tuple2<>(i, intTypeNext);
+                               ctx.collectWithTimestamp(generatedEvent, 
eventSequenceNo);
                        }
-                       this.windowCounts.putAll(state.get(0));
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       private static class IntType {
-
-               public int value;
-
-               public IntType() {}
 
-               public IntType(int value) {
-                       this.value = value;
+                       ctx.emitWatermark(new Watermark(eventSequenceNo - 
watermarkTrailing));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index b6163e8..b0e2967 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,18 +25,17 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.checkpointing.utils.FailingSource;
+import org.apache.flink.test.checkpointing.utils.IntType;
+import org.apache.flink.test.checkpointing.utils.ValidatingSink;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -47,9 +46,7 @@ import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Map;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -92,7 +89,6 @@ public class WindowCheckpointingITCase extends TestLogger {
        @Test
        public void testTumblingProcessingTimeWindow() {
                final int numElements = 3000;
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -100,11 +96,14 @@ public class WindowCheckpointingITCase extends TestLogger {
                        env.setStreamTimeCharacteristic(timeCharacteristic);
                        env.getConfig().setAutoWatermarkInterval(10);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
 
+                       SinkValidatorUpdaterAndChecker updaterAndChecker =
+                               new SinkValidatorUpdaterAndChecker(numElements, 
1);
+
                        env
-                                       .addSource(new 
FailingSource(numElements, numElements / 3))
+                                       .addSource(new FailingSource(new 
Generator(), numElements, timeCharacteristic))
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(100, MILLISECONDS))
@@ -130,11 +129,12 @@ public class WindowCheckpointingITCase extends TestLogger 
{
 
                                                        for (Tuple2<Long, 
IntType> value : values) {
                                                                
assertEquals(value.f0.intValue(), value.f1.value);
-                                                               out.collect(new 
Tuple2<Long, IntType>(value.f0, new IntType(1)));
+                                                               out.collect(new 
Tuple2<>(value.f0, new IntType(1)));
                                                        }
                                                }
                                        })
-                                       .addSink(new 
ValidatingSink(numElements, 1)).setParallelism(1);
+                               .addSink(new 
ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+                               .setParallelism(1);
 
                        tryExecute(env, "Tumbling Window Test");
                }
@@ -147,7 +147,6 @@ public class WindowCheckpointingITCase extends TestLogger {
        @Test
        public void testSlidingProcessingTimeWindow() {
                final int numElements = 3000;
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -155,11 +154,12 @@ public class WindowCheckpointingITCase extends TestLogger 
{
                        env.setStreamTimeCharacteristic(timeCharacteristic);
                        env.getConfig().setAutoWatermarkInterval(10);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
-
+                       SinkValidatorUpdaterAndChecker updaterAndChecker =
+                               new SinkValidatorUpdaterAndChecker(numElements, 
3);
                        env
-                                       .addSource(new 
FailingSource(numElements, numElements / 3))
+                                       .addSource(new FailingSource(new 
Generator(), numElements, timeCharacteristic))
                                        .rebalance()
                                        .keyBy(0)
                                        .timeWindow(Time.of(150, MILLISECONDS), 
Time.of(50, MILLISECONDS))
@@ -185,13 +185,14 @@ public class WindowCheckpointingITCase extends TestLogger 
{
 
                                                        for (Tuple2<Long, 
IntType> value : values) {
                                                                
assertEquals(value.f0.intValue(), value.f1.value);
-                                                               out.collect(new 
Tuple2<Long, IntType>(value.f0, new IntType(1)));
+                                                               out.collect(new 
Tuple2<>(value.f0, new IntType(1)));
                                                        }
                                                }
                                        })
-                                       .addSink(new 
ValidatingSink(numElements, 3)).setParallelism(1);
+                               .addSink(new 
ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+                               .setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       tryExecute(env, "Sliding Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -202,7 +203,6 @@ public class WindowCheckpointingITCase extends TestLogger {
        @Test
        public void testAggregatingTumblingProcessingTimeWindow() {
                final int numElements = 3000;
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -210,11 +210,12 @@ public class WindowCheckpointingITCase extends TestLogger 
{
                        env.setStreamTimeCharacteristic(timeCharacteristic);
                        env.getConfig().setAutoWatermarkInterval(10);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
-
+                       SinkValidatorUpdaterAndChecker updaterAndChecker =
+                               new SinkValidatorUpdaterAndChecker(numElements, 
1);
                        env
-                                       .addSource(new 
FailingSource(numElements, numElements / 3))
+                                       .addSource(new FailingSource(new 
Generator(), numElements, timeCharacteristic))
                                        .map(new MapFunction<Tuple2<Long, 
IntType>, Tuple2<Long, IntType>>() {
                                                @Override
                                                public Tuple2<Long, IntType> 
map(Tuple2<Long, IntType> value) {
@@ -234,9 +235,10 @@ public class WindowCheckpointingITCase extends TestLogger {
                                                        return new 
Tuple2<>(a.f0, new IntType(1));
                                                }
                                        })
-                                       .addSink(new 
ValidatingSink(numElements, 1)).setParallelism(1);
+                               .addSink(new 
ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+                               .setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       tryExecute(env, "Aggregating Tumbling Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -247,7 +249,6 @@ public class WindowCheckpointingITCase extends TestLogger {
        @Test
        public void testAggregatingSlidingProcessingTimeWindow() {
                final int numElements = 3000;
-               FailingSource.reset();
 
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -255,11 +256,12 @@ public class WindowCheckpointingITCase extends TestLogger 
{
                        env.setStreamTimeCharacteristic(timeCharacteristic);
                        env.getConfig().setAutoWatermarkInterval(10);
                        env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
                        env.getConfig().disableSysoutLogging();
-
+                       SinkValidatorUpdaterAndChecker updaterAndChecker =
+                               new SinkValidatorUpdaterAndChecker(numElements, 
3);
                        env
-                                       .addSource(new 
FailingSource(numElements, numElements / 3))
+                                       .addSource(new FailingSource(new 
Generator(), numElements, timeCharacteristic))
                                        .map(new MapFunction<Tuple2<Long, 
IntType>, Tuple2<Long, IntType>>() {
                                                @Override
                                                public Tuple2<Long, IntType> 
map(Tuple2<Long, IntType> value) {
@@ -278,9 +280,10 @@ public class WindowCheckpointingITCase extends TestLogger {
                                                        return new 
Tuple2<>(a.f0, new IntType(1));
                                                }
                                        })
-                                       .addSink(new 
ValidatingSink(numElements, 3)).setParallelism(1);
+                               .addSink(new 
ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic))
+                               .setParallelism(1);
 
-                       tryExecute(env, "Tumbling Window Test");
+                       tryExecute(env, "Aggregating Sliding Window Test");
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -292,152 +295,50 @@ public class WindowCheckpointingITCase extends 
TestLogger {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private static class FailingSource extends 
RichSourceFunction<Tuple2<Long, IntType>>
-                       implements ListCheckpointed<Integer>, 
CheckpointListener {
-               private static volatile boolean failedBefore = false;
-
-               private final int numElementsToEmit;
-               private final int failureAfterNumElements;
-
-               private volatile int numElementsEmitted;
-               private volatile int numSuccessfulCheckpoints;
-               private volatile boolean running = true;
-
-               private FailingSource(int numElementsToEmit, int 
failureAfterNumElements) {
-                       this.numElementsToEmit = numElementsToEmit;
-                       this.failureAfterNumElements = failureAfterNumElements;
-               }
-
-               @Override
-               public void open(Configuration parameters) {
-                       // non-parallel source
-                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
-               }
-
-               @Override
-               public void run(SourceContext<Tuple2<Long, IntType>> ctx) 
throws Exception {
-                       // we loop longer than we have elements, to permit 
delayed checkpoints
-                       // to still cause a failure
-                       while (running) {
-                               if (!failedBefore) {
-                                       // delay a bit, if we have not failed 
before
-                                       Thread.sleep(1);
-                                       if (numSuccessfulCheckpoints >= 2 && 
numElementsEmitted >= failureAfterNumElements) {
-                                               // cause a failure if we have 
not failed before and have reached
-                                               // enough completed checkpoints 
and elements
-                                               failedBefore = true;
-                                               throw new Exception("Artificial 
Failure");
-                                       }
-                               }
-
-                               if (numElementsEmitted < numElementsToEmit &&
-                                               (failedBefore || 
numElementsEmitted <= failureAfterNumElements)) {
-                                       // the function failed before, or we 
are in the elements before the failure
-                                       synchronized (ctx.getCheckpointLock()) {
-                                               int next = numElementsEmitted++;
-                                               ctx.collect(new Tuple2<Long, 
IntType>((long) next, new IntType(next)));
-                                       }
-                               } else {
-                                       // if our work is done, delay a bit to 
prevent busy waiting
-                                       Thread.sleep(10);
-                               }
-                       }
-               }
+       static class Generator implements FailingSource.EventEmittingGenerator {
 
                @Override
-               public void cancel() {
-                       running = false;
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) {
-                       numSuccessfulCheckpoints++;
-               }
-
-               @Override
-               public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
-                       return 
Collections.singletonList(this.numElementsEmitted);
-               }
-
-               @Override
-               public void restoreState(List<Integer> state) throws Exception {
-                       if (state.isEmpty() || state.size() > 1) {
-                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
-                       }
-                       this.numElementsEmitted = state.get(0);
-               }
-
-               public static void reset() {
-                       failedBefore = false;
+               public void emitEvent(SourceFunction.SourceContext<Tuple2<Long, 
IntType>> ctx, int eventSequenceNo) {
+                       ctx.collect(new Tuple2<>((long) eventSequenceNo, new 
IntType(eventSequenceNo)));
                }
        }
 
-       private static class ValidatingSink extends 
RichSinkFunction<Tuple2<Long, IntType>>
-                       implements ListCheckpointed<HashMap<Long, Integer>> {
-
-               private final HashMap<Long, Integer> counts = new HashMap<>();
+       static class SinkValidatorUpdaterAndChecker
+               implements ValidatingSink.CountUpdater<Tuple2<Long, IntType>>, 
ValidatingSink.ResultChecker {
 
                private final int elementCountExpected;
                private final int countPerElementExpected;
 
-               private int aggCount;
-
-               private ValidatingSink(int elementCountExpected, int 
countPerElementExpected) {
+               SinkValidatorUpdaterAndChecker(int elementCountExpected, int 
countPerElementExpected) {
                        this.elementCountExpected = elementCountExpected;
                        this.countPerElementExpected = countPerElementExpected;
                }
 
                @Override
-               public void open(Configuration parameters) throws Exception {
-                       // this sink can only work with DOP 1
-                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
-                       checkSuccess();
+               public void updateCount(Tuple2<Long, IntType> value, Map<Long, 
Integer> windowCounts) {
+                       windowCounts.merge(value.f0, value.f1.value, (a, b) -> 
a + b);
                }
 
                @Override
-               public void invoke(Tuple2<Long, IntType> value) throws 
Exception {
-                       Integer curr = counts.get(value.f0);
-                       if (curr != null) {
-                               counts.put(value.f0, curr + value.f1.value);
-                       }
-                       else {
-                               counts.put(value.f0, value.f1.value);
-                       }
+               public boolean checkResult(Map<Long, Integer> windowCounts) {
+                       int aggCount = 0;
 
-                       // check if we have seen all we expect
-                       aggCount += value.f1.value;
-                       checkSuccess();
-               }
-
-               private void checkSuccess() throws SuccessException {
-                       if (aggCount >= elementCountExpected * 
countPerElementExpected) {
-                               // we are done. validate
-                               assertEquals(elementCountExpected, 
counts.size());
-
-                               for (Integer i : counts.values()) {
-                                       assertEquals(countPerElementExpected, 
i.intValue());
-                               }
-
-                               // exit
-                               throw new SuccessException();
+                       for (Integer i : windowCounts.values()) {
+                               aggCount += i;
                        }
-               }
 
-               @Override
-               public List<HashMap<Long, Integer>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
-                       return Collections.singletonList(this.counts);
-               }
-
-               @Override
-               public void restoreState(List<HashMap<Long, Integer>> state) 
throws Exception {
-                       if (state.isEmpty() || state.size() > 1) {
-                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+                       if (aggCount < elementCountExpected * 
countPerElementExpected
+                               || elementCountExpected != windowCounts.size()) 
{
+                               return false;
                        }
-                       this.counts.putAll(state.get(0));
 
-                       for (Integer i : state.get(0).values()) {
-                               this.aggCount += i;
+                       for (int i : windowCounts.values()) {
+                               if (countPerElementExpected != i) {
+                                       return false;
+                               }
                        }
+
+                       return true;
                }
        }
 
@@ -452,22 +353,4 @@ public class WindowCheckpointingITCase extends TestLogger {
                                new 
TimeCharacteristic[]{TimeCharacteristic.IngestionTime}
                );
        }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       /**
-        * POJO with int value.
-        */
-       public static class IntType {
-
-               public int value;
-
-               public IntType() {}
-
-               public IntType(int value) {
-                       this.value = value;
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
new file mode 100644
index 0000000..822d73b
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Source for window checkpointing IT cases that can introduce artificial 
failures.
+ */
+public class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
+       implements ListCheckpointed<Integer>, CheckpointListener {
+
+       /**
+        * Function to generate and emit the test events (and watermarks if required).
+        */
+       @FunctionalInterface
+       public interface EventEmittingGenerator extends Serializable {
+               void emitEvent(SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo);
+       }
+
+       // Markers stored in checkpointStatus; they must differ or the wait before failing is a no-op.
+       private static final long INITIAL = Long.MIN_VALUE;
+       private static final long STATEFUL_CHECKPOINT_COMPLETED = Long.MAX_VALUE;
+
+       @Nonnull
+       private final EventEmittingGenerator eventEmittingGenerator;
+       private final int expectedEmitCalls;
+       private final int failureAfterNumElements;
+       private final boolean usingProcessingTime;
+       private final AtomicLong checkpointStatus;
+
+       private int emitCallCount;
+       private volatile boolean running;
+
+       public FailingSource(
+               @Nonnull EventEmittingGenerator eventEmittingGenerator,
+               @Nonnegative int numberOfGeneratorInvocations) {
+               this(eventEmittingGenerator, numberOfGeneratorInvocations, TimeCharacteristic.EventTime);
+       }
+
+       public FailingSource(
+               @Nonnull EventEmittingGenerator eventEmittingGenerator,
+               @Nonnegative int numberOfGeneratorInvocations,
+               @Nonnull TimeCharacteristic timeCharacteristic) {
+               this.eventEmittingGenerator = eventEmittingGenerator;
+               this.running = true;
+               this.emitCallCount = 0;
+               this.expectedEmitCalls = numberOfGeneratorInvocations;
+               this.failureAfterNumElements = numberOfGeneratorInvocations / 2;
+               this.checkpointStatus = new AtomicLong(INITIAL);
+               this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime;
+       }
+
+       @Override
+       public void open(Configuration parameters) {
+               // non-parallel source
+               assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+       }
+
+       @Override
+       public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+               final RuntimeContext runtimeContext = getRuntimeContext();
+               // detect if this task is "the chosen one" and should fail (via subtaskidx), if it did not fail before (via attempt)
+               final boolean failThisTask =
+                       runtimeContext.getAttemptNumber() == 0 && runtimeContext.getIndexOfThisSubtask() == 0;
+
+               // emit until all events were generated or the source got cancelled; `running` is only ever
+               // cleared by cancel(), so a concurrent cancel is never overwritten and the idle loop below works
+               while (running && emitCallCount < expectedEmitCalls) {
+
+                       // generate the next event(s) and advance the counter while holding the checkpoint lock
+                       synchronized (ctx.getCheckpointLock()) {
+                               eventEmittingGenerator.emitEvent(ctx, emitCallCount++);
+                       }
+
+                       if (emitCallCount < failureAfterNumElements) {
+                               Thread.sleep(1);
+                       } else if (failThisTask && emitCallCount == failureAfterNumElements) {
+                               // wait for a pending checkpoint that fulfills our requirements if needed
+                               while (checkpointStatus.get() != STATEFUL_CHECKPOINT_COMPLETED) {
+                                       Thread.sleep(1);
+                               }
+                               throw new Exception("Artificial Failure");
+                       }
+               }
+
+               if (usingProcessingTime) {
+                       // stay alive until cancelled; presumably so pending processing-time windows can still fire
+                       while (running) {
+                               Thread.sleep(10);
+                       }
+               }
+       }
+
+       @Override
+       public void cancel() {
+               running = false;
+       }
+
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) {
+               // This will unblock the task for failing, if this is the checkpoint we are waiting for
+               checkpointStatus.compareAndSet(checkpointId, STATEFUL_CHECKPOINT_COMPLETED);
+       }
+
+       @Override
+       public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+               // We accept a checkpoint as basis if it should have a "decent amount" of state
+               if (emitCallCount > failureAfterNumElements / 2) {
+                       // From now on we wait for the notification that this checkpoint completed.
+                       checkpointStatus.compareAndSet(INITIAL, checkpointId);
+               }
+               return Collections.singletonList(this.emitCallCount);
+       }
+
+       @Override
+       public void restoreState(List<Integer> state) throws Exception {
+               if (state.isEmpty() || state.size() > 1) {
+                       throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+               }
+               this.emitCallCount = state.get(0);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java
new file mode 100644
index 0000000..3bc4ea0
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+/**
+ * Test type that wraps an int.
+ */
+public class IntType {
+
+       /** The wrapped value. */
+       public int value;
+
+       public IntType(int value) {
+               this.value = value;
+       }
+
+       @Override
+       public String toString() {
+               final StringBuilder text = new StringBuilder("IntType{");
+               return text.append("value=").append(value).append('}').toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java
new file mode 100644
index 0000000..b352738
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.test.util.SuccessException;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Generalized sink for validation of window checkpointing IT cases.
+ */
+public class ValidatingSink<T> extends RichSinkFunction<T>
+       implements ListCheckpointed<HashMap<Long, Integer>> {
+
+       /**
+        * Function to check if the window counts are as expected.
+        */
+       @FunctionalInterface
+       public interface ResultChecker extends Serializable {
+               /**
+                * Inspects the counts collected so far.
+                *
+                * @param windowCounts the counts accumulated by the sink
+                * @return true if the counts match the expected result
+                */
+               boolean checkResult(Map<Long, Integer> windowCounts);
+       }
+
+       /**
+        * Function that updates the window counts from an update event.
+        *
+        * @param <T> type of the update event.
+        */
+       @FunctionalInterface
+       public interface CountUpdater<T> extends Serializable {
+               /**
+                * Applies one incoming element to the counts.
+                *
+                * @param update the element received by the sink
+                * @param windowCounts the map to modify in place
+                */
+               void updateCount(T update, Map<Long, Integer> windowCounts);
+       }
+
+       /** Decides whether the collected counts represent the expected result. */
+       @Nonnull
+       private final ResultChecker resultChecker;
+
+       /** Folds each incoming element into {@link #windowCounts}. */
+       @Nonnull
+       private final CountUpdater<T> countUpdater;
+
+       /** Counts collected so far; this map is also the checkpointed state. */
+       @Nonnull
+       private final HashMap<Long, Integer> windowCounts;
+
+       /** True if the sink was created for {@link TimeCharacteristic#ProcessingTime}. */
+       private final boolean usingProcessingTime;
+
+       /**
+        * Creates a sink that validates under {@link TimeCharacteristic#EventTime}.
+        *
+        * @param countUpdater applies each incoming element to the counts
+        * @param resultChecker decides whether the counts are as expected
+        */
+       public ValidatingSink(
+               @Nonnull CountUpdater<T> countUpdater,
+               @Nonnull ResultChecker resultChecker) {
+               this(countUpdater, resultChecker, TimeCharacteristic.EventTime);
+       }
+
+       /**
+        * Creates a sink for the given time characteristic.
+        *
+        * @param countUpdater applies each incoming element to the counts
+        * @param resultChecker decides whether the counts are as expected
+        * @param timeCharacteristic only {@link TimeCharacteristic#ProcessingTime} changes
+        *     the behavior; every other value is handled the same way
+        */
+       public ValidatingSink(
+               @Nonnull CountUpdater<T> countUpdater,
+               @Nonnull ResultChecker resultChecker,
+               @Nonnull TimeCharacteristic timeCharacteristic) {
+
+               this.resultChecker = resultChecker;
+               this.countUpdater = countUpdater;
+               this.usingProcessingTime = TimeCharacteristic.ProcessingTime == timeCharacteristic;
+               this.windowCounts = new HashMap<>();
+       }
+
+       /**
+        * Verifies that the sink runs with parallelism 1 and, in processing-time mode,
+        * signals success right away if the current counts already pass the check.
+        */
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               // this sink can only work with DOP 1
+               assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+               // NOTE(review): presumably restored counts are already in place here - confirm
+               signalSuccessIfProcessingTimeResultReached();
+       }
+
+       /**
+        * Performs the final validation of the collected counts.
+        *
+        * @throws AssertionError if the result checker rejects the counts
+        * @throws SuccessException if the counts pass and the sink is in processing-time mode
+        */
+       @Override
+       public void close() {
+               if (!resultChecker.checkResult(windowCounts)) {
+                       throw new AssertionError("Test failed check.");
+               }
+               if (usingProcessingTime) {
+                       throw new SuccessException();
+               }
+       }
+
+       /**
+        * Applies the element to the counts and, in processing-time mode, signals
+        * success as soon as the counts pass the check.
+        */
+       @Override
+       public void invoke(T value, Context context) throws Exception {
+               countUpdater.updateCount(value, windowCounts);
+               signalSuccessIfProcessingTimeResultReached();
+       }
+
+       /**
+        * Returns the count map itself (not a copy) as the single state element.
+        */
+       @Override
+       public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+               return Collections.singletonList(this.windowCounts);
+       }
+
+       /**
+        * Merges the single recovered map into the current counts.
+        *
+        * @throws RuntimeException if the recovered list does not hold exactly one element
+        */
+       @Override
+       public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+               if (state.size() != 1) {
+                       throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+               }
+               windowCounts.putAll(state.get(0));
+       }
+
+       /**
+        * Throws {@link SuccessException} if the sink is in processing-time mode and
+        * the counts pass the check; otherwise does nothing.
+        */
+       private void signalSuccessIfProcessingTimeResultReached() {
+               if (usingProcessingTime && resultChecker.checkResult(windowCounts)) {
+                       throw new SuccessException();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d309e61e/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java 
b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
index 22ac02b..d8e2a8f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java
@@ -21,6 +21,6 @@ package org.apache.flink.test.util;
 /**
  * Exception that is thrown to terminate a program and indicate success.
  */
-public class SuccessException extends Exception {
+public class SuccessException extends RuntimeException {
        private static final long serialVersionUID = -7011865671593955887L;
 }

Reply via email to