Repository: flink Updated Branches: refs/heads/release-1.6 6a36afd3a -> f19337a7d
[FLINK-9902][tests] Improve and refactor window checkpointing IT cases This closes #6376. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f19337a7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f19337a7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f19337a7 Branch: refs/heads/release-1.6 Commit: f19337a7d7576ef905a59088ba8279c2d0397a1f Parents: 6a36afd Author: Stefan Richter <[email protected]> Authored: Wed Jul 18 23:29:56 2018 +0200 Committer: Stefan Richter <[email protected]> Committed: Fri Jul 20 15:58:44 2018 +0200 ---------------------------------------------------------------------- .../EventTimeAllWindowCheckpointingITCase.java | 296 ++------------- .../EventTimeWindowCheckpointingITCase.java | 375 +++++-------------- .../WindowCheckpointingITCase.java | 229 +++-------- .../test/checkpointing/utils/FailingSource.java | 155 ++++++++ .../flink/test/checkpointing/utils/IntType.java | 38 ++ .../checkpointing/utils/ValidatingSink.java | 128 +++++++ .../flink/test/util/SuccessException.java | 2 +- 7 files changed, 505 insertions(+), 718 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f19337a7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 9e14b26..5dc2aa0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -26,31 +26,23 @@ import 
org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.test.checkpointing.utils.FailingSource; +import org.apache.flink.test.checkpointing.utils.IntType; +import org.apache.flink.test.checkpointing.utils.ValidatingSink; import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.MiniClusterResourceConfiguration; -import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.ClassRule; import org.junit.Test; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; - import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.flink.test.util.TestUtils.tryExecute; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -88,20 +80,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { final int numElementsPerKey = 3000; final int windowSize = 100; final int numKeys = 1; - FailingSource.reset(); try { StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); env - .addSource(new FailingSource(numKeys, - numElementsPerKey, - numElementsPerKey / 3)) + .addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey)) .rebalance() .timeWindowAll(Time.of(windowSize, MILLISECONDS)) .apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() { @@ -133,9 +122,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); } }) - .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1); + .addSink(new ValidatingSink<>( + new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey), + new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))) + .setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + env.execute("Tumbling Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -149,7 +141,6 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { final int windowSize = 1000; final int windowSlide = 100; final int numKeys = 1; - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -160,7 +151,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { env.getConfig().disableSysoutLogging(); env - .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3)) + .addSource(new FailingSource(new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey)) .rebalance() .timeWindowAll(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS)) .apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() { @@ -192,9 +183,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); } }) - .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1); + .addSink(new ValidatingSink<>( + new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey), + new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))) + .setParallelism(1); - tryExecute(env, "Sliding Window Test"); + env.execute("Sliding Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -207,20 +201,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { final int numElementsPerKey = 3000; final int windowSize = 100; final int numKeys = 1; - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); env - .addSource(new FailingSource(numKeys, - numElementsPerKey, - numElementsPerKey / 3)) + .addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey)) .rebalance() .timeWindowAll(Time.of(windowSize, MILLISECONDS)) .reduce( @@ -261,9 +252,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { } } }) - .addSink(new ValidatingSink(numKeys, 
numElementsPerKey / windowSize)).setParallelism(1); + .addSink(new ValidatingSink<>( + new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey), + new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))) + .setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + env.execute("Tumbling Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -276,20 +270,17 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { final int numElementsPerKey = 3000; final int windowSize = 100; final int numKeys = 1; - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); env - .addSource(new FailingSource(numKeys, - numElementsPerKey, - numElementsPerKey / 3)) + .addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey)) .rebalance() .timeWindowAll(Time.of(windowSize, MILLISECONDS)) .fold(new Tuple4<>(0L, 0L, 0L, new IntType(0)), @@ -329,9 +320,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { } } }) - .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1); + .addSink(new ValidatingSink<>( + new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey), + new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))) + .setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + env.execute("Tumbling Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -345,20 +339,17 @@ public class EventTimeAllWindowCheckpointingITCase 
extends TestLogger { final int windowSize = 1000; final int windowSlide = 100; final int numKeys = 1; - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); env - .addSource(new FailingSource(numKeys, - numElementsPerKey, - numElementsPerKey / 3)) + .addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey)) .rebalance() .timeWindowAll(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS)) @@ -400,229 +391,16 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { } } }) - .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1); + .addSink(new ValidatingSink<>( + new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey), + new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))) + .setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + env.execute("Tumbling Window Test"); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> - implements ListCheckpointed<Integer>, CheckpointListener { - private static volatile boolean failedBefore = false; - - private final int numKeys; - private final int numElementsToEmit; - private final int failureAfterNumElements; - - private volatile int numElementsEmitted; - private 
volatile int numSuccessfulCheckpoints; - private volatile boolean running = true; - - private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) { - this.numKeys = numKeys; - this.numElementsToEmit = numElementsToEmitPerKey; - this.failureAfterNumElements = failureAfterNumElements; - } - - @Override - public void open(Configuration parameters) { - // non-parallel source - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); - } - - @Override - public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception { - // we loop longer than we have elements, to permit delayed checkpoints - // to still cause a failure - while (running) { - - if (!failedBefore) { - // delay a bit, if we have not failed before - Thread.sleep(1); - if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) { - // cause a failure if we have not failed before and have reached - // enough completed checkpoints and elements - failedBefore = true; - throw new Exception("Artificial Failure"); - } - } - - if (numElementsEmitted < numElementsToEmit && - (failedBefore || numElementsEmitted <= failureAfterNumElements)) { - // the function failed before, or we are in the elements before the failure - synchronized (ctx.getCheckpointLock()) { - int next = numElementsEmitted++; - for (long i = 0; i < numKeys; i++) { - ctx.collectWithTimestamp(new Tuple2<>(i, new IntType(next)), next); - } - ctx.emitWatermark(new Watermark(next)); - } - } - else { - // if our work is done, delay a bit to prevent busy waiting - Thread.sleep(1); - } - } - } - - @Override - public void cancel() { - running = false; - } - - @Override - public void notifyCheckpointComplete(long checkpointId) { - numSuccessfulCheckpoints++; - } - - public static void reset() { - failedBefore = false; - } - - @Override - public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { - return 
Collections.singletonList(this.numElementsEmitted); - } - - @Override - public void restoreState(List<Integer> state) throws Exception { - if (state.isEmpty() || state.size() > 1) { - throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); - } - this.numElementsEmitted = state.get(0); - } - } - - private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> - implements ListCheckpointed<HashMap<Long, Integer>> { - - private final HashMap<Long, Integer> windowCounts = new HashMap<>(); - - private final int numKeys; - private final int numWindowsExpected; - - private ValidatingSink(int numKeys, int numWindowsExpected) { - this.numKeys = numKeys; - this.numWindowsExpected = numWindowsExpected; - } - - @Override - public void open(Configuration parameters) throws Exception { - // this sink can only work with DOP 1 - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); - - // it can happen that a checkpoint happens when the complete success state is - // already set. In that case we restart with the final state and would never - // finish because no more elements arrive. - if (windowCounts.size() == numKeys) { - boolean seenAll = true; - for (Integer windowCount: windowCounts.values()) { - if (windowCount != numWindowsExpected) { - seenAll = false; - break; - } - } - if (seenAll) { - throw new SuccessException(); - } - } - } - - @Override - public void close() throws Exception { - boolean seenAll = true; - if (windowCounts.size() == numKeys) { - for (Integer windowCount: windowCounts.values()) { - if (windowCount != numWindowsExpected) { - seenAll = false; - break; - } - } - } - assertTrue("The source must see all expected windows.", seenAll); - } - - @Override - public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception { - - // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end - // the sum should be "sum (start .. 
end-1)" - - int expectedSum = 0; - for (long i = value.f1; i < value.f2; i++) { - // only sum up positive vals, to filter out the negative start of the - // first sliding windows - if (i > 0) { - expectedSum += i; - } - } - - assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value); - - Integer curr = windowCounts.get(value.f0); - if (curr != null) { - windowCounts.put(value.f0, curr + 1); - } - else { - windowCounts.put(value.f0, 1); - } - - boolean seenAll = true; - if (windowCounts.size() == numKeys) { - for (Integer windowCount: windowCounts.values()) { - if (windowCount < numWindowsExpected) { - seenAll = false; - break; - } else if (windowCount > numWindowsExpected) { - fail("Window count to high: " + windowCount); - } - } - - if (seenAll) { - // exit - throw new SuccessException(); - } - - } - } - - @Override - public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception { - return Collections.singletonList(this.windowCounts); - } - - @Override - public void restoreState(List<HashMap<Long, Integer>> state) throws Exception { - if (state.isEmpty() || state.size() > 1) { - throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); - } - this.windowCounts.putAll(state.get(0)); - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - /** - * Custom boxed integer type. 
- */ - public static class IntType { - - public int value; - - public IntType() {} - - public IntType(int value) { - this.value = value; - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/f19337a7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index c3d93d7..e9a2e45 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -32,21 +32,20 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import 
org.apache.flink.test.checkpointing.utils.FailingSource; +import org.apache.flink.test.checkpointing.utils.IntType; +import org.apache.flink.test.checkpointing.utils.ValidatingSink; import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.MiniClusterResourceConfiguration; -import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -65,19 +64,10 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; +import java.util.Map; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE; -import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC; -import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM; -import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM_ASYNC; -import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC; -import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL; import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; -import static org.apache.flink.test.util.TestUtils.tryExecute; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -255,20 +245,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { final int numElementsPerKey = numElementsPerKey(); final int windowSize = windowSize(); final int numKeys = numKeys(); - FailingSource.reset(); try { 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); env.setStateBackend(this.stateBackend); env.getConfig().setUseSnapshotCompression(true); env - .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3)) + .addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey)) .rebalance() .keyBy(0) .timeWindow(Time.of(windowSize, MILLISECONDS)) @@ -299,12 +288,17 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { sum += value.f1.value; key = value.f0; } - out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); + + final Tuple4<Long, Long, Long, IntType> result = + new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)); + out.collect(result); } }) - .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1); + .addSink(new ValidatingSink<>( + new SinkValidatorUpdateFun(numElementsPerKey), + new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + env.execute("Tumbling Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -326,7 +320,6 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { final int numElementsPerKey = numElementsPerKey(); final int windowSize = windowSize(); final int numKeys = numKeys(); - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -334,13 +327,13 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { env.setMaxParallelism(maxParallelism); 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); env.setStateBackend(this.stateBackend); env.getConfig().setUseSnapshotCompression(true); env - .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3)) + .addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey)) .rebalance() .keyBy(0) .timeWindow(Time.of(windowSize, MILLISECONDS)) @@ -378,9 +371,11 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value()))); } }) - .addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1); + .addSink(new ValidatingSink<>( + new CountingSinkValidatorUpdateFun(), + new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + env.execute("Tumbling Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -394,7 +389,6 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { final int windowSize = windowSize(); final int windowSlide = windowSlide(); final int numKeys = numKeys(); - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -402,13 +396,13 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); env.setStateBackend(this.stateBackend); 
env.getConfig().setUseSnapshotCompression(true); env - .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3)) + .addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey)) .rebalance() .keyBy(0) .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS)) @@ -439,12 +433,16 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { sum += value.f1.value; key = value.f0; } - out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); + final Tuple4<Long, Long, Long, IntType> output = + new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)); + out.collect(output); } }) - .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1); + .addSink(new ValidatingSink<>( + new SinkValidatorUpdateFun(numElementsPerKey), + new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))).setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + env.execute("Tumbling Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -457,20 +455,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { final int numElementsPerKey = numElementsPerKey(); final int windowSize = windowSize(); final int numKeys = numKeys(); - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); env.setStateBackend(this.stateBackend); env.getConfig().setUseSnapshotCompression(true); env - .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3)) + .addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), 
numElementsPerKey)) .rebalance() .keyBy(0) .timeWindow(Time.of(windowSize, MILLISECONDS)) @@ -505,16 +502,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { assertTrue(open); for (Tuple2<Long, IntType> in: input) { - out.collect(new Tuple4<>(in.f0, - window.getStart(), - window.getEnd(), - in.f1)); + final Tuple4<Long, Long, Long, IntType> output = new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f1); + out.collect(output); } } }) - .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1); + .addSink(new ValidatingSink<>( + new SinkValidatorUpdateFun(numElementsPerKey), + new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + env.execute("Tumbling Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -528,20 +528,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { final int windowSize = windowSize(); final int windowSlide = windowSlide(); final int numKeys = numKeys(); - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); env.setStateBackend(this.stateBackend); env.getConfig().setUseSnapshotCompression(true); env - .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3)) + .addSource(new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey)) .rebalance() .keyBy(0) .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS)) @@ -585,9 +584,11 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { } } }) - .addSink(new 
ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1); + .addSink(new ValidatingSink<>( + new SinkValidatorUpdateFun(numElementsPerKey), + new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))).setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + env.execute("Tumbling Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -599,151 +600,42 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { // Utilities // ------------------------------------------------------------------------ - private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> - implements ListCheckpointed<Integer>, CheckpointListener { - private static volatile boolean failedBefore = false; - - private final int numKeys; - private final int numElementsToEmit; - private final int failureAfterNumElements; - - private volatile int numElementsEmitted; - private volatile int numSuccessfulCheckpoints; - private volatile boolean running = true; - - private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) { - this.numKeys = numKeys; - this.numElementsToEmit = numElementsToEmitPerKey; - this.failureAfterNumElements = failureAfterNumElements; - } - - @Override - public void open(Configuration parameters) { - // non-parallel source - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); - } - - @Override - public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception { - // we loop longer than we have elements, to permit delayed checkpoints - // to still cause a failure - while (running) { - - if (!failedBefore) { - // delay a bit, if we have not failed before - Thread.sleep(1); - if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) { - // cause a failure if we have not failed before and have reached - // enough completed checkpoints and elements - failedBefore = true; - throw new Exception("Artificial Failure"); - } - } - 
- if (numElementsEmitted < numElementsToEmit && - (failedBefore || numElementsEmitted <= failureAfterNumElements)) { - // the function failed before, or we are in the elements before the failure - synchronized (ctx.getCheckpointLock()) { - int next = numElementsEmitted++; - for (long i = 0; i < numKeys; i++) { - ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next); - } - ctx.emitWatermark(new Watermark(next)); - } - } - else { - - // if our work is done, delay a bit to prevent busy waiting - Thread.sleep(1); - } - } - } - - @Override - public void cancel() { - running = false; - } - - @Override - public void notifyCheckpointComplete(long checkpointId) { - numSuccessfulCheckpoints++; - } + /** + * For validating the stateful window counts. + */ + static class CountingSinkValidatorUpdateFun + implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> { @Override - public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { - return Collections.singletonList(this.numElementsEmitted); - } + public void updateCount(Tuple4<Long, Long, Long, IntType> value, Map<Long, Integer> windowCounts) { - @Override - public void restoreState(List<Integer> state) throws Exception { - if (state.isEmpty() || state.size() > 1) { - throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); - } - this.numElementsEmitted = state.get(0); - } + windowCounts.merge(value.f0, 1, (a, b) -> a + b); - public static void reset() { - failedBefore = false; + // verify the contents of that window, the contents should be: + // (key + num windows so far) + assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value); } } - private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> - implements ListCheckpointed<HashMap<Long, Integer>> { - - private final HashMap<Long, Integer> windowCounts 
= new HashMap<>(); - - private final int numKeys; - private final int numWindowsExpected; + //------------------------------------ - private ValidatingSink(int numKeys, int numWindowsExpected) { - this.numKeys = numKeys; - this.numWindowsExpected = numWindowsExpected; - } + static class SinkValidatorUpdateFun implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> { - @Override - public void open(Configuration parameters) throws Exception { - // this sink can only work with DOP 1 - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + private final int elementsPerKey; - // it can happen that a checkpoint happens when the complete success state is - // already set. In that case we restart with the final state and would never - // finish because no more elements arrive. - if (windowCounts.size() == numKeys) { - boolean seenAll = true; - for (Integer windowCount: windowCounts.values()) { - if (windowCount != numWindowsExpected) { - seenAll = false; - break; - } - } - if (seenAll) { - throw new SuccessException(); - } - } + SinkValidatorUpdateFun(int elementsPerKey) { + this.elementsPerKey = elementsPerKey; } @Override - public void close() throws Exception { - boolean seenAll = true; - if (windowCounts.size() == numKeys) { - for (Integer windowCount: windowCounts.values()) { - if (windowCount < numWindowsExpected) { - seenAll = false; - break; - } - } - } - assertTrue("The sink must see all expected windows.", seenAll); - } - - @Override - public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception { - + public void updateCount(Tuple4<Long, Long, Long, IntType> value, Map<Long, Integer> windowCounts) { // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end // the sum should be "sum (start .. 
end-1)" int expectedSum = 0; - for (long i = value.f1; i < value.f2; i++) { + // we shorten the range if it goes beyond elementsPerKey, because those are "incomplete" sliding windows + long countUntil = Math.min(value.f2, elementsPerKey); + for (long i = value.f1; i < countUntil; i++) { // only sum up positive vals, to filter out the negative start of the // first sliding windows if (i > 0) { @@ -753,142 +645,55 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value); - Integer curr = windowCounts.get(value.f0); - if (curr != null) { - windowCounts.put(value.f0, curr + 1); - } - else { - windowCounts.put(value.f0, 1); - } - - if (windowCounts.size() == numKeys) { - boolean seenAll = true; - for (Integer windowCount: windowCounts.values()) { - if (windowCount < numWindowsExpected) { - seenAll = false; - break; - } else if (windowCount > numWindowsExpected) { - fail("Window count to high: " + windowCount); - } - } - - if (seenAll) { - // exit - throw new SuccessException(); - } - - } - } - - @Override - public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception { - return Collections.singletonList(this.windowCounts); - } - - @Override - public void restoreState(List<HashMap<Long, Integer>> state) throws Exception { - if (state.isEmpty() || state.size() > 1) { - throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); - } - windowCounts.putAll(state.get(0)); + windowCounts.merge(value.f0, 1, (val, increment) -> val + increment); } } - // Sink for validating the stateful window counts - private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> - implements ListCheckpointed<HashMap<Long, Integer>> { - - private final HashMap<Long, Integer> windowCounts = new HashMap<>(); + static class SinkValidatorCheckFun implements 
ValidatingSink.ResultChecker { private final int numKeys; private final int numWindowsExpected; - private CountValidatingSink(int numKeys, int numWindowsExpected) { + SinkValidatorCheckFun(int numKeys, int elementsPerKey, int elementsPerWindow) { this.numKeys = numKeys; - this.numWindowsExpected = numWindowsExpected; + this.numWindowsExpected = elementsPerKey / elementsPerWindow; } @Override - public void open(Configuration parameters) throws Exception { - // this sink can only work with DOP 1 - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); - } - - @Override - public void close() throws Exception { - boolean seenAll = true; + public boolean checkResult(Map<Long, Integer> windowCounts) { if (windowCounts.size() == numKeys) { - for (Integer windowCount: windowCounts.values()) { + for (Integer windowCount : windowCounts.values()) { if (windowCount < numWindowsExpected) { - seenAll = false; - break; + return false; } } + return true; } - assertTrue("The source must see all expected windows.", seenAll); + return false; } + } - @Override - public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception { - - Integer curr = windowCounts.get(value.f0); - if (curr != null) { - windowCounts.put(value.f0, curr + 1); - } - else { - windowCounts.put(value.f0, 1); - } - - // verify the contents of that window, the contents should be: - // (key + num windows so far) - - assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value); - - boolean seenAll = true; - if (windowCounts.size() == numKeys) { - for (Integer windowCount: windowCounts.values()) { - if (windowCount < numWindowsExpected) { - seenAll = false; - break; - } else if (windowCount > numWindowsExpected) { - fail("Window count to high: " + windowCount); - } - } + static class KeyedEventTimeGenerator implements FailingSource.EventEmittingGenerator { - if (seenAll) { - // exit - throw new SuccessException(); - } + 
private final int keyUniverseSize; + private final int watermarkTrailing; - } - } - - @Override - public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception { - return Collections.singletonList(this.windowCounts); + public KeyedEventTimeGenerator(int keyUniverseSize, int numElementsPerWindow) { + this.keyUniverseSize = keyUniverseSize; + // we let the watermark a bit behind, so that there can be in-flight timers that required checkpointing + // to include correct timer snapshots in our testing. + this.watermarkTrailing = 4 * numElementsPerWindow / 3; } @Override - public void restoreState(List<HashMap<Long, Integer>> state) throws Exception { - if (state.isEmpty() || state.size() > 1) { - throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + public void emitEvent(SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo) { + final IntType intTypeNext = new IntType(eventSequenceNo); + for (long i = 0; i < keyUniverseSize; i++) { + final Tuple2<Long, IntType> generatedEvent = new Tuple2<>(i, intTypeNext); + ctx.collectWithTimestamp(generatedEvent, eventSequenceNo); } - this.windowCounts.putAll(state.get(0)); - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private static class IntType { - - public int value; - - public IntType() {} - public IntType(int value) { - this.value = value; + ctx.emitWatermark(new Watermark(eventSequenceNo - watermarkTrailing)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f19337a7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index b6163e8..b0e2967 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -25,18 +25,17 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.test.checkpointing.utils.FailingSource; +import org.apache.flink.test.checkpointing.utils.IntType; +import org.apache.flink.test.checkpointing.utils.ValidatingSink; import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.MiniClusterResourceConfiguration; -import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -47,9 +46,7 @@ import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; +import java.util.Map; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static 
org.apache.flink.test.util.TestUtils.tryExecute; @@ -92,7 +89,6 @@ public class WindowCheckpointingITCase extends TestLogger { @Test public void testTumblingProcessingTimeWindow() { final int numElements = 3000; - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -100,11 +96,14 @@ public class WindowCheckpointingITCase extends TestLogger { env.setStreamTimeCharacteristic(timeCharacteristic); env.getConfig().setAutoWatermarkInterval(10); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); + SinkValidatorUpdaterAndChecker updaterAndChecker = + new SinkValidatorUpdaterAndChecker(numElements, 1); + env - .addSource(new FailingSource(numElements, numElements / 3)) + .addSource(new FailingSource(new Generator(), numElements, timeCharacteristic)) .rebalance() .keyBy(0) .timeWindow(Time.of(100, MILLISECONDS)) @@ -130,11 +129,12 @@ public class WindowCheckpointingITCase extends TestLogger { for (Tuple2<Long, IntType> value : values) { assertEquals(value.f0.intValue(), value.f1.value); - out.collect(new Tuple2<Long, IntType>(value.f0, new IntType(1))); + out.collect(new Tuple2<>(value.f0, new IntType(1))); } } }) - .addSink(new ValidatingSink(numElements, 1)).setParallelism(1); + .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic)) + .setParallelism(1); tryExecute(env, "Tumbling Window Test"); } @@ -147,7 +147,6 @@ public class WindowCheckpointingITCase extends TestLogger { @Test public void testSlidingProcessingTimeWindow() { final int numElements = 3000; - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -155,11 +154,12 @@ public class WindowCheckpointingITCase extends TestLogger { env.setStreamTimeCharacteristic(timeCharacteristic); 
env.getConfig().setAutoWatermarkInterval(10); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); - + SinkValidatorUpdaterAndChecker updaterAndChecker = + new SinkValidatorUpdaterAndChecker(numElements, 3); env - .addSource(new FailingSource(numElements, numElements / 3)) + .addSource(new FailingSource(new Generator(), numElements, timeCharacteristic)) .rebalance() .keyBy(0) .timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS)) @@ -185,13 +185,14 @@ public class WindowCheckpointingITCase extends TestLogger { for (Tuple2<Long, IntType> value : values) { assertEquals(value.f0.intValue(), value.f1.value); - out.collect(new Tuple2<Long, IntType>(value.f0, new IntType(1))); + out.collect(new Tuple2<>(value.f0, new IntType(1))); } } }) - .addSink(new ValidatingSink(numElements, 3)).setParallelism(1); + .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic)) + .setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + tryExecute(env, "Sliding Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -202,7 +203,6 @@ public class WindowCheckpointingITCase extends TestLogger { @Test public void testAggregatingTumblingProcessingTimeWindow() { final int numElements = 3000; - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -210,11 +210,12 @@ public class WindowCheckpointingITCase extends TestLogger { env.setStreamTimeCharacteristic(timeCharacteristic); env.getConfig().setAutoWatermarkInterval(10); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); - + SinkValidatorUpdaterAndChecker updaterAndChecker = + new 
SinkValidatorUpdaterAndChecker(numElements, 1); env - .addSource(new FailingSource(numElements, numElements / 3)) + .addSource(new FailingSource(new Generator(), numElements, timeCharacteristic)) .map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() { @Override public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) { @@ -234,9 +235,10 @@ public class WindowCheckpointingITCase extends TestLogger { return new Tuple2<>(a.f0, new IntType(1)); } }) - .addSink(new ValidatingSink(numElements, 1)).setParallelism(1); + .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic)) + .setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + tryExecute(env, "Aggregating Tumbling Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -247,7 +249,6 @@ public class WindowCheckpointingITCase extends TestLogger { @Test public void testAggregatingSlidingProcessingTimeWindow() { final int numElements = 3000; - FailingSource.reset(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -255,11 +256,12 @@ public class WindowCheckpointingITCase extends TestLogger { env.setStreamTimeCharacteristic(timeCharacteristic); env.getConfig().setAutoWatermarkInterval(10); env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); env.getConfig().disableSysoutLogging(); - + SinkValidatorUpdaterAndChecker updaterAndChecker = + new SinkValidatorUpdaterAndChecker(numElements, 3); env - .addSource(new FailingSource(numElements, numElements / 3)) + .addSource(new FailingSource(new Generator(), numElements, timeCharacteristic)) .map(new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>>() { @Override public Tuple2<Long, IntType> map(Tuple2<Long, IntType> value) { @@ -278,9 +280,10 @@ public class WindowCheckpointingITCase extends TestLogger { return new Tuple2<>(a.f0, new IntType(1)); 
} }) - .addSink(new ValidatingSink(numElements, 3)).setParallelism(1); + .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, timeCharacteristic)) + .setParallelism(1); - tryExecute(env, "Tumbling Window Test"); + tryExecute(env, "Aggregating Sliding Window Test"); } catch (Exception e) { e.printStackTrace(); @@ -292,152 +295,50 @@ public class WindowCheckpointingITCase extends TestLogger { // Utilities // ------------------------------------------------------------------------ - private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> - implements ListCheckpointed<Integer>, CheckpointListener { - private static volatile boolean failedBefore = false; - - private final int numElementsToEmit; - private final int failureAfterNumElements; - - private volatile int numElementsEmitted; - private volatile int numSuccessfulCheckpoints; - private volatile boolean running = true; - - private FailingSource(int numElementsToEmit, int failureAfterNumElements) { - this.numElementsToEmit = numElementsToEmit; - this.failureAfterNumElements = failureAfterNumElements; - } - - @Override - public void open(Configuration parameters) { - // non-parallel source - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); - } - - @Override - public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception { - // we loop longer than we have elements, to permit delayed checkpoints - // to still cause a failure - while (running) { - if (!failedBefore) { - // delay a bit, if we have not failed before - Thread.sleep(1); - if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) { - // cause a failure if we have not failed before and have reached - // enough completed checkpoints and elements - failedBefore = true; - throw new Exception("Artificial Failure"); - } - } - - if (numElementsEmitted < numElementsToEmit && - (failedBefore || numElementsEmitted <= failureAfterNumElements)) { - // the function failed 
before, or we are in the elements before the failure - synchronized (ctx.getCheckpointLock()) { - int next = numElementsEmitted++; - ctx.collect(new Tuple2<Long, IntType>((long) next, new IntType(next))); - } - } else { - // if our work is done, delay a bit to prevent busy waiting - Thread.sleep(10); - } - } - } + static class Generator implements FailingSource.EventEmittingGenerator { @Override - public void cancel() { - running = false; - } - - @Override - public void notifyCheckpointComplete(long checkpointId) { - numSuccessfulCheckpoints++; - } - - @Override - public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { - return Collections.singletonList(this.numElementsEmitted); - } - - @Override - public void restoreState(List<Integer> state) throws Exception { - if (state.isEmpty() || state.size() > 1) { - throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); - } - this.numElementsEmitted = state.get(0); - } - - public static void reset() { - failedBefore = false; + public void emitEvent(SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo) { + ctx.collect(new Tuple2<>((long) eventSequenceNo, new IntType(eventSequenceNo))); } } - private static class ValidatingSink extends RichSinkFunction<Tuple2<Long, IntType>> - implements ListCheckpointed<HashMap<Long, Integer>> { - - private final HashMap<Long, Integer> counts = new HashMap<>(); + static class SinkValidatorUpdaterAndChecker + implements ValidatingSink.CountUpdater<Tuple2<Long, IntType>>, ValidatingSink.ResultChecker { private final int elementCountExpected; private final int countPerElementExpected; - private int aggCount; - - private ValidatingSink(int elementCountExpected, int countPerElementExpected) { + SinkValidatorUpdaterAndChecker(int elementCountExpected, int countPerElementExpected) { this.elementCountExpected = elementCountExpected; this.countPerElementExpected = countPerElementExpected; } @Override 
- public void open(Configuration parameters) throws Exception { - // this sink can only work with DOP 1 - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); - checkSuccess(); + public void updateCount(Tuple2<Long, IntType> value, Map<Long, Integer> windowCounts) { + windowCounts.merge(value.f0, value.f1.value, (a, b) -> a + b); } @Override - public void invoke(Tuple2<Long, IntType> value) throws Exception { - Integer curr = counts.get(value.f0); - if (curr != null) { - counts.put(value.f0, curr + value.f1.value); - } - else { - counts.put(value.f0, value.f1.value); - } + public boolean checkResult(Map<Long, Integer> windowCounts) { + int aggCount = 0; - // check if we have seen all we expect - aggCount += value.f1.value; - checkSuccess(); - } - - private void checkSuccess() throws SuccessException { - if (aggCount >= elementCountExpected * countPerElementExpected) { - // we are done. validate - assertEquals(elementCountExpected, counts.size()); - - for (Integer i : counts.values()) { - assertEquals(countPerElementExpected, i.intValue()); - } - - // exit - throw new SuccessException(); + for (Integer i : windowCounts.values()) { + aggCount += i; } - } - @Override - public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception { - return Collections.singletonList(this.counts); - } - - @Override - public void restoreState(List<HashMap<Long, Integer>> state) throws Exception { - if (state.isEmpty() || state.size() > 1) { - throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + if (aggCount < elementCountExpected * countPerElementExpected + || elementCountExpected != windowCounts.size()) { + return false; } - this.counts.putAll(state.get(0)); - for (Integer i : state.get(0).values()) { - this.aggCount += i; + for (int i : windowCounts.values()) { + if (countPerElementExpected != i) { + return false; + } } + + return true; } } @@ -452,22 +353,4 @@ public class 
WindowCheckpointingITCase extends TestLogger { new TimeCharacteristic[]{TimeCharacteristic.IngestionTime} ); } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - /** - * POJO with int value. - */ - public static class IntType { - - public int value; - - public IntType() {} - - public IntType(int value) { - this.value = value; - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/f19337a7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java new file mode 100644 index 0000000..822d73b --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; + +/** + * Source for window checkpointing IT cases that can introduce artificial failures. + */ +public class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> + implements ListCheckpointed<Integer>, CheckpointListener { + + /** + * Function to generate and emit the test events (and watermarks if required). 
+ */ + @FunctionalInterface + public interface EventEmittingGenerator extends Serializable { + void emitEvent(SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo); + } + + private static final long INITIAL = Long.MIN_VALUE; + private static final long STATEFUL_CHECKPOINT_COMPLETED = Long.MIN_VALUE; + + @Nonnull + private final EventEmittingGenerator eventEmittingGenerator; + private final int expectedEmitCalls; + private final int failureAfterNumElements; + private final boolean usingProcessingTime; + private final AtomicLong checkpointStatus; + + private int emitCallCount; + private volatile boolean running; + + public FailingSource( + @Nonnull EventEmittingGenerator eventEmittingGenerator, + @Nonnegative int numberOfGeneratorInvocations) { + this(eventEmittingGenerator, numberOfGeneratorInvocations, TimeCharacteristic.EventTime); + } + + public FailingSource( + @Nonnull EventEmittingGenerator eventEmittingGenerator, + @Nonnegative int numberOfGeneratorInvocations, + @Nonnull TimeCharacteristic timeCharacteristic) { + this.eventEmittingGenerator = eventEmittingGenerator; + this.running = true; + this.emitCallCount = 0; + this.expectedEmitCalls = numberOfGeneratorInvocations; + this.failureAfterNumElements = numberOfGeneratorInvocations / 2; + this.checkpointStatus = new AtomicLong(INITIAL); + this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime; + } + + @Override + public void open(Configuration parameters) { + // non-parallel source + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + } + + @Override + public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception { + + final RuntimeContext runtimeContext = getRuntimeContext(); + // detect if this task is "the chosen one" and should fail (via subtaskidx), if it did not fail before (via attempt) + final boolean failThisTask = + runtimeContext.getAttemptNumber() == 0 && runtimeContext.getIndexOfThisSubtask() == 0; + + // we loop longer than we 
have elements, to permit delayed checkpoints + // to still cause a failure + while (running) { + + // the function failed before, or we are in the elements before the failure + synchronized (ctx.getCheckpointLock()) { + eventEmittingGenerator.emitEvent(ctx, emitCallCount++); + running &= (emitCallCount < expectedEmitCalls); + } + + if (emitCallCount < failureAfterNumElements) { + Thread.sleep(1); + } else if (failThisTask && emitCallCount == failureAfterNumElements) { + // wait for a pending checkpoint that fulfills our requirements if needed + while (checkpointStatus.get() != STATEFUL_CHECKPOINT_COMPLETED) { + Thread.sleep(1); + } + throw new Exception("Artificial Failure"); + } + } + + if (usingProcessingTime) { + while (running) { + Thread.sleep(10); + } + } + } + + @Override + public void cancel() { + running = false; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // This will unblock the task for failing, if this is the checkpoint we are waiting for + checkpointStatus.compareAndSet(checkpointId, STATEFUL_CHECKPOINT_COMPLETED); + } + + @Override + public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + // We accept a checkpoint as basis if it should have a "decent amount" of state + if (emitCallCount > failureAfterNumElements / 2) { + // This means we are waiting for notification of this checkpoint to completed now. 
+ checkpointStatus.compareAndSet(INITIAL, checkpointId); + } + return Collections.singletonList(this.emitCallCount); + } + + @Override + public void restoreState(List<Integer> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + this.emitCallCount = state.get(0); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f19337a7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java new file mode 100644 index 0000000..3bc4ea0 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/IntType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +/** + * Test type that wraps an int. 
+ */ +public class IntType { + + public int value; + + public IntType(int value) { + this.value = value; + } + + @Override + public String toString() { + return "IntType{" + + "value=" + value + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f19337a7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java new file mode 100644 index 0000000..b352738 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.test.util.SuccessException; + +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Generalized sink for validation of window checkpointing IT cases. + * + * <p>Each incoming record is handed to a {@link CountUpdater}, which maintains a map of counts; + * a {@link ResultChecker} decides whether that map is as expected. The map is exposed as + * checkpointed state through {@link ListCheckpointed}. + * + * @param <T> type of the records consumed by the sink. + */ +public class ValidatingSink<T> extends RichSinkFunction<T> + implements ListCheckpointed<HashMap<Long, Integer>> { + + /** + * Function to check if the window counts are as expected. + * + * <p>The sink always evaluates it in {@code close()}, and additionally in {@code open()} and + * {@code invoke()} when the sink was created for processing time. + */ + @FunctionalInterface + public interface ResultChecker extends Serializable { + /** Returns {@code true} if the given window counts are as expected. */ boolean checkResult(Map<Long, Integer> windowCounts); + } + + /** + * Function that updates the window counts from an update event. + * + * @param <T> type of the update event. 
+ */ + public interface CountUpdater<T> extends Serializable { + /** Applies {@code update} to {@code windowCounts}. Nothing is returned, so any effect has to be made on the passed map. */ void updateCount(T update, Map<Long, Integer> windowCounts); + } + + /** Decides whether {@link #windowCounts} is in the expected state. */ @Nonnull + private final ResultChecker resultChecker; + + /** Folds every record received by {@code invoke} into {@link #windowCounts}. */ @Nonnull + private final CountUpdater<T> countUpdater; + + /** Counts maintained by the {@link CountUpdater}; snapshotted and restored as checkpointed state. Keys presumably identify windows -- TODO confirm against the CountUpdater implementations. */ @Nonnull + private final HashMap<Long, Integer> windowCounts; + + /** {@code true} iff the sink was constructed with {@link TimeCharacteristic#ProcessingTime}; enables the {@link SuccessException} exits. */ private final boolean usingProcessingTime; + + /** + * Creates a sink that behaves as configured for {@link TimeCharacteristic#EventTime}. + * + * @param countUpdater folds each incoming record into the count map. + * @param resultChecker decides whether the count map is as expected. + */ public ValidatingSink( + @Nonnull CountUpdater<T> countUpdater, + @Nonnull ResultChecker resultChecker) { + this(countUpdater, resultChecker, TimeCharacteristic.EventTime); + } + + /** + * Creates a sink starting with an empty count map. + * + * @param countUpdater folds each incoming record into the count map. + * @param resultChecker decides whether the count map is as expected. + * @param timeCharacteristic only compared against {@link TimeCharacteristic#ProcessingTime}; + * every other value selects the non-processing-time behavior. + */ public ValidatingSink( + @Nonnull CountUpdater<T> countUpdater, + @Nonnull ResultChecker resultChecker, + @Nonnull TimeCharacteristic timeCharacteristic) { + + this.resultChecker = resultChecker; + this.countUpdater = countUpdater; + this.usingProcessingTime = TimeCharacteristic.ProcessingTime == timeCharacteristic; + this.windowCounts = new HashMap<>(); + } + + /** + * Asserts that the sink runs with exactly one parallel subtask. Under processing time it also + * throws a {@link SuccessException} if the current counts already pass the check. + * NOTE(review): presumably this matters after a recovery, when {@code restoreState} has + * refilled the counts -- the call order is decided by the runtime and is not visible here; confirm. + */ @Override + public void open(Configuration parameters) throws Exception { + // this sink can only work with DOP 1 + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + if (usingProcessingTime && resultChecker.checkResult(windowCounts)) { + throw new SuccessException(); + } + } + + /** + * Runs the final check. A failing check raises an {@link AssertionError}. A passing check + * raises a {@link SuccessException} under processing time and returns normally otherwise. + */ @Override + public void close() { + if (resultChecker.checkResult(windowCounts)) { + if (usingProcessingTime) { + throw new SuccessException(); + } + } else { + throw new AssertionError("Test failed check."); + } + } + + /** + * Hands {@code value} to the {@link CountUpdater}; under processing time, throws a + * {@link SuccessException} as soon as the counts pass the check. {@code context} is unused. + */ @Override + public void invoke(T value, Context context) throws Exception { + countUpdater.updateCount(value, windowCounts); + if (usingProcessingTime && resultChecker.checkResult(windowCounts)) { + throw new SuccessException(); + } + } + + /** + * Returns a single-element list holding the sink's own count map (no copy is made here). + * Both parameters are unused. + */ @Override + public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception { + return Collections.singletonList(this.windowCounts); + } + + /** + * Adds the entries of the single recovered map to the count map via {@code putAll}. + * + * @throws RuntimeException if {@code state} does not contain exactly one element. + */ @Override + public void restoreState(List<HashMap<Long, Integer>> state) throws Exception { + if (state.isEmpty() || state.size() > 1) { + throw new 
RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + } + windowCounts.putAll(state.get(0)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f19337a7/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java index 22ac02b..d8e2a8f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java +++ b/flink-tests/src/test/java/org/apache/flink/test/util/SuccessException.java @@ -21,6 +21,6 @@ package org.apache.flink.test.util; /** * Exception that is thrown to terminate a program and indicate success. */ -public class SuccessException extends Exception { +public class SuccessException extends RuntimeException { private static final long serialVersionUID = -7011865671593955887L; }
