Repository: flink Updated Branches: refs/heads/master 7fbd757dd -> e288617f9
[FLINK-5016] [checkpointing] Split EventTimeWindowCheckpointingITCase Split this EventTimeWindowCheckpointingITCase up into multiple tests in order to not run into the no output to stdout CI limit (currently set to 5 minutes). This closes #2933. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e288617f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e288617f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e288617f Branch: refs/heads/master Commit: e288617f9eca260f2fac53df48862335247199ed Parents: 7fbd757 Author: Ufuk Celebi <[email protected]> Authored: Sun Dec 4 16:06:16 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Sun Dec 4 18:48:53 2016 +0100 ---------------------------------------------------------------------- ...tractEventTimeWindowCheckpointingITCase.java | 782 ++++++++++++++++++ .../EventTimeAllWindowCheckpointingITCase.java | 2 +- .../EventTimeWindowCheckpointingITCase.java | 800 ------------------- ...ckendEventTimeWindowCheckpointingITCase.java | 26 + ...ckendEventTimeWindowCheckpointingITCase.java | 26 + ...ckendEventTimeWindowCheckpointingITCase.java | 26 + 6 files changed, 861 insertions(+), 801 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java new file mode 100644 index 0000000..583e42f --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -0,0 +1,782 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import 
org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.test.util.SuccessException; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.util.HashMap; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.flink.test.util.TestUtils.tryExecute; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * This verifies that checkpointing works correctly with event time windows. This is more + * strict than {@link WindowCheckpointingITCase} because for event-time the contents + * of the emitted windows are deterministic. + * + * <p>Split into multiple test classes in order to decrease the runtime per backend + * and not run into CI infrastructure limits like no std output being emitted for + * I/O heavy variants. 
+ */ +@SuppressWarnings("serial") +public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLogger { + + private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024; + private static final int PARALLELISM = 4; + + private static LocalFlinkMiniCluster cluster; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private StateBackendEnum stateBackendEnum; + private AbstractStateBackend stateBackend; + + AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) { + this.stateBackendEnum = stateBackendEnum; + } + + enum StateBackendEnum { + MEM, FILE, ROCKSDB_FULLY_ASYNC + } + + @BeforeClass + public static void startTestCluster() { + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); + config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); + + cluster = new LocalFlinkMiniCluster(config, false); + cluster.start(); + } + + @AfterClass + public static void stopTestCluster() { + if (cluster != null) { + cluster.stop(); + } + } + + @Before + public void initStateBackend() throws IOException { + switch (stateBackendEnum) { + case MEM: + this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE); + break; + case FILE: { + String backups = tempFolder.newFolder().getAbsolutePath(); + this.stateBackend = new FsStateBackend("file://" + backups); + break; + } + case ROCKSDB_FULLY_ASYNC: { + String rocksDb = tempFolder.newFolder().getAbsolutePath(); + RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE)); + rdb.setDbStoragePath(rocksDb); + this.stateBackend = rdb; + break; + } + + } + } + + // ------------------------------------------------------------------------ + + @Test + public void testTumblingTimeWindow() { + final int NUM_ELEMENTS_PER_KEY = 3000; + final int WINDOW_SIZE = 100; + final int NUM_KEYS = 100; + 
FailingSource.reset(); + + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getLeaderRPCPort()); + + env.setParallelism(PARALLELISM); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.enableCheckpointing(100); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.getConfig().disableSysoutLogging(); + env.setStateBackend(this.stateBackend); + + env + .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) + .rebalance() + .keyBy(0) + .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) + .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { + + private boolean open = false; + + @Override + public void open(Configuration parameters) { + assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + Tuple tuple, + TimeWindow window, + Iterable<Tuple2<Long, IntType>> values, + Collector<Tuple4<Long, Long, Long, IntType>> out) { + + // validate that the function has been opened properly + assertTrue(open); + + int sum = 0; + long key = -1; + + for (Tuple2<Long, IntType> value : values) { + sum += value.f1.value; + key = value.f0; + } + out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); + } + }) + .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); + + + tryExecute(env, "Tumbling Window Test"); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testTumblingTimeWindowWithKVStateMinMaxParallelism() { + doTestTumblingTimeWindowWithKVState(PARALLELISM); + } + + @Test + public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() { + doTestTumblingTimeWindowWithKVState(1 << 15); + } + + public void doTestTumblingTimeWindowWithKVState(int maxParallelism) { + final int 
NUM_ELEMENTS_PER_KEY = 3000; + final int WINDOW_SIZE = 100; + final int NUM_KEYS = 100; + FailingSource.reset(); + + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getLeaderRPCPort()); + + env.setParallelism(PARALLELISM); + env.setMaxParallelism(maxParallelism); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.enableCheckpointing(100); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.getConfig().disableSysoutLogging(); + env.setStateBackend(this.stateBackend); + + env + .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) + .rebalance() + .keyBy(0) + .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) + .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { + + private boolean open = false; + + private ValueState<Integer> count; + + @Override + public void open(Configuration parameters) { + assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); + open = true; + count = getRuntimeContext().getState( + new ValueStateDescriptor<>("count", Integer.class, 0)); + } + + @Override + public void apply( + Tuple tuple, + TimeWindow window, + Iterable<Tuple2<Long, IntType>> values, + Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception { + + // the window count state starts with the key, so that we get + // different count results for each key + if (count.value() == 0) { + count.update(tuple.<Long>getField(0).intValue()); + } + + // validate that the function has been opened properly + assertTrue(open); + + count.update(count.value() + 1); + out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value()))); + } + }) + .addSink(new CountValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); + + + tryExecute(env, "Tumbling Window Test"); + } + catch (Exception e) { + 
e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSlidingTimeWindow() { + final int NUM_ELEMENTS_PER_KEY = 3000; + final int WINDOW_SIZE = 1000; + final int WINDOW_SLIDE = 100; + final int NUM_KEYS = 100; + FailingSource.reset(); + + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getLeaderRPCPort()); + + env.setMaxParallelism(2 * PARALLELISM); + env.setParallelism(PARALLELISM); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.enableCheckpointing(100); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.getConfig().disableSysoutLogging(); + env.setStateBackend(this.stateBackend); + + env + .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) + .rebalance() + .keyBy(0) + .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) + .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { + + private boolean open = false; + + @Override + public void open(Configuration parameters) { + assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + Tuple tuple, + TimeWindow window, + Iterable<Tuple2<Long, IntType>> values, + Collector<Tuple4<Long, Long, Long, IntType>> out) { + + // validate that the function has been opened properly + assertTrue(open); + + int sum = 0; + long key = -1; + + for (Tuple2<Long, IntType> value : values) { + sum += value.f1.value; + key = value.f0; + } + out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); + } + }) + .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1); + + + tryExecute(env, "Sliding Window Test"); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void 
testPreAggregatedTumblingTimeWindow() { + final int NUM_ELEMENTS_PER_KEY = 3000; + final int WINDOW_SIZE = 100; + final int NUM_KEYS = 100; + FailingSource.reset(); + + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getLeaderRPCPort()); + + env.setParallelism(PARALLELISM); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.enableCheckpointing(100); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.getConfig().disableSysoutLogging(); + env.setStateBackend(this.stateBackend); + + env + .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) + .rebalance() + .keyBy(0) + .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) + .reduce( + new ReduceFunction<Tuple2<Long, IntType>>() { + + @Override + public Tuple2<Long, IntType> reduce( + Tuple2<Long, IntType> a, + Tuple2<Long, IntType> b) { + return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); + } + }, + new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { + + private boolean open = false; + + @Override + public void open(Configuration parameters) { + assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + Tuple tuple, + TimeWindow window, + Iterable<Tuple2<Long, IntType>> input, + Collector<Tuple4<Long, Long, Long, IntType>> out) { + + // validate that the function has been opened properly + assertTrue(open); + + for (Tuple2<Long, IntType> in: input) { + out.collect(new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f1)); + } + } + }) + .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); + + + tryExecute(env, "Tumbling Window Test"); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPreAggregatedSlidingTimeWindow() { + final int 
NUM_ELEMENTS_PER_KEY = 3000; + final int WINDOW_SIZE = 1000; + final int WINDOW_SLIDE = 100; + final int NUM_KEYS = 100; + FailingSource.reset(); + + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getLeaderRPCPort()); + + env.setParallelism(PARALLELISM); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.enableCheckpointing(100); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); + env.getConfig().disableSysoutLogging(); + env.setStateBackend(this.stateBackend); + + env + .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) + .rebalance() + .keyBy(0) + .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) + .reduce( + new ReduceFunction<Tuple2<Long, IntType>>() { + + @Override + public Tuple2<Long, IntType> reduce( + Tuple2<Long, IntType> a, + Tuple2<Long, IntType> b) { + + // combine both partial sums, keeping the key of the first element + return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); + } + }, + new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { + + private boolean open = false; + + @Override + public void open(Configuration parameters) { + assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + Tuple tuple, + TimeWindow window, + Iterable<Tuple2<Long, IntType>> input, + Collector<Tuple4<Long, Long, Long, IntType>> out) { + + // validate that the function has been opened properly + assertTrue(open); + + for (Tuple2<Long, IntType> in: input) { + out.collect(new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f1)); + } + } + }) + .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1); + + + tryExecute(env, "Sliding Window Test"); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + 
} + } + + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> + implements Checkpointed<Integer>, CheckpointListener + { + private static volatile boolean failedBefore = false; + + private final int numKeys; + private final int numElementsToEmit; + private final int failureAfterNumElements; + + private volatile int numElementsEmitted; + private volatile int numSuccessfulCheckpoints; + private volatile boolean running = true; + + private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) { + this.numKeys = numKeys; + this.numElementsToEmit = numElementsToEmitPerKey; + this.failureAfterNumElements = failureAfterNumElements; + } + + @Override + public void open(Configuration parameters) { + // non-parallel source + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + } + + @Override + public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception { + // we loop longer than we have elements, to permit delayed checkpoints + // to still cause a failure + while (running) { + + if (!failedBefore) { + // delay a bit, if we have not failed before + Thread.sleep(1); + if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) { + // cause a failure if we have not failed before and have reached + // enough completed checkpoints and elements + failedBefore = true; + throw new Exception("Artificial Failure"); + } + } + + if (numElementsEmitted < numElementsToEmit && + (failedBefore || numElementsEmitted <= failureAfterNumElements)) + { + // the function failed before, or we are in the elements before the failure + synchronized (ctx.getCheckpointLock()) { + int next = numElementsEmitted++; + for (long i = 0; i < numKeys; i++) { + ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new 
IntType(next)), next); + } + ctx.emitWatermark(new Watermark(next)); + } + } + else { + + // if our work is done, delay a bit to prevent busy waiting + Thread.sleep(1); + } + } + } + + @Override + public void cancel() { + running = false; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + numSuccessfulCheckpoints++; + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) { + return numElementsEmitted; + } + + @Override + public void restoreState(Integer state) { + numElementsEmitted = state; + } + + public static void reset() { + failedBefore = false; + } + } + + private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> + implements Checkpointed<HashMap<Long, Integer>> { + + private final HashMap<Long, Integer> windowCounts = new HashMap<>(); + + private final int numKeys; + private final int numWindowsExpected; + + private ValidatingSink(int numKeys, int numWindowsExpected) { + this.numKeys = numKeys; + this.numWindowsExpected = numWindowsExpected; + } + + @Override + public void open(Configuration parameters) throws Exception { + // this sink can only work with DOP 1 + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + + // it can happen that a checkpoint happens when the complete success state is + // already set. In that case we restart with the final state and would never + // finish because no more elements arrive. 
+ if (windowCounts.size() == numKeys) { + boolean seenAll = true; + for (Integer windowCount: windowCounts.values()) { + if (windowCount != numWindowsExpected) { + seenAll = false; + break; + } + } + if (seenAll) { + throw new SuccessException(); + } + } + } + + @Override + public void close() throws Exception { + boolean seenAll = true; + if (windowCounts.size() == numKeys) { + for (Integer windowCount: windowCounts.values()) { + if (windowCount < numWindowsExpected) { + seenAll = false; + break; + } + } + } + assertTrue("The sink must see all expected windows.", seenAll); + } + + @Override + public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception { + + // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end + // the sum should be "sum (start .. end-1)" + + int expectedSum = 0; + for (long i = value.f1; i < value.f2; i++) { + // only sum up positive vals, to filter out the negative start of the + // first sliding windows + if (i > 0) { + expectedSum += i; + } + } + + assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value); + + + Integer curr = windowCounts.get(value.f0); + if (curr != null) { + windowCounts.put(value.f0, curr + 1); + } + else { + windowCounts.put(value.f0, 1); + } + + if (windowCounts.size() == numKeys) { + boolean seenAll = true; + for (Integer windowCount: windowCounts.values()) { + if (windowCount < numWindowsExpected) { + seenAll = false; + break; + } else if (windowCount > numWindowsExpected) { + fail("Window count too high: " + windowCount); + } + } + + if (seenAll) { + // exit + throw new SuccessException(); + } + + } + } + + @Override + public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) { + return this.windowCounts; + } + + @Override + public void restoreState(HashMap<Long, Integer> state) { + this.windowCounts.putAll(state); + } + } + + // Sink for validating the stateful window counts + private static class 
CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> + implements Checkpointed<HashMap<Long, Integer>> { + + private final HashMap<Long, Integer> windowCounts = new HashMap<>(); + + private final int numKeys; + private final int numWindowsExpected; + + private CountValidatingSink(int numKeys, int numWindowsExpected) { + this.numKeys = numKeys; + this.numWindowsExpected = numWindowsExpected; + } + + @Override + public void open(Configuration parameters) throws Exception { + // this sink can only work with DOP 1 + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + } + + @Override + public void close() throws Exception { + boolean seenAll = true; + if (windowCounts.size() == numKeys) { + for (Integer windowCount: windowCounts.values()) { + if (windowCount < numWindowsExpected) { + seenAll = false; + break; + } + } + } + assertTrue("The sink must see all expected windows.", seenAll); + } + + @Override + public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception { + + Integer curr = windowCounts.get(value.f0); + if (curr != null) { + windowCounts.put(value.f0, curr + 1); + } + else { + windowCounts.put(value.f0, 1); + } + + + // verify the contents of that window, the contents should be: + // (key + num windows so far) + + assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value); + + boolean seenAll = true; + if (windowCounts.size() == numKeys) { + for (Integer windowCount: windowCounts.values()) { + if (windowCount < numWindowsExpected) { + seenAll = false; + break; + } else if (windowCount > numWindowsExpected) { + fail("Window count too high: " + windowCount); + } + } + + if (seenAll) { + // exit + throw new SuccessException(); + } + + } + } + + @Override + public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) { + return this.windowCounts; + } + + @Override + public void 
restoreState(HashMap<Long, Integer> state) { + this.windowCounts.putAll(state); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + public static class IntType { + + public int value; + + public IntType() {} + + public IntType(int value) { this.value = value; } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 5d17608..b493e42 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -53,7 +53,7 @@ import static org.junit.Assert.*; * This verfies that checkpointing works correctly with event time windows. * * <p> - * This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows. + * This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows. 
*/ @SuppressWarnings("serial") public class EventTimeAllWindowCheckpointingITCase extends TestLogger { http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java deleted file mode 100644 index 50079d1..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ /dev/null @@ -1,800 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.checkpointing; - -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.test.util.SuccessException; -import org.apache.flink.util.Collector; -import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import 
java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.flink.test.util.TestUtils.tryExecute; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * This verifies that checkpointing works correctly with event time windows. This is more - * strict than {@link WindowCheckpointingITCase} because for event-time the contents - * of the emitted windows are deterministic. - */ -@SuppressWarnings("serial") -@RunWith(Parameterized.class) -public class EventTimeWindowCheckpointingITCase extends TestLogger { - - private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024; - private static final int PARALLELISM = 4; - - private static LocalFlinkMiniCluster cluster; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - private StateBackendEnum stateBackendEnum; - private AbstractStateBackend stateBackend; - - public EventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) { - this.stateBackendEnum = stateBackendEnum; - } - - @BeforeClass - public static void startTestCluster() { - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); - config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48); - - cluster = new LocalFlinkMiniCluster(config, false); - cluster.start(); - } - - @AfterClass - public static void stopTestCluster() { - if (cluster != null) { - cluster.stop(); - } - } - - @Before - public void initStateBackend() throws IOException { - switch (stateBackendEnum) { - case MEM: - this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE); - break; - case FILE: { - String backups = tempFolder.newFolder().getAbsolutePath(); - this.stateBackend = new FsStateBackend("file://" + backups); - break; - } - 
case ROCKSDB_FULLY_ASYNC: { - String rocksDb = tempFolder.newFolder().getAbsolutePath(); - RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE)); - rdb.setDbStoragePath(rocksDb); - this.stateBackend = rdb; - break; - } - - } - } - - // ------------------------------------------------------------------------ - - @Test - public void testTumblingTimeWindow() { - final int NUM_ELEMENTS_PER_KEY = 3000; - final int WINDOW_SIZE = 100; - final int NUM_KEYS = 100; - FailingSource.reset(); - - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - env.setParallelism(PARALLELISM); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); - env.getConfig().disableSysoutLogging(); - env.setStateBackend(this.stateBackend); - - env - .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) - .rebalance() - .keyBy(0) - .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { - - private boolean open = false; - - @Override - public void open(Configuration parameters) { - assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); - open = true; - } - - @Override - public void apply( - Tuple tuple, - TimeWindow window, - Iterable<Tuple2<Long, IntType>> values, - Collector<Tuple4<Long, Long, Long, IntType>> out) { - - // validate that the function has been opened properly - assertTrue(open); - - int sum = 0; - long key = -1; - - for (Tuple2<Long, IntType> value : values) { - sum += value.f1.value; - key = value.f0; - } - out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); - } - }) - .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / 
WINDOW_SIZE)).setParallelism(1); - - - tryExecute(env, "Tumbling Window Test"); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testTumblingTimeWindowWithKVStateMinMaxParallelism() { - doTestTumblingTimeWindowWithKVState(PARALLELISM); - } - - @Test - public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() { - doTestTumblingTimeWindowWithKVState(1 << 15); - } - - public void doTestTumblingTimeWindowWithKVState(int maxParallelism) { - final int NUM_ELEMENTS_PER_KEY = 3000; - final int WINDOW_SIZE = 100; - final int NUM_KEYS = 100; - FailingSource.reset(); - - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - env.setParallelism(PARALLELISM); - env.setMaxParallelism(maxParallelism); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); - env.getConfig().disableSysoutLogging(); - env.setStateBackend(this.stateBackend); - - env - .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) - .rebalance() - .keyBy(0) - .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { - - private boolean open = false; - - private ValueState<Integer> count; - - @Override - public void open(Configuration parameters) { - assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); - open = true; - count = getRuntimeContext().getState( - new ValueStateDescriptor<>("count", Integer.class, 0)); - } - - @Override - public void apply( - Tuple tuple, - TimeWindow window, - Iterable<Tuple2<Long, IntType>> values, - Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception { - - // the window count state starts with the key, so that we get - // different count results for 
each key - if (count.value() == 0) { - count.update(tuple.<Long>getField(0).intValue()); - } - - // validate that the function has been opened properly - assertTrue(open); - - count.update(count.value() + 1); - out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value()))); - } - }) - .addSink(new CountValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); - - - tryExecute(env, "Tumbling Window Test"); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSlidingTimeWindow() { - final int NUM_ELEMENTS_PER_KEY = 3000; - final int WINDOW_SIZE = 1000; - final int WINDOW_SLIDE = 100; - final int NUM_KEYS = 100; - FailingSource.reset(); - - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - env.setMaxParallelism(2 * PARALLELISM); - env.setParallelism(PARALLELISM); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); - env.getConfig().disableSysoutLogging(); - env.setStateBackend(this.stateBackend); - - env - .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) - .rebalance() - .keyBy(0) - .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) - .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { - - private boolean open = false; - - @Override - public void open(Configuration parameters) { - assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); - open = true; - } - - @Override - public void apply( - Tuple tuple, - TimeWindow window, - Iterable<Tuple2<Long, IntType>> values, - Collector<Tuple4<Long, Long, Long, IntType>> out) { - - // validate that the function has been opened properly - 
assertTrue(open); - - int sum = 0; - long key = -1; - - for (Tuple2<Long, IntType> value : values) { - sum += value.f1.value; - key = value.f0; - } - out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); - } - }) - .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1); - - - tryExecute(env, "Tumbling Window Test"); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testPreAggregatedTumblingTimeWindow() { - final int NUM_ELEMENTS_PER_KEY = 3000; - final int WINDOW_SIZE = 100; - final int NUM_KEYS = 100; - FailingSource.reset(); - - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - env.setParallelism(PARALLELISM); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); - env.getConfig().disableSysoutLogging(); - env.setStateBackend(this.stateBackend); - - env - .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) - .rebalance() - .keyBy(0) - .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) - .reduce( - new ReduceFunction<Tuple2<Long, IntType>>() { - - @Override - public Tuple2<Long, IntType> reduce( - Tuple2<Long, IntType> a, - Tuple2<Long, IntType> b) { - return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); - } - }, - new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { - - private boolean open = false; - - @Override - public void open(Configuration parameters) { - assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); - open = true; - } - - @Override - public void apply( - Tuple tuple, - TimeWindow window, - Iterable<Tuple2<Long, IntType>> input, - Collector<Tuple4<Long, Long, Long, IntType>> out) { - - // validate that the 
function has been opened properly - assertTrue(open); - - for (Tuple2<Long, IntType> in: input) { - out.collect(new Tuple4<>(in.f0, - window.getStart(), - window.getEnd(), - in.f1)); - } - } - }) - .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); - - - tryExecute(env, "Tumbling Window Test"); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testPreAggregatedSlidingTimeWindow() { - final int NUM_ELEMENTS_PER_KEY = 3000; - final int WINDOW_SIZE = 1000; - final int WINDOW_SLIDE = 100; - final int NUM_KEYS = 100; - FailingSource.reset(); - - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getLeaderRPCPort()); - - env.setParallelism(PARALLELISM); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.enableCheckpointing(100); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); - env.getConfig().disableSysoutLogging(); - env.setStateBackend(this.stateBackend); - - env - .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) - .rebalance() - .keyBy(0) - .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) - .reduce( - new ReduceFunction<Tuple2<Long, IntType>>() { - - @Override - public Tuple2<Long, IntType> reduce( - Tuple2<Long, IntType> a, - Tuple2<Long, IntType> b) { - - // validate that the function has been opened properly - return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); - } - }, - new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { - - private boolean open = false; - - @Override - public void open(Configuration parameters) { - assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); - open = true; - } - - @Override - public void apply( - Tuple tuple, - TimeWindow window, - Iterable<Tuple2<Long, IntType>> input, - 
Collector<Tuple4<Long, Long, Long, IntType>> out) { - - // validate that the function has been opened properly - assertTrue(open); - - for (Tuple2<Long, IntType> in: input) { - out.collect(new Tuple4<>(in.f0, - window.getStart(), - window.getEnd(), - in.f1)); - } - } - }) - .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1); - - - tryExecute(env, "Tumbling Window Test"); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> - implements Checkpointed<Integer>, CheckpointListener - { - private static volatile boolean failedBefore = false; - - private final int numKeys; - private final int numElementsToEmit; - private final int failureAfterNumElements; - - private volatile int numElementsEmitted; - private volatile int numSuccessfulCheckpoints; - private volatile boolean running = true; - - private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) { - this.numKeys = numKeys; - this.numElementsToEmit = numElementsToEmitPerKey; - this.failureAfterNumElements = failureAfterNumElements; - } - - @Override - public void open(Configuration parameters) { - // non-parallel source - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); - } - - @Override - public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception { - // we loop longer than we have elements, to permit delayed checkpoints - // to still cause a failure - while (running) { - - if (!failedBefore) { - // delay a bit, if we have not failed before - Thread.sleep(1); - if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) { - // cause a failure if we have not failed before and have reached - // 
enough completed checkpoints and elements - failedBefore = true; - throw new Exception("Artificial Failure"); - } - } - - if (numElementsEmitted < numElementsToEmit && - (failedBefore || numElementsEmitted <= failureAfterNumElements)) - { - // the function failed before, or we are in the elements before the failure - synchronized (ctx.getCheckpointLock()) { - int next = numElementsEmitted++; - for (long i = 0; i < numKeys; i++) { - ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next); - } - ctx.emitWatermark(new Watermark(next)); - } - } - else { - - // if our work is done, delay a bit to prevent busy waiting - Thread.sleep(1); - } - } - } - - @Override - public void cancel() { - running = false; - } - - @Override - public void notifyCheckpointComplete(long checkpointId) { - numSuccessfulCheckpoints++; - } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return numElementsEmitted; - } - - @Override - public void restoreState(Integer state) { - numElementsEmitted = state; - } - - public static void reset() { - failedBefore = false; - } - } - - private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> - implements Checkpointed<HashMap<Long, Integer>> { - - private final HashMap<Long, Integer> windowCounts = new HashMap<>(); - - private final int numKeys; - private final int numWindowsExpected; - - private ValidatingSink(int numKeys, int numWindowsExpected) { - this.numKeys = numKeys; - this.numWindowsExpected = numWindowsExpected; - } - - @Override - public void open(Configuration parameters) throws Exception { - // this sink can only work with DOP 1 - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); - - // it can happen that a checkpoint happens when the complete success state is - // already set. In that case we restart with the final state and would never - // finish because no more elements arrive. 
- if (windowCounts.size() == numKeys) { - boolean seenAll = true; - for (Integer windowCount: windowCounts.values()) { - if (windowCount != numWindowsExpected) { - seenAll = false; - break; - } - } - if (seenAll) { - throw new SuccessException(); - } - } - } - - @Override - public void close() throws Exception { - boolean seenAll = true; - if (windowCounts.size() == numKeys) { - for (Integer windowCount: windowCounts.values()) { - if (windowCount < numWindowsExpected) { - seenAll = false; - break; - } - } - } - assertTrue("The sink must see all expected windows.", seenAll); - } - - @Override - public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception { - - // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end - // the sum should be "sum (start .. end-1)" - - int expectedSum = 0; - for (long i = value.f1; i < value.f2; i++) { - // only sum up positive vals, to filter out the negative start of the - // first sliding windows - if (i > 0) { - expectedSum += i; - } - } - - assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value); - - - Integer curr = windowCounts.get(value.f0); - if (curr != null) { - windowCounts.put(value.f0, curr + 1); - } - else { - windowCounts.put(value.f0, 1); - } - - if (windowCounts.size() == numKeys) { - boolean seenAll = true; - for (Integer windowCount: windowCounts.values()) { - if (windowCount < numWindowsExpected) { - seenAll = false; - break; - } else if (windowCount > numWindowsExpected) { - fail("Window count to high: " + windowCount); - } - } - - if (seenAll) { - // exit - throw new SuccessException(); - } - - } - } - - @Override - public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) { - return this.windowCounts; - } - - @Override - public void restoreState(HashMap<Long, Integer> state) { - this.windowCounts.putAll(state); - } - } - - // Sink for validating the stateful window counts - private static class 
CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>> - implements Checkpointed<HashMap<Long, Integer>> { - - private final HashMap<Long, Integer> windowCounts = new HashMap<>(); - - private final int numKeys; - private final int numWindowsExpected; - - private CountValidatingSink(int numKeys, int numWindowsExpected) { - this.numKeys = numKeys; - this.numWindowsExpected = numWindowsExpected; - } - - @Override - public void open(Configuration parameters) throws Exception { - // this sink can only work with DOP 1 - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); - } - - @Override - public void close() throws Exception { - boolean seenAll = true; - if (windowCounts.size() == numKeys) { - for (Integer windowCount: windowCounts.values()) { - if (windowCount < numWindowsExpected) { - seenAll = false; - break; - } - } - } - assertTrue("The source must see all expected windows.", seenAll); - } - - @Override - public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception { - - Integer curr = windowCounts.get(value.f0); - if (curr != null) { - windowCounts.put(value.f0, curr + 1); - } - else { - windowCounts.put(value.f0, 1); - } - - - // verify the contents of that window, the contents should be: - // (key + num windows so far) - - assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value); - - boolean seenAll = true; - if (windowCounts.size() == numKeys) { - for (Integer windowCount: windowCounts.values()) { - if (windowCount < numWindowsExpected) { - seenAll = false; - break; - } else if (windowCount > numWindowsExpected) { - fail("Window count to high: " + windowCount); - } - } - - if (seenAll) { - // exit - throw new SuccessException(); - } - - } - } - - @Override - public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) { - return this.windowCounts; - } - - @Override - public void 
restoreState(HashMap<Long, Integer> state) { - this.windowCounts.putAll(state); - } - } - - // ------------------------------------------------------------------------ - // Parametrization for testing with different state backends - // ------------------------------------------------------------------------ - - - @Parameterized.Parameters(name = "StateBackend = {0}") - @SuppressWarnings("unchecked,rawtypes") - public static Collection<Object[]> parameters(){ - return Arrays.asList(new Object[][] { - {StateBackendEnum.MEM}, - {StateBackendEnum.FILE}, - {StateBackendEnum.ROCKSDB_FULLY_ASYNC} - } - ); - } - - private enum StateBackendEnum { - MEM, FILE, ROCKSDB_FULLY_ASYNC - } - - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - public static class IntType { - - public int value; - - public IntType() {} - - public IntType(int value) { this.value = value; } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java new file mode 100644 index 0000000..65fda09 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { + + public FileBackendEventTimeWindowCheckpointingITCase() { + super(StateBackendEnum.FILE); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java new file mode 100644 index 0000000..899b8d6 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { + + public MemBackendEventTimeWindowCheckpointingITCase() { + super(StateBackendEnum.MEM); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java new file mode 100644 index 0000000..14feb78 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { + + public RocksDbBackendEventTimeWindowCheckpointingITCase() { + super(StateBackendEnum.ROCKSDB_FULLY_ASYNC); + } +}
