Repository: flink
Updated Branches:
  refs/heads/master 7fbd757dd -> e288617f9


[FLINK-5016] [checkpointing] Split EventTimeWindowCheckpointingITCase

Split EventTimeWindowCheckpointingITCase into multiple test classes so
that no single test class exceeds the CI limit on the time a build may
run without writing to stdout (currently set to 5 minutes).

This closes #2933.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e288617f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e288617f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e288617f

Branch: refs/heads/master
Commit: e288617f9eca260f2fac53df48862335247199ed
Parents: 7fbd757
Author: Ufuk Celebi <[email protected]>
Authored: Sun Dec 4 16:06:16 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Sun Dec 4 18:48:53 2016 +0100

----------------------------------------------------------------------
 ...tractEventTimeWindowCheckpointingITCase.java | 782 ++++++++++++++++++
 .../EventTimeAllWindowCheckpointingITCase.java  |   2 +-
 .../EventTimeWindowCheckpointingITCase.java     | 800 -------------------
 ...ckendEventTimeWindowCheckpointingITCase.java |  26 +
 ...ckendEventTimeWindowCheckpointingITCase.java |  26 +
 ...ckendEventTimeWindowCheckpointingITCase.java |  26 +
 6 files changed, 861 insertions(+), 801 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..583e42f
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,782 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This verifies that checkpointing works correctly with event time windows. This is
+ * stricter than {@link WindowCheckpointingITCase} because for event time the contents
+ * of the emitted windows are deterministic.
+ *
+ * <p>Split into multiple test classes in order to decrease the runtime per backend
+ * and to not run into CI infrastructure limits, such as no standard output being
+ * emitted for I/O heavy variants.
+ */
+@SuppressWarnings("serial")
+public abstract class AbstractEventTimeWindowCheckpointingITCase extends 
TestLogger {
+
+       /** Size argument handed to every {@link MemoryStateBackend} created by this test (10 * 1024 * 1024). */
+       private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
+
+       /** Parallelism of the test jobs; the test cluster offers exactly this many task slots in total. */
+       private static final int PARALLELISM = 4;
+
+       /** Mini cluster shared by all test methods; created in startTestCluster(), stopped in stopTestCluster(). */
+       private static LocalFlinkMiniCluster cluster;
+
+       /** Backend variant chosen through the constructor. */
+       private StateBackendEnum stateBackendEnum;
+
+       /** Backend instance for the current test method; re-created by initStateBackend() before each test. */
+       private AbstractStateBackend stateBackend;
+
+       /** Supplies the temporary directories used by the file and RocksDB backend variants. */
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       /**
+        * Creates the test for one state backend variant.
+        *
+        * @param stateBackendEnum the variant that {@link #initStateBackend()} instantiates
+        *                         before every test method
+        */
+       AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) {
+               this.stateBackendEnum = stateBackendEnum;
+       }
+
+       /** The state backend variants that can be selected through the constructor. */
+       enum StateBackendEnum {
+               /** A {@link MemoryStateBackend}. */
+               MEM,
+               /** An {@link FsStateBackend} that writes to a temporary folder. */
+               FILE,
+               /** A {@link RocksDBStateBackend} that checkpoints into a {@link MemoryStateBackend}. */
+               ROCKSDB_FULLY_ASYNC
+       }
+
+       /**
+        * Starts the mini cluster that all test methods of the class share: two task managers
+        * that together offer {@code PARALLELISM} task slots.
+        */
+       @BeforeClass
+       public static void startTestCluster() {
+               final int numTaskManagers = 2;
+
+               Configuration clusterConfig = new Configuration();
+               clusterConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+               clusterConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / numTaskManagers);
+               clusterConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+
+               cluster = new LocalFlinkMiniCluster(clusterConfig, false);
+               cluster.start();
+       }
+
+       /** Stops the shared mini cluster, if one has been created. */
+       @AfterClass
+       public static void stopTestCluster() {
+               if (cluster == null) {
+                       // no cluster instance was assigned, so there is nothing to stop
+                       return;
+               }
+               cluster.stop();
+       }
+
+       /**
+        * Creates a fresh state backend of the configured variant before every test method.
+        *
+        * <p>The file and RocksDB variants each get their own new temporary folder.
+        *
+        * @throws IOException if setting up the backend (e.g. creating its temporary folder) fails
+        * @throws IllegalStateException if the configured variant is not handled by this method
+        */
+       @Before
+       public void initStateBackend() throws IOException {
+               switch (stateBackendEnum) {
+                       case MEM:
+                               this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
+                               break;
+                       case FILE: {
+                               String backups = tempFolder.newFolder().getAbsolutePath();
+                               this.stateBackend = new FsStateBackend("file://" + backups);
+                               break;
+                       }
+                       case ROCKSDB_FULLY_ASYNC: {
+                               String rocksDb = tempFolder.newFolder().getAbsolutePath();
+                               RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
+                               rdb.setDbStoragePath(rocksDb);
+                               this.stateBackend = rdb;
+                               break;
+                       }
+                       default:
+                               // fail fast instead of silently running the tests with a null backend
+                               // when a new variant is added to the enum but not handled here
+                               throw new IllegalStateException("Unsupported state backend: " + stateBackendEnum);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Runs a job with keyed tumbling event-time windows whose contents are summed up by a
+        * {@link RichWindowFunction} and handed to a {@link ValidatingSink}.
+        *
+        * <p>The job checkpoints with an interval of 100 and may be restarted up to three times
+        * without delay, which lets it recover from the single artificial failure of the
+        * {@link FailingSource}.
+        */
+       @Test
+       public void testTumblingTimeWindow() {
+               final int numKeys = 100;
+               final int numElementsPerKey = 3000;
+               final int windowSizeMillis = 100;
+               FailingSource.reset();
+
+               try {
+                       StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", cluster.getLeaderRPCPort());
+
+                       env.setParallelism(PARALLELISM);
+                       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+                       env.enableCheckpointing(100);
+                       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       env.getConfig().disableSysoutLogging();
+                       env.setStateBackend(this.stateBackend);
+
+                       // the source fails once after a third of the elements per key has been emitted
+                       final int failAfterElements = numElementsPerKey / 3;
+
+                       env
+                                       .addSource(new FailingSource(numKeys, numElementsPerKey, failAfterElements))
+                                       .rebalance()
+                                       .keyBy(0)
+                                       .timeWindow(Time.of(windowSizeMillis, MILLISECONDS))
+                                       .apply(new RichWindowFunction<
+                                                       Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+                                               /** Set by open(); apply() asserts that it is true. */
+                                               private boolean opened;
+
+                                               @Override
+                                               public void open(Configuration parameters) {
+                                                       assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+                                                       opened = true;
+                                               }
+
+                                               @Override
+                                               public void apply(
+                                                               Tuple tuple,
+                                                               TimeWindow window,
+                                                               Iterable<Tuple2<Long, IntType>> values,
+                                                               Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+                                                       // the function must have been opened before the first window fires
+                                                       assertTrue(opened);
+
+                                                       long key = -1;
+                                                       int total = 0;
+                                                       for (Tuple2<Long, IntType> element : values) {
+                                                               total += element.f1.value;
+                                                               key = element.f0;
+                                                       }
+
+                                                       out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(total)));
+                                               }
+                                       })
+                                       .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSizeMillis))
+                                       .setParallelism(1);
+
+                       tryExecute(env, "Tumbling Window Test");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /** Runs the keyed-state tumbling window test with a max parallelism equal to the job parallelism. */
+       @Test
+       public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
+               final int smallMaxParallelism = PARALLELISM;
+               doTestTumblingTimeWindowWithKVState(smallMaxParallelism);
+       }
+
+       /** Runs the keyed-state tumbling window test with a max parallelism of 2^15 (32768). */
+       @Test
+       public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
+               final int largeMaxParallelism = 1 << 15;
+               doTestTumblingTimeWindowWithKVState(largeMaxParallelism);
+       }
+
+       /**
+        * Runs a job with keyed tumbling event-time windows whose window function keeps a
+        * per-key counter in {@link ValueState} and emits the counter value for every fired
+        * window to a {@link CountValidatingSink}.
+        *
+        * @param maxParallelism the max parallelism to configure on the execution environment
+        */
+       public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
+               final int numKeys = 100;
+               final int numElementsPerKey = 3000;
+               final int windowSizeMillis = 100;
+               FailingSource.reset();
+
+               try {
+                       StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", cluster.getLeaderRPCPort());
+
+                       env.setParallelism(PARALLELISM);
+                       env.setMaxParallelism(maxParallelism);
+                       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+                       env.enableCheckpointing(100);
+                       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       env.getConfig().disableSysoutLogging();
+                       env.setStateBackend(this.stateBackend);
+
+                       // the source fails once after a third of the elements per key has been emitted
+                       final int failAfterElements = numElementsPerKey / 3;
+
+                       env
+                                       .addSource(new FailingSource(numKeys, numElementsPerKey, failAfterElements))
+                                       .rebalance()
+                                       .keyBy(0)
+                                       .timeWindow(Time.of(windowSizeMillis, MILLISECONDS))
+                                       .apply(new RichWindowFunction<
+                                                       Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+                                               /** Set by open(); apply() asserts that it is true. */
+                                               private boolean opened;
+
+                                               /** Keyed counter state, registered with a default value of 0. */
+                                               private ValueState<Integer> count;
+
+                                               @Override
+                                               public void open(Configuration parameters) {
+                                                       assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+                                                       opened = true;
+                                                       count = getRuntimeContext().getState(
+                                                                       new ValueStateDescriptor<>("count", Integer.class, 0));
+                                               }
+
+                                               @Override
+                                               public void apply(
+                                                               Tuple tuple,
+                                                               TimeWindow window,
+                                                               Iterable<Tuple2<Long, IntType>> values,
+                                                               Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
+
+                                                       final Long key = tuple.<Long>getField(0);
+
+                                                       // seed the counter with the key itself, so that the keys
+                                                       // produce different count results
+                                                       if (count.value() == 0) {
+                                                               count.update(key.intValue());
+                                                       }
+
+                                                       // the function must have been opened before the first window fires
+                                                       assertTrue(opened);
+
+                                                       count.update(count.value() + 1);
+                                                       out.collect(new Tuple4<>(
+                                                                       key, window.getStart(), window.getEnd(), new IntType(count.value())));
+                                               }
+                                       })
+                                       .addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSizeMillis))
+                                       .setParallelism(1);
+
+                       tryExecute(env, "Tumbling Window Test");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Runs a job with keyed sliding event-time windows (size 1000 ms, slide 100 ms) whose
+        * contents are summed up by a {@link RichWindowFunction} and handed to a
+        * {@link ValidatingSink}.
+        *
+        * <p>The job checkpoints with an interval of 100 and may be restarted up to three times
+        * without delay, which lets it recover from the single artificial failure of the
+        * {@link FailingSource}.
+        */
+       @Test
+       public void testSlidingTimeWindow() {
+               final int NUM_ELEMENTS_PER_KEY = 3000;
+               final int WINDOW_SIZE = 1000;
+               final int WINDOW_SLIDE = 100;
+               final int NUM_KEYS = 100;
+               FailingSource.reset();
+
+               try {
+                       StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", cluster.getLeaderRPCPort());
+
+                       env.setMaxParallelism(2 * PARALLELISM);
+                       env.setParallelism(PARALLELISM);
+                       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+                       env.enableCheckpointing(100);
+                       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       env.getConfig().disableSysoutLogging();
+                       env.setStateBackend(this.stateBackend);
+
+                       env
+                                       .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+                                       .rebalance()
+                                       .keyBy(0)
+                                       .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+                                       .apply(new RichWindowFunction<
+                                                       Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+                                               private boolean open = false;
+
+                                               @Override
+                                               public void open(Configuration parameters) {
+                                                       assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+                                                       open = true;
+                                               }
+
+                                               @Override
+                                               public void apply(
+                                                               Tuple tuple,
+                                                               TimeWindow window,
+                                                               Iterable<Tuple2<Long, IntType>> values,
+                                                               Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+                                                       // validate that the function has been opened properly
+                                                       assertTrue(open);
+
+                                                       int sum = 0;
+                                                       long key = -1;
+
+                                                       for (Tuple2<Long, IntType> value : values) {
+                                                               sum += value.f1.value;
+                                                               key = value.f0;
+                                                       }
+                                                       out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+                                               }
+                                       })
+                                       .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+
+                       // submit under a name that matches the windowing used here (the name was
+                       // previously copied from the tumbling window test)
+                       tryExecute(env, "Sliding Window Test");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Runs a job with keyed tumbling event-time windows that are pre-aggregated by a
+        * {@link ReduceFunction}; a {@link RichWindowFunction} then attaches the window bounds
+        * to each reduced element and hands the result to a {@link ValidatingSink}.
+        *
+        * <p>The job checkpoints with an interval of 100 and may be restarted up to three times
+        * without delay, which lets it recover from the single artificial failure of the
+        * {@link FailingSource}.
+        */
+       @Test
+       public void testPreAggregatedTumblingTimeWindow() {
+               final int numKeys = 100;
+               final int numElementsPerKey = 3000;
+               final int windowSizeMillis = 100;
+               FailingSource.reset();
+
+               try {
+                       StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", cluster.getLeaderRPCPort());
+
+                       env.setParallelism(PARALLELISM);
+                       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+                       env.enableCheckpointing(100);
+                       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       env.getConfig().disableSysoutLogging();
+                       env.setStateBackend(this.stateBackend);
+
+                       // the source fails once after a third of the elements per key has been emitted
+                       final int failAfterElements = numElementsPerKey / 3;
+
+                       env
+                                       .addSource(new FailingSource(numKeys, numElementsPerKey, failAfterElements))
+                                       .rebalance()
+                                       .keyBy(0)
+                                       .timeWindow(Time.of(windowSizeMillis, MILLISECONDS))
+                                       .reduce(
+                                                       new ReduceFunction<Tuple2<Long, IntType>>() {
+
+                                                               @Override
+                                                               public Tuple2<Long, IntType> reduce(
+                                                                               Tuple2<Long, IntType> left,
+                                                                               Tuple2<Long, IntType> right) {
+
+                                                                       // keep the key of the first element and add up the values
+                                                                       IntType combined = new IntType(left.f1.value + right.f1.value);
+                                                                       return new Tuple2<>(left.f0, combined);
+                                                               }
+                                                       },
+                                                       new RichWindowFunction<
+                                                                       Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+                                                               /** Set by open(); apply() asserts that it is true. */
+                                                               private boolean opened;
+
+                                                               @Override
+                                                               public void open(Configuration parameters) {
+                                                                       assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+                                                                       opened = true;
+                                                               }
+
+                                                               @Override
+                                                               public void apply(
+                                                                               Tuple tuple,
+                                                                               TimeWindow window,
+                                                                               Iterable<Tuple2<Long, IntType>> input,
+                                                                               Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+                                                                       // the function must have been opened before the first window fires
+                                                                       assertTrue(opened);
+
+                                                                       for (Tuple2<Long, IntType> reduced : input) {
+                                                                               out.collect(new Tuple4<>(
+                                                                                               reduced.f0, window.getStart(), window.getEnd(), reduced.f1));
+                                                                       }
+                                                               }
+                                                       })
+                                       .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSizeMillis))
+                                       .setParallelism(1);
+
+                       tryExecute(env, "Tumbling Window Test");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Runs a job with keyed sliding event-time windows (size 1000 ms, slide 100 ms) that are
+        * pre-aggregated by a {@link ReduceFunction}; a {@link RichWindowFunction} then attaches
+        * the window bounds to each reduced element and hands the result to a
+        * {@link ValidatingSink}.
+        *
+        * <p>The job checkpoints with an interval of 100 and may be restarted up to three times
+        * without delay, which lets it recover from the single artificial failure of the
+        * {@link FailingSource}.
+        */
+       @Test
+       public void testPreAggregatedSlidingTimeWindow() {
+               final int NUM_ELEMENTS_PER_KEY = 3000;
+               final int WINDOW_SIZE = 1000;
+               final int WINDOW_SLIDE = 100;
+               final int NUM_KEYS = 100;
+               FailingSource.reset();
+
+               try {
+                       StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+                                       "localhost", cluster.getLeaderRPCPort());
+
+                       env.setParallelism(PARALLELISM);
+                       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+                       env.enableCheckpointing(100);
+                       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+                       env.getConfig().disableSysoutLogging();
+                       env.setStateBackend(this.stateBackend);
+
+                       env
+                                       .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
+                                       .rebalance()
+                                       .keyBy(0)
+                                       .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
+                                       .reduce(
+                                                       new ReduceFunction<Tuple2<Long, IntType>>() {
+
+                                                               @Override
+                                                               public Tuple2<Long, IntType> reduce(
+                                                                               Tuple2<Long, IntType> a,
+                                                                               Tuple2<Long, IntType> b) {
+
+                                                                       // keep the key of the first element and add up the values
+                                                                       return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+                                                               }
+                                                       },
+                                                       new RichWindowFunction<
+                                                                       Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+                                                               private boolean open = false;
+
+                                                               @Override
+                                                               public void open(Configuration parameters) {
+                                                                       assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+                                                                       open = true;
+                                                               }
+
+                                                               @Override
+                                                               public void apply(
+                                                                               Tuple tuple,
+                                                                               TimeWindow window,
+                                                                               Iterable<Tuple2<Long, IntType>> input,
+                                                                               Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+                                                                       // validate that the function has been opened properly
+                                                                       assertTrue(open);
+
+                                                                       for (Tuple2<Long, IntType> in: input) {
+                                                                               out.collect(new Tuple4<>(in.f0,
+                                                                                               window.getStart(),
+                                                                                               window.getEnd(),
+                                                                                               in.f1));
+                                                                       }
+                                                               }
+                                                       })
+                                       .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
+
+
+                       // submit under a name that matches the windowing used here (the name was
+                       // previously copied from the tumbling window test)
+                       tryExecute(env, "Sliding Window Test");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Non-parallel test source that emits, for every step, one record per key followed by a
+        * watermark, and throws a single artificial exception along the way.
+        *
+        * <p>Each step {@code next} produces the records {@code (key, IntType(next))} for all keys
+        * in {@code [0, numKeys)}, each with timestamp {@code next}, and then a watermark with the
+        * same timestamp. The number of steps emitted so far is the checkpointed state.
+        *
+        * <p>The failure is guarded by a static flag, so it is shared by all instances of this
+        * class in the JVM and has to be cleared with {@link #reset()}.
+        */
+       private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
+                       implements Checkpointed<Integer>, CheckpointListener
+       {
+               /** Set once the artificial failure has been thrown; static, hence shared across instances. */
+               private static volatile boolean failedBefore = false;
+
+               /** Number of distinct keys emitted per step. */
+               private final int numKeys;
+               /** Total number of steps to emit. */
+               private final int numElementsToEmit;
+               /** Number of emitted steps after which the artificial failure may be thrown. */
+               private final int failureAfterNumElements;
+
+               /** Steps emitted so far; snapshotted and restored as the source state. */
+               private volatile int numElementsEmitted;
+               /** Number of checkpoint-complete notifications received by this instance. */
+               private volatile int numSuccessfulCheckpoints;
+               /** Cleared by {@link #cancel()} to end the emit loop. */
+               private volatile boolean running = true;
+
+               /**
+                * @param numKeys number of distinct keys to emit per step
+                * @param numElementsToEmitPerKey number of steps to emit (one element per key and step)
+                * @param failureAfterNumElements number of emitted steps required before the failure
+                */
+               private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
+                       this.numKeys = numKeys;
+                       this.numElementsToEmit = numElementsToEmitPerKey;
+                       this.failureAfterNumElements = failureAfterNumElements;
+               }
+
+               /** Verifies that the source runs with a parallelism of one. */
+               @Override
+               public void open(Configuration parameters) {
+                       // non-parallel source
+                       assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+               }
+
+               /**
+                * Emits the records and watermarks until cancelled.
+                *
+                * @throws Exception the artificial failure, thrown once at least two checkpoints have
+                *         completed and at least {@code failureAfterNumElements} steps were emitted
+                */
+               @Override
+               public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+                       // we loop longer than we have elements, to permit delayed checkpoints
+                       // to still cause a failure
+                       while (running) {
+
+                               if (!failedBefore) {
+                                       // delay a bit, if we have not failed before
+                                       Thread.sleep(1);
+                                       if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
+                                               // cause a failure if we have not failed before and have reached
+                                               // enough completed checkpoints and elements
+                                               failedBefore = true;
+                                               throw new Exception("Artificial Failure");
+                                       }
+                               }
+
+                               if (numElementsEmitted < numElementsToEmit &&
+                                               (failedBefore || numElementsEmitted <= failureAfterNumElements))
+                               {
+                                       // the function failed before, or we are in the elements before the failure
+                                       // (emission and the counter update happen while holding the checkpoint lock)
+                                       synchronized (ctx.getCheckpointLock()) {
+                                               int next = numElementsEmitted++;
+                                               for (long i = 0; i < numKeys; i++) {
+                                                       ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
+                                               }
+                                               ctx.emitWatermark(new Watermark(next));
+                                       }
+                               }
+                               else {
+
+                                       // if our work is done, delay a bit to prevent busy waiting
+                                       Thread.sleep(1);
+                               }
+                       }
+               }
+
+               /** Stops the emit loop in {@link #run(SourceContext)}. */
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+
+               /** Counts the completed checkpoint; the count gates the artificial failure. */
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) {
+                       numSuccessfulCheckpoints++;
+               }
+
+               /** @return the number of steps emitted so far */
+               @Override
+               public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+                       return numElementsEmitted;
+               }
+
+               /** Resumes emission from the restored number of emitted steps. */
+               @Override
+               public void restoreState(Integer state) {
+                       numElementsEmitted = state;
+               }
+
+               /** Clears the shared failure flag so that the next job run fails once again. */
+               public static void reset() {
+                       failedBefore = false;
+               }
+       }
+
+       /**
+        * Sink (parallelism one) that validates the contents of every received window and counts
+        * the windows seen per key.
+        *
+        * <p>It throws a {@link SuccessException} as soon as all {@code numKeys} keys have reached
+        * {@code numWindowsExpected} windows, and fails if any key exceeds that count. The per-key
+        * counts are the checkpointed state.
+        */
+       private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+                       implements Checkpointed<HashMap<Long, Integer>> {
+
+               /** Number of windows received so far, per key. */
+               private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+               /** Number of distinct keys expected. */
+               private final int numKeys;
+               /** Number of windows expected for each key. */
+               private final int numWindowsExpected;
+
+               /**
+                * @param numKeys number of distinct keys expected
+                * @param numWindowsExpected number of windows expected per key
+                */
+               private ValidatingSink(int numKeys, int numWindowsExpected) {
+                       this.numKeys = numKeys;
+                       this.numWindowsExpected = numWindowsExpected;
+               }
+
+               /**
+                * Verifies the parallelism and ends the job right away if the (restored) counts are
+                * already complete.
+                */
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       // this sink can only work with DOP 1
+                       assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+
+                       // it can happen that a checkpoint happens when the complete success state is
+                       // already set. In that case we restart with the final state and would never
+                       // finish because no more elements arrive.
+                       if (windowCounts.size() == numKeys) {
+                               boolean seenAll = true;
+                               for (Integer windowCount: windowCounts.values()) {
+                                       if (windowCount != numWindowsExpected) {
+                                               seenAll = false;
+                                               break;
+                                       }
+                               }
+                               if (seenAll) {
+                                       throw new SuccessException();
+                               }
+                       }
+               }
+
+               /** Asserts that no recorded key is short of the expected number of windows. */
+               @Override
+               public void close() throws Exception {
+                       // NOTE(review): when fewer than numKeys keys were recorded the loop below is
+                       // skipped and the assertion passes - confirm that this is intended
+                       boolean seenAll = true;
+                       if (windowCounts.size() == numKeys) {
+                               for (Integer windowCount: windowCounts.values()) {
+                                       if (windowCount < numWindowsExpected) {
+                                               seenAll = false;
+                                               break;
+                                       }
+                               }
+                       }
+                       assertTrue("The sink must see all expected windows.", seenAll);
+               }
+
+               /**
+                * Validates the window sum carried in {@code value.f3} and records the window for the
+                * key in {@code value.f0}.
+                *
+                * @throws SuccessException once every key has reached the expected window count
+                */
+               @Override
+               public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+                       // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
+                       // the sum should be "sum (start .. end-1)"
+
+                       int expectedSum = 0;
+                       for (long i = value.f1; i < value.f2; i++) {
+                               // only sum up positive vals, to filter out the negative start of the
+                               // first sliding windows
+                               if (i > 0) {
+                                       expectedSum += i;
+                               }
+                       }
+
+                       assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
+
+
+                       Integer curr = windowCounts.get(value.f0);
+                       if (curr != null) {
+                               windowCounts.put(value.f0, curr + 1);
+                       }
+                       else {
+                               windowCounts.put(value.f0, 1);
+                       }
+
+                       if (windowCounts.size() == numKeys) {
+                               boolean seenAll = true;
+                               for (Integer windowCount: windowCounts.values()) {
+                                       if (windowCount < numWindowsExpected) {
+                                               seenAll = false;
+                                               break;
+                                       } else if (windowCount > numWindowsExpected) {
+                                               fail("Window count too high: " + windowCount);
+                                       }
+                               }
+
+                               if (seenAll) {
+                                       // exit
+                                       throw new SuccessException();
+                               }
+
+                       }
+               }
+
+               /** @return the per-key window counts (the sink's own map instance, not a copy) */
+               @Override
+               public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
+                       return this.windowCounts;
+               }
+
+               /** Adds the restored per-key window counts to this sink's map. */
+               @Override
+               public void restoreState(HashMap<Long, Integer> state) {
+                       this.windowCounts.putAll(state);
+               }
+       }
+
+       /**
+        * Sink (parallelism one) for validating the stateful window counts.
+        *
+        * <p>For every received window it checks that {@code value.f3} equals the key plus the
+        * number of windows received for that key so far. It throws a {@link SuccessException}
+        * once all {@code numKeys} keys have reached {@code numWindowsExpected} windows, and fails
+        * if any key exceeds that count. The per-key counts are the checkpointed state.
+        */
+       private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+                       implements Checkpointed<HashMap<Long, Integer>> {
+
+               /** Number of windows received so far, per key. */
+               private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+               /** Number of distinct keys expected. */
+               private final int numKeys;
+               /** Number of windows expected for each key. */
+               private final int numWindowsExpected;
+
+               /**
+                * @param numKeys number of distinct keys expected
+                * @param numWindowsExpected number of windows expected per key
+                */
+               private CountValidatingSink(int numKeys, int numWindowsExpected) {
+                       this.numKeys = numKeys;
+                       this.numWindowsExpected = numWindowsExpected;
+               }
+
+               /** Verifies that the sink runs with a parallelism of one. */
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       // this sink can only work with DOP 1
+                       assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+               }
+
+               /** Asserts that no recorded key is short of the expected number of windows. */
+               @Override
+               public void close() throws Exception {
+                       // NOTE(review): when fewer than numKeys keys were recorded the loop below is
+                       // skipped and the assertion passes - confirm that this is intended
+                       boolean seenAll = true;
+                       if (windowCounts.size() == numKeys) {
+                               for (Integer windowCount: windowCounts.values()) {
+                                       if (windowCount < numWindowsExpected) {
+                                               seenAll = false;
+                                               break;
+                                       }
+                               }
+                       }
+                       assertTrue("The sink must see all expected windows.", seenAll);
+               }
+
+               /**
+                * Records the window for the key in {@code value.f0} and validates the count carried
+                * in {@code value.f3}.
+                *
+                * @throws SuccessException once every key has reached the expected window count
+                */
+               @Override
+               public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+                       Integer curr = windowCounts.get(value.f0);
+                       if (curr != null) {
+                               windowCounts.put(value.f0, curr + 1);
+                       }
+                       else {
+                               windowCounts.put(value.f0, 1);
+                       }
+
+
+                       // verify the contents of that window, the contents should be:
+                       // (key + num windows so far)
+
+                       assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
+
+                       boolean seenAll = true;
+                       if (windowCounts.size() == numKeys) {
+                               for (Integer windowCount: windowCounts.values()) {
+                                       if (windowCount < numWindowsExpected) {
+                                               seenAll = false;
+                                               break;
+                                       } else if (windowCount > numWindowsExpected) {
+                                               fail("Window count too high: " + windowCount);
+                                       }
+                               }
+
+                               if (seenAll) {
+                                       // exit
+                                       throw new SuccessException();
+                               }
+
+                       }
+               }
+
+               /** @return the per-key window counts (the sink's own map instance, not a copy) */
+               @Override
+               public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
+                       return this.windowCounts;
+               }
+
+               /** Adds the restored per-key window counts to this sink's map. */
+               @Override
+               public void restoreState(HashMap<Long, Integer> state) {
+                       this.windowCounts.putAll(state);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       public static class IntType {
+
+               public int value;
+
+               public IntType() {}
+
+               public IntType(int value) { this.value = value; }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 5d17608..b493e42 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -53,7 +53,7 @@ import static org.junit.Assert.*;
  * This verfies that checkpointing works correctly with event time windows.
  *
  * <p>
- * This is a version of {@link EventTimeWindowCheckpointingITCase} for 
All-Windows.
+ * This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for 
All-Windows.
  */
 @SuppressWarnings("serial")
 public class EventTimeAllWindowCheckpointingITCase extends TestLogger {

http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index 50079d1..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,800 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.SuccessException;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * This verifies that checkpointing works correctly with event time windows. 
This is more
- * strict than {@link WindowCheckpointingITCase} because for event-time the 
contents
- * of the emitted windows are deterministic.
- */
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class EventTimeWindowCheckpointingITCase extends TestLogger {
-
-       private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
-       private static final int PARALLELISM = 4;
-
-       private static LocalFlinkMiniCluster cluster;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       private StateBackendEnum stateBackendEnum;
-       private AbstractStateBackend stateBackend;
-
-       public EventTimeWindowCheckpointingITCase(StateBackendEnum 
stateBackendEnum) {
-               this.stateBackendEnum = stateBackendEnum;
-       }
-
-       @BeforeClass
-       public static void startTestCluster() {
-               Configuration config = new Configuration();
-               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
PARALLELISM / 2);
-               config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
48);
-
-               cluster = new LocalFlinkMiniCluster(config, false);
-               cluster.start();
-       }
-
-       @AfterClass
-       public static void stopTestCluster() {
-               if (cluster != null) {
-                       cluster.stop();
-               }
-       }
-
-       @Before
-       public void initStateBackend() throws IOException {
-               switch (stateBackendEnum) {
-                       case MEM:
-                               this.stateBackend = new 
MemoryStateBackend(MAX_MEM_STATE_SIZE);
-                               break;
-                       case FILE: {
-                               String backups = 
tempFolder.newFolder().getAbsolutePath();
-                               this.stateBackend = new 
FsStateBackend("file://" + backups);
-                               break;
-                       }
-                       case ROCKSDB_FULLY_ASYNC: {
-                               String rocksDb = 
tempFolder.newFolder().getAbsolutePath();
-                               RocksDBStateBackend rdb = new 
RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
-                               rdb.setDbStoragePath(rocksDb);
-                               this.stateBackend = rdb;
-                               break;
-                       }
-
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void testTumblingTimeWindow() {
-               final int NUM_ELEMENTS_PER_KEY = 3000;
-               final int WINDOW_SIZE = 100;
-               final int NUM_KEYS = 100;
-               FailingSource.reset();
-               
-               try {
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
-                                       "localhost", 
cluster.getLeaderRPCPort());
-                       
-                       env.setParallelism(PARALLELISM);
-                       
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-                       env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
-                       env.getConfig().disableSysoutLogging();
-                       env.setStateBackend(this.stateBackend);
-
-                       env
-                                       .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
-                                       .rebalance()
-                                       .keyBy(0)
-                                       .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
Tuple, TimeWindow>() {
-
-                                               private boolean open = false;
-
-                                               @Override
-                                               public void open(Configuration 
parameters) {
-                                                       
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-                                                       open = true;
-                                               }
-
-                                               @Override
-                                               public void apply(
-                                                               Tuple tuple,
-                                                               TimeWindow 
window,
-                                                               
Iterable<Tuple2<Long, IntType>> values,
-                                                               
Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
-                                                       // validate that the 
function has been opened properly
-                                                       assertTrue(open);
-
-                                                       int sum = 0;
-                                                       long key = -1;
-
-                                                       for (Tuple2<Long, 
IntType> value : values) {
-                                                               sum += 
value.f1.value;
-                                                               key = value.f0;
-                                                       }
-                                                       out.collect(new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
-                                               }
-                                       })
-                                       .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
-
-                       tryExecute(env, "Tumbling Window Test");
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
-               doTestTumblingTimeWindowWithKVState(PARALLELISM);
-       }
-
-       @Test
-       public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
-               doTestTumblingTimeWindowWithKVState(1 << 15);
-       }
-
-       public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
-               final int NUM_ELEMENTS_PER_KEY = 3000;
-               final int WINDOW_SIZE = 100;
-               final int NUM_KEYS = 100;
-               FailingSource.reset();
-
-               try {
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
-                                       "localhost", 
cluster.getLeaderRPCPort());
-
-                       env.setParallelism(PARALLELISM);
-                       env.setMaxParallelism(maxParallelism);
-                       
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-                       env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
-                       env.getConfig().disableSysoutLogging();
-                       env.setStateBackend(this.stateBackend);
-
-                       env
-                                       .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
-                                       .rebalance()
-                                       .keyBy(0)
-                                       .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
Tuple, TimeWindow>() {
-
-                                               private boolean open = false;
-
-                                               private ValueState<Integer> 
count;
-
-                                               @Override
-                                               public void open(Configuration 
parameters) {
-                                                       
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-                                                       open = true;
-                                                       count = 
getRuntimeContext().getState(
-                                                                       new 
ValueStateDescriptor<>("count", Integer.class, 0));
-                                               }
-
-                                               @Override
-                                               public void apply(
-                                                               Tuple tuple,
-                                                               TimeWindow 
window,
-                                                               
Iterable<Tuple2<Long, IntType>> values,
-                                                               
Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
-
-                                                       // the window count 
state starts with the key, so that we get
-                                                       // different count 
results for each key
-                                                       if (count.value() == 0) 
{
-                                                               
count.update(tuple.<Long>getField(0).intValue());
-                                                       }
-
-                                                       // validate that the 
function has been opened properly
-                                                       assertTrue(open);
-
-                                                       
count.update(count.value() + 1);
-                                                       out.collect(new 
Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new 
IntType(count.value())));
-                                               }
-                                       })
-                                       .addSink(new 
CountValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / 
WINDOW_SIZE)).setParallelism(1);
-
-
-                       tryExecute(env, "Tumbling Window Test");
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testSlidingTimeWindow() {
-               final int NUM_ELEMENTS_PER_KEY = 3000;
-               final int WINDOW_SIZE = 1000;
-               final int WINDOW_SLIDE = 100;
-               final int NUM_KEYS = 100;
-               FailingSource.reset();
-
-               try {
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
-                                       "localhost", 
cluster.getLeaderRPCPort());
-
-                       env.setMaxParallelism(2 * PARALLELISM);
-                       env.setParallelism(PARALLELISM);
-                       
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-                       env.enableCheckpointing(100);
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
-                       env.getConfig().disableSysoutLogging();
-                       env.setStateBackend(this.stateBackend);
-
-                       env
-                                       .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
-                                       .rebalance()
-                                       .keyBy(0)
-                                       .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
-                                       .apply(new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
Tuple, TimeWindow>() {
-
-                                               private boolean open = false;
-
-                                               @Override
-                                               public void open(Configuration 
parameters) {
-                                                       
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-                                                       open = true;
-                                               }
-
-                                               @Override
-                                               public void apply(
-                                                               Tuple tuple,
-                                                               TimeWindow 
window,
-                                                               
Iterable<Tuple2<Long, IntType>> values,
-                                                               
Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
-                                                       // validate that the function has been opened properly
-                                                       assertTrue(open);
-
-                                                       int sum = 0;
-                                                       long key = -1;
-
-                                                       for (Tuple2<Long, 
IntType> value : values) {
-                                                               sum += 
value.f1.value;
-                                                               key = value.f0;
-                                                       }
-                                                       out.collect(new 
Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
-                                               }
-                                       })
-                                       .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
-
-                       tryExecute(env, "Tumbling Window Test");
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testPreAggregatedTumblingTimeWindow() {
-               final int NUM_ELEMENTS_PER_KEY = 3000;
-               final int WINDOW_SIZE = 100;
-               final int NUM_KEYS = 100;
-               FailingSource.reset();
-
-               try {
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
-                                       "localhost", 
cluster.getLeaderRPCPort());
-
-                       env.setParallelism(PARALLELISM);
-                       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-                       env.enableCheckpointing(100);
-                       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
-                       env.getConfig().disableSysoutLogging();
-                       env.setStateBackend(this.stateBackend);
-
-                       env
-                                       .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
-                                       .rebalance()
-                                       .keyBy(0)
-                                       .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS))
-                                       .reduce(
-                                                       new 
ReduceFunction<Tuple2<Long, IntType>>() {
-
-                                                               @Override
-                                                               public 
Tuple2<Long, IntType> reduce(
-                                                                               
Tuple2<Long, IntType> a,
-                                                                               
Tuple2<Long, IntType> b) {
-                                                                       return 
new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
-                                                               }
-                                                       },
-                                                       new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
Tuple, TimeWindow>() {
-
-                                               private boolean open = false;
-
-                                               @Override
-                                               public void open(Configuration 
parameters) {
-                                                       
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-                                                       open = true;
-                                               }
-
-                                               @Override
-                                               public void apply(
-                                                               Tuple tuple,
-                                                               TimeWindow 
window,
-                                                               
Iterable<Tuple2<Long, IntType>> input,
-                                                               
Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
-                                                       // validate that the function has been opened properly
-                                                       assertTrue(open);
-
-                                                       for (Tuple2<Long, 
IntType> in: input) {
-                                                               out.collect(new 
Tuple4<>(in.f0,
-                                                                               
window.getStart(),
-                                                                               
window.getEnd(),
-                                                                               
in.f1));
-                                                       }
-                                               }
-                                       })
-                                       .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1);
-
-
-                       tryExecute(env, "Tumbling Window Test");
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testPreAggregatedSlidingTimeWindow() {
-               final int NUM_ELEMENTS_PER_KEY = 3000;
-               final int WINDOW_SIZE = 1000;
-               final int WINDOW_SLIDE = 100;
-               final int NUM_KEYS = 100;
-               FailingSource.reset();
-
-               try {
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
-                                       "localhost", 
cluster.getLeaderRPCPort());
-
-                       env.setParallelism(PARALLELISM);
-                       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-                       env.enableCheckpointing(100);
-                       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
-                       env.getConfig().disableSysoutLogging();
-                       env.setStateBackend(this.stateBackend);
-
-                       env
-                                       .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))
-                                       .rebalance()
-                                       .keyBy(0)
-                                       .timeWindow(Time.of(WINDOW_SIZE, 
MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS))
-                                       .reduce(
-                                                       new 
ReduceFunction<Tuple2<Long, IntType>>() {
-
-                                                               @Override
-                                                               public 
Tuple2<Long, IntType> reduce(
-                                                                               
Tuple2<Long, IntType> a,
-                                                                               
Tuple2<Long, IntType> b) {
-
-                                                                       // validate that the function has been opened properly
-                                                                       return 
new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
-                                                               }
-                                                       },
-                                                       new 
RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, 
Tuple, TimeWindow>() {
-
-                                               private boolean open = false;
-
-                                               @Override
-                                               public void open(Configuration 
parameters) {
-                                                       
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-                                                       open = true;
-                                               }
-
-                                               @Override
-                                               public void apply(
-                                                               Tuple tuple,
-                                                               TimeWindow 
window,
-                                                               
Iterable<Tuple2<Long, IntType>> input,
-                                                               
Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
-                                                       // validate that the function has been opened properly
-                                                       assertTrue(open);
-
-                                                       for (Tuple2<Long, 
IntType> in: input) {
-                                                               out.collect(new 
Tuple4<>(in.f0,
-                                                                               
window.getStart(),
-                                                                               
window.getEnd(),
-                                                                               
in.f1));
-                                                       }
-                                               }
-                                       })
-                                       .addSink(new ValidatingSink(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1);
-
-
-                       tryExecute(env, "Tumbling Window Test");
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-
-       // ------------------------------------------------------------------------
-       //  Utilities
-       // ------------------------------------------------------------------------
-
-       private static class FailingSource extends 
RichSourceFunction<Tuple2<Long, IntType>>
-                       implements Checkpointed<Integer>, CheckpointListener
-       {
-               private static volatile boolean failedBefore = false;
-
-               private final int numKeys;
-               private final int numElementsToEmit;
-               private final int failureAfterNumElements;
-
-               private volatile int numElementsEmitted;
-               private volatile int numSuccessfulCheckpoints;
-               private volatile boolean running = true;
-
-               private FailingSource(int numKeys, int numElementsToEmitPerKey, 
int failureAfterNumElements) {
-                       this.numKeys = numKeys;
-                       this.numElementsToEmit = numElementsToEmitPerKey;
-                       this.failureAfterNumElements = failureAfterNumElements;
-               }
-
-               @Override
-               public void open(Configuration parameters) {
-                       // non-parallel source
-                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
-               }
-
-               @Override
-               public void run(SourceContext<Tuple2<Long, IntType>> ctx) 
throws Exception {
-                       // we loop longer than we have elements, to permit delayed checkpoints
-                       // to still cause a failure
-                       while (running) {
-
-                               if (!failedBefore) {
-                                       // delay a bit, if we have not failed before
-                                       Thread.sleep(1);
-                                       if (numSuccessfulCheckpoints >= 2 && 
numElementsEmitted >= failureAfterNumElements) {
-                                               // cause a failure if we have not failed before and have reached
-                                               // enough completed checkpoints and elements
-                                               failedBefore = true;
-                                               throw new Exception("Artificial 
Failure");
-                                       }
-                               }
-
-                               if (numElementsEmitted < numElementsToEmit &&
-                                               (failedBefore || 
numElementsEmitted <= failureAfterNumElements))
-                               {
-                                       // the function failed before, or we are in the elements before the failure
-                                       synchronized (ctx.getCheckpointLock()) {
-                                               int next = numElementsEmitted++;
-                                               for (long i = 0; i < numKeys; 
i++) {
-                                                       
ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
-                                               }
-                                               ctx.emitWatermark(new 
Watermark(next));
-                                       }
-                               }
-                               else {
-
-                                       // if our work is done, delay a bit to prevent busy waiting
-                                       Thread.sleep(1);
-                               }
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       running = false;
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) {
-                       numSuccessfulCheckpoints++;
-               }
-
-               @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return numElementsEmitted;
-               }
-
-               @Override
-               public void restoreState(Integer state) {
-                       numElementsEmitted = state;
-               }
-
-               public static void reset() {
-                       failedBefore = false;
-               }
-       }
-
-       private static class ValidatingSink extends 
RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-                       implements Checkpointed<HashMap<Long, Integer>> {
-
-               private final HashMap<Long, Integer> windowCounts = new 
HashMap<>();
-
-               private final int numKeys;
-               private final int numWindowsExpected;
-
-               private ValidatingSink(int numKeys, int numWindowsExpected) {
-                       this.numKeys = numKeys;
-                       this.numWindowsExpected = numWindowsExpected;
-               }
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       // this sink can only work with DOP 1
-                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
-
-                       // it can happen that a checkpoint happens when the complete success state is
-                       // already set. In that case we restart with the final state and would never
-                       // finish because no more elements arrive.
-                       if (windowCounts.size() == numKeys) {
-                               boolean seenAll = true;
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount != numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       }
-                               }
-                               if (seenAll) {
-                                       throw new SuccessException();
-                               }
-                       }
-               }
-
-               @Override
-               public void close() throws Exception {
-                       boolean seenAll = true;
-                       if (windowCounts.size() == numKeys) {
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount < numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       }
-                               }
-                       }
-                       assertTrue("The sink must see all expected windows.", 
seenAll);
-               }
-
-               @Override
-               public void invoke(Tuple4<Long, Long, Long, IntType> value) 
throws Exception {
-
-                       // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
-                       // the sum should be "sum (start .. end-1)"
-
-                       int expectedSum = 0;
-                       for (long i = value.f1; i < value.f2; i++) {
-                               // only sum up positive vals, to filter out the negative start of the
-                               // first sliding windows
-                               if (i > 0) {
-                                       expectedSum += i;
-                               }
-                       }
-
-                       assertEquals("Window start: " + value.f1 + " end: " + 
value.f2, expectedSum, value.f3.value);
-
-
-                       Integer curr = windowCounts.get(value.f0);
-                       if (curr != null) {
-                               windowCounts.put(value.f0, curr + 1);
-                       }
-                       else {
-                               windowCounts.put(value.f0, 1);
-                       }
-
-                       if (windowCounts.size() == numKeys) {
-                               boolean seenAll = true;
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount < numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       } else if (windowCount > 
numWindowsExpected) {
-                                               fail("Window count to high: " + 
windowCount);
-                                       }
-                               }
-
-                               if (seenAll) {
-                                       // exit
-                                       throw new SuccessException();
-                               }
-
-                       }
-               }
-
-               @Override
-               public HashMap<Long, Integer> snapshotState(long checkpointId, 
long checkpointTimestamp) {
-                       return this.windowCounts;
-               }
-
-               @Override
-               public void restoreState(HashMap<Long, Integer> state) {
-                       this.windowCounts.putAll(state);
-               }
-       }
-
-       // Sink for validating the stateful window counts
-       private static class CountValidatingSink extends 
RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-                       implements Checkpointed<HashMap<Long, Integer>> {
-
-               private final HashMap<Long, Integer> windowCounts = new 
HashMap<>();
-
-               private final int numKeys;
-               private final int numWindowsExpected;
-
-               private CountValidatingSink(int numKeys, int 
numWindowsExpected) {
-                       this.numKeys = numKeys;
-                       this.numWindowsExpected = numWindowsExpected;
-               }
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       // this sink can only work with DOP 1
-                       assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
-               }
-
-               @Override
-               public void close() throws Exception {
-                       boolean seenAll = true;
-                       if (windowCounts.size() == numKeys) {
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount < numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       }
-                               }
-                       }
-                       assertTrue("The source must see all expected windows.", 
seenAll);
-               }
-
-               @Override
-               public void invoke(Tuple4<Long, Long, Long, IntType> value) 
throws Exception {
-
-                       Integer curr = windowCounts.get(value.f0);
-                       if (curr != null) {
-                               windowCounts.put(value.f0, curr + 1);
-                       }
-                       else {
-                               windowCounts.put(value.f0, 1);
-                       }
-
-
-                       // verify the contents of that window, the contents should be:
-                       // (key + num windows so far)
-
-                       assertEquals("Window counts don't match for key " + 
value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), 
value.f3.value);
-
-                       boolean seenAll = true;
-                       if (windowCounts.size() == numKeys) {
-                               for (Integer windowCount: 
windowCounts.values()) {
-                                       if (windowCount < numWindowsExpected) {
-                                               seenAll = false;
-                                               break;
-                                       } else if (windowCount > 
numWindowsExpected) {
-                                               fail("Window count to high: " + 
windowCount);
-                                       }
-                               }
-
-                               if (seenAll) {
-                                       // exit
-                                       throw new SuccessException();
-                               }
-
-                       }
-               }
-
-               @Override
-               public HashMap<Long, Integer> snapshotState(long checkpointId, 
long checkpointTimestamp) {
-                       return this.windowCounts;
-               }
-
-               @Override
-               public void restoreState(HashMap<Long, Integer> state) {
-                       this.windowCounts.putAll(state);
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       //  Parametrization for testing with different state backends
-       // ------------------------------------------------------------------------
-
-
-       @Parameterized.Parameters(name = "StateBackend = {0}")
-       @SuppressWarnings("unchecked,rawtypes")
-       public static Collection<Object[]> parameters(){
-               return Arrays.asList(new Object[][] {
-                               {StateBackendEnum.MEM},
-                               {StateBackendEnum.FILE},
-                               {StateBackendEnum.ROCKSDB_FULLY_ASYNC}
-                       }
-               );
-       }
-
-       private enum StateBackendEnum {
-               MEM, FILE, ROCKSDB_FULLY_ASYNC
-       }
-
-
-       // ------------------------------------------------------------------------
-       //  Utilities
-       // ------------------------------------------------------------------------
-
-       public static class IntType {
-
-               public int value;
-
-               public IntType() {}
-
-               public IntType(int value) { this.value = value; }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..65fda09
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+       public FileBackendEventTimeWindowCheckpointingITCase() {
+               super(StateBackendEnum.FILE);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..899b8d6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+       public MemBackendEventTimeWindowCheckpointingITCase() {
+               super(StateBackendEnum.MEM);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e288617f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..14feb78
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+       public RocksDbBackendEventTimeWindowCheckpointingITCase() {
+               super(StateBackendEnum.ROCKSDB_FULLY_ASYNC);
+       }
+}

Reply via email to