http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
deleted file mode 100644
index f517f83..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * A simple test that runs a streaming topology with checkpointing enabled.
- *
- * The test triggers a failure after a while and verifies that, after 
completion, the
- * state defined with either the {@link OperatorState} or the {@link 
Checkpointed}
- * interface reflects the "exactly once" semantics.
- * 
- * The test throttles the input until at least two checkpoints are completed, 
to make sure that
- * the recovery does not fall back to "square one" (which would naturally lead 
to correct
- * results without testing the checkpointing).
- */
-@SuppressWarnings("serial")
-public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(StateCheckpoinedITCase.class);
-
-       final long NUM_STRINGS = 10_000_000L;
-
-       /**
-        * Runs the following program:
-        *
-        * <pre>
-        *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ 
(groupBy/reduce)->(sink) ]
-        * </pre>
-        */
-       @Override
-       public void testProgram(StreamExecutionEnvironment env) {
-               assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
-               final long failurePosMin = (long) (0.4 * NUM_STRINGS / 
PARALLELISM);
-               final long failurePosMax = (long) (0.7 * NUM_STRINGS / 
PARALLELISM);
-
-               final long failurePos = (new Random().nextLong() % 
(failurePosMax - failurePosMin)) + failurePosMin;
-
-               env.enableCheckpointing(200);
-
-               DataStream<String> stream = env.addSource(new 
StringGeneratingSourceFunction(NUM_STRINGS));
-
-               stream
-                               // first vertex, chained to the source
-                               // this filter throttles the flow until at 
least one checkpoint
-                               // is complete, to make sure this program does 
not run without 
-                               .filter(new StringRichFilterFunction())
-
-                                               // -------------- seconds 
vertex - one-to-one connected ----------------
-                               .map(new StringPrefixCountRichMapFunction())
-                               .startNewChain()
-                               .map(new StatefulCounterFunction())
-
-                                               // -------------- third vertex 
- reducer and the sink ----------------
-                               .partitionByHash("prefix")
-                               .flatMap(new OnceFailingAggregator(failurePos))
-                               .addSink(new ValidatingSink());
-       }
-
-       @Override
-       public void postSubmit() {
-               
-               //assertTrue("Test inconclusive: failure occurred before first 
checkpoint",
-               //              
OnceFailingAggregator.wasCheckpointedBeforeFailure);
-               if(!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
-                       LOG.warn("Test inconclusive: failure occurred before 
first checkpoint");
-               }
-               
-               long filterSum = 0;
-               for (long l : StringRichFilterFunction.counts) {
-                       filterSum += l;
-               }
-
-               long mapSum = 0;
-               for (long l : StringPrefixCountRichMapFunction.counts) {
-                       mapSum += l;
-               }
-
-               long countSum = 0;
-               for (long l : StatefulCounterFunction.counts) {
-                       countSum += l;
-               }
-
-               // verify that we counted exactly right
-               assertEquals(NUM_STRINGS, filterSum);
-               assertEquals(NUM_STRINGS, mapSum);
-               assertEquals(NUM_STRINGS, countSum);
-
-               for (Map<Character, Long> map : ValidatingSink.maps) {
-                       for (Long count : map.values()) {
-                               assertEquals(NUM_STRINGS / 40, 
count.longValue());
-                       }
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Custom Functions
-       // 
--------------------------------------------------------------------------------------------
-       
-       private static class StringGeneratingSourceFunction extends 
RichParallelSourceFunction<String> 
-                       implements CheckpointedAsynchronously<Integer>
-       {
-               private final long numElements;
-
-               private int index;
-
-               private volatile boolean isRunning = true;
-               
-               
-               StringGeneratingSourceFunction(long numElements) {
-                       this.numElements = numElements;
-               }
-
-               @Override
-               public void run(SourceContext<String> ctx) throws Exception {
-                       final Object lockingObject = ctx.getCheckpointLock();
-
-                       final Random rnd = new Random();
-                       final StringBuilder stringBuilder = new StringBuilder();
-                       
-                       final int step = 
getRuntimeContext().getNumberOfParallelSubtasks();
-                       
-                       if (index == 0) {
-                               index = 
getRuntimeContext().getIndexOfThisSubtask();
-                       }
-
-                       while (isRunning && index < numElements) {
-                               char first = (char) ((index % 40) + 40);
-
-                               stringBuilder.setLength(0);
-                               stringBuilder.append(first);
-
-                               String result = randomString(stringBuilder, 
rnd);
-
-                               synchronized (lockingObject) {
-                                       index += step;
-                                       ctx.collect(result);
-                               }
-                       }
-               }
-               
-               @Override
-               public void cancel() {
-                       isRunning = false;
-               }
-
-               private static String randomString(StringBuilder bld, Random 
rnd) {
-                       final int len = rnd.nextInt(10) + 5;
-
-                       for (int i = 0; i < len; i++) {
-                               char next = (char) (rnd.nextInt(20000) + 33);
-                               bld.append(next);
-                       }
-
-                       return bld.toString();
-               }
-
-               @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return index;
-               }
-
-               @Override
-               public void restoreState(Integer state) {
-                       index = state;
-               }
-       }
-
-       private static class StringRichFilterFunction extends 
RichFilterFunction<String> 
-                       implements Checkpointed<Long> {
-
-               static final long[] counts = new long[PARALLELISM];
-               
-               private long count;
-               
-               @Override
-               public boolean filter(String value) throws Exception {
-                       count++;
-                       return value.length() < 100; // should be always true
-               }
-
-               @Override
-               public void close() {
-                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
count;
-               }
-
-               @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
-               }
-
-               @Override
-               public void restoreState(Long state) {
-                       count = state;
-               }
-       }
-
-       private static class StringPrefixCountRichMapFunction extends 
RichMapFunction<String, PrefixCount> 
-                       implements CheckpointedAsynchronously<Long> {
-               
-               static final long[] counts = new long[PARALLELISM];
-
-               private long count;
-               
-               @Override
-               public PrefixCount map(String value) {
-                       count++;
-                       return new PrefixCount(value.substring(0, 1), value, 
1L);
-               }
-
-               @Override
-               public void close() {
-                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
count;
-               }
-
-               @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
-               }
-
-               @Override
-               public void restoreState(Long state) {
-                       count = state;
-               }
-       }
-       
-       private static class StatefulCounterFunction extends 
RichMapFunction<PrefixCount, PrefixCount> 
-               implements Checkpointed<Long> {
-
-               static final long[] counts = new long[PARALLELISM];
-               
-               private long count;
-
-               @Override
-               public PrefixCount map(PrefixCount value) throws Exception {
-                       count++;
-                       return value;
-               }
-
-               @Override
-               public void close() throws IOException {
-                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
count;
-               }
-
-               @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return count;
-               }
-
-               @Override
-               public void restoreState(Long state) {
-                       count = state;
-               }
-       }
-       
-       private static class OnceFailingAggregator extends 
RichFlatMapFunction<PrefixCount, PrefixCount> 
-               implements Checkpointed<HashMap<String, PrefixCount>>, 
CheckpointNotifier {
-
-               static boolean wasCheckpointedBeforeFailure = false;
-               
-               private static volatile boolean hasFailed = false;
-
-               private final HashMap<String, PrefixCount> aggregationMap = new 
HashMap<String, PrefixCount>();
-               
-               private long failurePos;
-               private long count;
-               
-               private boolean wasCheckpointed;
-               
-
-               OnceFailingAggregator(long failurePos) {
-                       this.failurePos = failurePos;
-               }
-               
-               @Override
-               public void open(Configuration parameters) {
-                       count = 0;
-               }
-
-               @Override
-               public void flatMap(PrefixCount value, Collector<PrefixCount> 
out) throws Exception {
-                       count++;
-                       if (!hasFailed && count >= failurePos && 
getRuntimeContext().getIndexOfThisSubtask() == 1) {
-                               wasCheckpointedBeforeFailure = wasCheckpointed;
-                               hasFailed = true;
-                               throw new Exception("Test Failure");
-                       }
-                       
-                       PrefixCount curr = aggregationMap.get(value.prefix);
-                       if (curr == null) {
-                               aggregationMap.put(value.prefix, value);
-                               out.collect(value);
-                       }
-                       else {
-                               curr.count += value.count;
-                               out.collect(curr);
-                       }
-               }
-
-               @Override
-               public HashMap<String, PrefixCount> snapshotState(long 
checkpointId, long checkpointTimestamp) {
-                       return aggregationMap;
-               }
-
-               @Override
-               public void restoreState(HashMap<String, PrefixCount> state) {
-                       aggregationMap.putAll(state);
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) {
-                       this.wasCheckpointed = true;
-               }
-       }
-
-       private static class ValidatingSink extends 
RichSinkFunction<PrefixCount> 
-                       implements Checkpointed<HashMap<Character, Long>> {
-
-               @SuppressWarnings("unchecked")
-               private static Map<Character, Long>[] maps = (Map<Character, 
Long>[]) new Map<?, ?>[PARALLELISM];
-               
-               private HashMap<Character, Long> counts = new 
HashMap<Character, Long>();
-
-               @Override
-               public void invoke(PrefixCount value) {
-                       Character first = value.prefix.charAt(0);
-                       Long previous = counts.get(first);
-                       if (previous == null) {
-                               counts.put(first, value.count);
-                       } else {
-                               counts.put(first, Math.max(previous, 
value.count));
-                       }
-               }
-
-               @Override
-               public void close() throws Exception {
-                       maps[getRuntimeContext().getIndexOfThisSubtask()] = 
counts;
-               }
-
-               @Override
-               public HashMap<Character, Long> snapshotState(long 
checkpointId, long checkpointTimestamp) {
-                       return counts;
-               }
-
-               @Override
-               public void restoreState(HashMap<Character, Long> state) {
-                       counts.putAll(state);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
new file mode 100644
index 0000000..d7c06f6
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * A simple test that runs a streaming topology with checkpointing enabled.
+ *
+ * The test triggers a failure after a while and verifies that, after 
completion, the
+ * state defined with either the {@link OperatorState} or the {@link 
Checkpointed}
+ * interface reflects the "exactly once" semantics.
+ * 
+ * The test throttles the input until at least two checkpoints are completed, 
to make sure that
+ * the recovery does not fall back to "square one" (which would naturally lead 
to correct
+ * results without testing the checkpointing).
+ */
+@SuppressWarnings("serial")
+public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(StateCheckpointedITCase.class);
+
+       final long NUM_STRINGS = 10_000_000L;
+
+       /**
+        * Runs the following program:
+        *
+        * <pre>
+        *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ 
(groupBy/reduce)->(sink) ]
+        * </pre>
+        */
+       @Override
+       public void testProgram(StreamExecutionEnvironment env) {
+               assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+               final long failurePosMin = (long) (0.4 * NUM_STRINGS / 
PARALLELISM);
+               final long failurePosMax = (long) (0.7 * NUM_STRINGS / 
PARALLELISM);
+
+               final long failurePos = (new Random().nextLong() % 
(failurePosMax - failurePosMin)) + failurePosMin;
+
+               env.enableCheckpointing(200);
+
+               DataStream<String> stream = env.addSource(new 
StringGeneratingSourceFunction(NUM_STRINGS));
+
+               stream
+                               // first vertex, chained to the source
+                               // this filter throttles the flow until at 
least one checkpoint
+                               // is complete, to make sure this program does 
not run without 
+                               .filter(new StringRichFilterFunction())
+
+                                               // -------------- seconds 
vertex - one-to-one connected ----------------
+                               .map(new StringPrefixCountRichMapFunction())
+                               .startNewChain()
+                               .map(new StatefulCounterFunction())
+
+                                               // -------------- third vertex 
- reducer and the sink ----------------
+                               .partitionByHash("prefix")
+                               .flatMap(new OnceFailingAggregator(failurePos))
+                               .addSink(new ValidatingSink());
+       }
+
+       @Override
+       public void postSubmit() {
+               
+               //assertTrue("Test inconclusive: failure occurred before first 
checkpoint",
+               //              
OnceFailingAggregator.wasCheckpointedBeforeFailure);
+               if(!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
+                       LOG.warn("Test inconclusive: failure occurred before 
first checkpoint");
+               }
+               
+               long filterSum = 0;
+               for (long l : StringRichFilterFunction.counts) {
+                       filterSum += l;
+               }
+
+               long mapSum = 0;
+               for (long l : StringPrefixCountRichMapFunction.counts) {
+                       mapSum += l;
+               }
+
+               long countSum = 0;
+               for (long l : StatefulCounterFunction.counts) {
+                       countSum += l;
+               }
+
+               // verify that we counted exactly right
+               assertEquals(NUM_STRINGS, filterSum);
+               assertEquals(NUM_STRINGS, mapSum);
+               assertEquals(NUM_STRINGS, countSum);
+
+               for (Map<Character, Long> map : ValidatingSink.maps) {
+                       for (Long count : map.values()) {
+                               assertEquals(NUM_STRINGS / 40, 
count.longValue());
+                       }
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Custom Functions
+       // 
--------------------------------------------------------------------------------------------
+       
+       private static class StringGeneratingSourceFunction extends 
RichParallelSourceFunction<String> 
+                       implements CheckpointedAsynchronously<Integer>
+       {
+               private final long numElements;
+
+               private int index;
+
+               private volatile boolean isRunning = true;
+               
+               
+               StringGeneratingSourceFunction(long numElements) {
+                       this.numElements = numElements;
+               }
+
+               @Override
+               public void run(SourceContext<String> ctx) throws Exception {
+                       final Object lockingObject = ctx.getCheckpointLock();
+
+                       final Random rnd = new Random();
+                       final StringBuilder stringBuilder = new StringBuilder();
+                       
+                       final int step = 
getRuntimeContext().getNumberOfParallelSubtasks();
+                       
+                       if (index == 0) {
+                               index = 
getRuntimeContext().getIndexOfThisSubtask();
+                       }
+
+                       while (isRunning && index < numElements) {
+                               char first = (char) ((index % 40) + 40);
+
+                               stringBuilder.setLength(0);
+                               stringBuilder.append(first);
+
+                               String result = randomString(stringBuilder, 
rnd);
+
+                               synchronized (lockingObject) {
+                                       index += step;
+                                       ctx.collect(result);
+                               }
+                       }
+               }
+               
+               @Override
+               public void cancel() {
+                       isRunning = false;
+               }
+
+               private static String randomString(StringBuilder bld, Random 
rnd) {
+                       final int len = rnd.nextInt(10) + 5;
+
+                       for (int i = 0; i < len; i++) {
+                               char next = (char) (rnd.nextInt(20000) + 33);
+                               bld.append(next);
+                       }
+
+                       return bld.toString();
+               }
+
+               @Override
+               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
+                       return index;
+               }
+
+               @Override
+               public void restoreState(Integer state) {
+                       index = state;
+               }
+       }
+
+       private static class StringRichFilterFunction extends 
RichFilterFunction<String> 
+                       implements Checkpointed<Long> {
+
+               static final long[] counts = new long[PARALLELISM];
+               
+               private long count;
+               
+               @Override
+               public boolean filter(String value) throws Exception {
+                       count++;
+                       return value.length() < 100; // should be always true
+               }
+
+               @Override
+               public void close() {
+                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
count;
+               }
+
+               @Override
+               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
+                       return count;
+               }
+
+               @Override
+               public void restoreState(Long state) {
+                       count = state;
+               }
+       }
+
+       private static class StringPrefixCountRichMapFunction extends 
RichMapFunction<String, PrefixCount> 
+                       implements CheckpointedAsynchronously<Long> {
+               
+               static final long[] counts = new long[PARALLELISM];
+
+               private long count;
+               
+               @Override
+               public PrefixCount map(String value) {
+                       count++;
+                       return new PrefixCount(value.substring(0, 1), value, 
1L);
+               }
+
+               @Override
+               public void close() {
+                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
count;
+               }
+
+               @Override
+               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
+                       return count;
+               }
+
+               @Override
+               public void restoreState(Long state) {
+                       count = state;
+               }
+       }
+       
+       private static class StatefulCounterFunction extends 
RichMapFunction<PrefixCount, PrefixCount> 
+               implements Checkpointed<Long> {
+
+               static final long[] counts = new long[PARALLELISM];
+               
+               private long count;
+
+               @Override
+               public PrefixCount map(PrefixCount value) throws Exception {
+                       count++;
+                       return value;
+               }
+
+               @Override
+               public void close() throws IOException {
+                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
count;
+               }
+
+               @Override
+               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
+                       return count;
+               }
+
+               @Override
+               public void restoreState(Long state) {
+                       count = state;
+               }
+       }
+       
+       private static class OnceFailingAggregator extends 
RichFlatMapFunction<PrefixCount, PrefixCount> 
+               implements Checkpointed<HashMap<String, PrefixCount>>, 
CheckpointNotifier {
+
+               static boolean wasCheckpointedBeforeFailure = false;
+               
+               private static volatile boolean hasFailed = false;
+
+               private final HashMap<String, PrefixCount> aggregationMap = new 
HashMap<String, PrefixCount>();
+               
+               private long failurePos;
+               private long count;
+               
+               private boolean wasCheckpointed;
+               
+
+               OnceFailingAggregator(long failurePos) {
+                       this.failurePos = failurePos;
+               }
+               
+               @Override
+               public void open(Configuration parameters) {
+                       count = 0;
+               }
+
+               @Override
+               public void flatMap(PrefixCount value, Collector<PrefixCount> 
out) throws Exception {
+                       count++;
+                       if (!hasFailed && count >= failurePos && 
getRuntimeContext().getIndexOfThisSubtask() == 1) {
+                               wasCheckpointedBeforeFailure = wasCheckpointed;
+                               hasFailed = true;
+                               throw new Exception("Test Failure");
+                       }
+                       
+                       PrefixCount curr = aggregationMap.get(value.prefix);
+                       if (curr == null) {
+                               aggregationMap.put(value.prefix, value);
+                               out.collect(value);
+                       }
+                       else {
+                               curr.count += value.count;
+                               out.collect(curr);
+                       }
+               }
+
+               @Override
+               public HashMap<String, PrefixCount> snapshotState(long 
checkpointId, long checkpointTimestamp) {
+                       return aggregationMap;
+               }
+
+               @Override
+               public void restoreState(HashMap<String, PrefixCount> state) {
+                       aggregationMap.putAll(state);
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) {
+                       this.wasCheckpointed = true;
+               }
+       }
+
+       private static class ValidatingSink extends 
RichSinkFunction<PrefixCount> 
+                       implements Checkpointed<HashMap<Character, Long>> {
+
+               @SuppressWarnings("unchecked")
+               private static Map<Character, Long>[] maps = (Map<Character, 
Long>[]) new Map<?, ?>[PARALLELISM];
+               
+               private HashMap<Character, Long> counts = new 
HashMap<Character, Long>();
+
+               @Override
+               public void invoke(PrefixCount value) {
+                       Character first = value.prefix.charAt(0);
+                       Long previous = counts.get(first);
+                       if (previous == null) {
+                               counts.put(first, value.count);
+                       } else {
+                               counts.put(first, Math.max(previous, 
value.count));
+                       }
+               }
+
+               @Override
+               public void close() throws Exception {
+                       maps[getRuntimeContext().getIndexOfThisSubtask()] = 
counts;
+               }
+
+               @Override
+               public HashMap<Character, Long> snapshotState(long 
checkpointId, long checkpointTimestamp) {
+                       return counts;
+               }
+
+               @Override
+               public void restoreState(HashMap<Character, Long> state) {
+                       counts.putAll(state);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
new file mode 100644
index 0000000..ba5ff1c
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import scala.Option;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.createTempDirectory;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Verify behaviour in case of JobManager process failure during job execution.
+ *
+ * <p>The test works with multiple job managers processes by spawning JVMs.
+ *
+ * <p>Initially, it starts two TaskManager (2 slots each) and two JobManager 
JVMs.
+ *
+ * <p>It submits a program with parallelism 4 and waits until all tasks are 
brought up.
+ * Coordination between the test and the tasks happens via checking for the 
existence of
+ * temporary files. It then kills the leading JobManager process. The recovery 
should restart the
+ * tasks on the new JobManager.
+ *
+ * <p>This follows the same structure as {@link 
AbstractTaskManagerProcessFailureRecoveryTest}.
+ */
+public abstract class AbstractJobManagerProcessFailureRecoveryITCase extends 
TestLogger {
+
+       private final static ZooKeeperTestEnvironment ZooKeeper = new 
ZooKeeperTestEnvironment(1);
+
+       private final static FiniteDuration TestTimeOut = new FiniteDuration(5, 
TimeUnit.MINUTES);
+
+       private static final File FileStateBackendBasePath;
+
+       static {
+               try {
+                       FileStateBackendBasePath = 
CommonTestUtils.createTempDirectory();
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Error in test setup. Could 
not create directory.", e);
+               }
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (ZooKeeper != null) {
+                       ZooKeeper.shutdown();
+               }
+
+               if (FileStateBackendBasePath != null) {
+                       FileUtils.deleteDirectory(FileStateBackendBasePath);
+               }
+       }
+
+       @Before
+       public void cleanUp() throws Exception {
+               ZooKeeper.deleteAll();
+
+               FileUtils.cleanDirectory(FileStateBackendBasePath);
+       }
+
+       protected static final String READY_MARKER_FILE_PREFIX = "ready_";
+       protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
+       protected static final String PROCEED_MARKER_FILE = "proceed";
+
+       protected static final int PARALLELISM = 4;
+
+       /**
+        * Test program with JobManager failure.
+        *
+        * @param zkQuorum ZooKeeper quorum to connect to
+        * @param coordinateDir Coordination directory
+        * @throws Exception
+        */
+       public abstract void testJobManagerFailure(String zkQuorum, File 
coordinateDir) throws Exception;
+
+       @Test
+       public void testJobManagerProcessFailure() throws Exception {
+               // Config
+               final int numberOfJobManagers = 2;
+               final int numberOfTaskManagers = 2;
+               final int numberOfSlotsPerTaskManager = 2;
+
+               assertEquals(PARALLELISM, numberOfTaskManagers * 
numberOfSlotsPerTaskManager);
+
+               // Setup
+               // Test actor system
+               ActorSystem testActorSystem;
+
+               // Job managers
+               final JobManagerProcess[] jmProcess = new 
JobManagerProcess[numberOfJobManagers];
+
+               // Task managers
+               final ActorSystem[] tmActorSystem = new 
ActorSystem[numberOfTaskManagers];
+
+               // Leader election service
+               LeaderRetrievalService leaderRetrievalService = null;
+
+               // Coordination between the processes goes through a directory
+               File coordinateTempDir = null;
+
+               try {
+                       final Deadline deadline = TestTimeOut.fromNow();
+
+                       // Coordination directory
+                       coordinateTempDir = createTempDirectory();
+
+                       // Job Managers
+                       Configuration config = 
ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+                                       ZooKeeper.getConnectString(), 
FileStateBackendBasePath.getPath());
+
+                       // Start first process
+                       jmProcess[0] = new JobManagerProcess(0, config);
+                       jmProcess[0].createAndStart();
+
+                       // Task manager configuration
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+                       // Start the task manager process
+                       for (int i = 0; i < numberOfTaskManagers; i++) {
+                               tmActorSystem[i] = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+                               TaskManager.startTaskManagerComponentsAndActor(
+                                               config, tmActorSystem[i], 
"localhost",
+                                               Option.<String>empty(), 
Option.<LeaderRetrievalService>empty(),
+                                               false, StreamingMode.STREAMING, 
TaskManager.class);
+                       }
+
+                       // Test actor system
+                       testActorSystem = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+
+                       jmProcess[0].getActorRef(testActorSystem, 
deadline.timeLeft());
+
+                       // Leader listener
+                       TestingListener leaderListener = new TestingListener();
+                       leaderRetrievalService = 
ZooKeeperUtils.createLeaderRetrievalService(config);
+                       leaderRetrievalService.start(leaderListener);
+
+                       // Initial submission
+                       
leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+                       String leaderAddress = leaderListener.getAddress();
+                       UUID leaderId = leaderListener.getLeaderSessionID();
+
+                       // Get the leader ref
+                       ActorRef leaderRef = 
AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft());
+                       ActorGateway leaderGateway = new 
AkkaActorGateway(leaderRef, leaderId);
+
+                       // Wait for all task managers to connect to the leading 
job manager
+                       
JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, 
leaderGateway,
+                                       deadline.timeLeft());
+
+                       final File coordinateDirClosure = coordinateTempDir;
+                       final Throwable[] errorRef = new Throwable[1];
+
+                       // we trigger program execution in a separate thread
+                       Thread programTrigger = new Thread("Program Trigger") {
+                               @Override
+                               public void run() {
+                                       try {
+                                               
testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure);
+                                       }
+                                       catch (Throwable t) {
+                                               t.printStackTrace();
+                                               errorRef[0] = t;
+                                       }
+                               }
+                       };
+
+                       //start the test program
+                       programTrigger.start();
+
+                       // wait until all marker files are in place, indicating 
that all tasks have started
+                       
AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir,
+                                       READY_MARKER_FILE_PREFIX, PARALLELISM, 
deadline.timeLeft().toMillis());
+
+                       // Kill one of the job managers and trigger recovery
+                       jmProcess[0].destroy();
+
+                       jmProcess[1] = new JobManagerProcess(1, config);
+                       jmProcess[1].createAndStart();
+
+                       jmProcess[1].getActorRef(testActorSystem, 
deadline.timeLeft());
+
+                       // we create the marker file which signals the program 
functions tasks that they can complete
+                       
AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new 
File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+                       programTrigger.join(deadline.timeLeft().toMillis());
+
+                       // We wait for the finish marker file. We don't wait 
for the program trigger, because
+                       // we submit in detached mode.
+                       
AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir,
+                                       FINISH_MARKER_FILE_PREFIX, 1, 
deadline.timeLeft().toMillis());
+
+                       // check that the program really finished
+                       assertFalse("The program did not finish in time", 
programTrigger.isAlive());
+
+                       // check whether the program encountered an error
+                       if (errorRef[0] != null) {
+                               Throwable error = errorRef[0];
+                               error.printStackTrace();
+                               fail("The program encountered a " + 
error.getClass().getSimpleName() + " : " + error.getMessage());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+
+                       for (JobManagerProcess p : jmProcess) {
+                               if (p != null) {
+                                       p.printProcessLog();
+                               }
+                       }
+
+                       fail(e.getMessage());
+               }
+               finally {
+                       for (int i = 0; i < numberOfTaskManagers; i++) {
+                               if (tmActorSystem[i] != null) {
+                                       tmActorSystem[i].shutdown();
+                               }
+                       }
+
+                       if (leaderRetrievalService != null) {
+                               leaderRetrievalService.stop();
+                       }
+
+                       for (JobManagerProcess jmProces : jmProcess) {
+                               if (jmProces != null) {
+                                       jmProces.destroy();
+                               }
+                       }
+
+                       // Delete coordination directory
+                       if (coordinateTempDir != null) {
+                               try {
+                                       
FileUtils.deleteDirectory(coordinateTempDir);
+                               }
+                               catch (Throwable ignored) {
+                               }
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
deleted file mode 100644
index 7e16baf..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recovery;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
-import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-/**
- * Abstract base for tests verifying the behavior of the recovery in the
- * case when a TaskManager fails (process is killed) in the middle of a job 
execution.
- *
- * The test works with multiple task managers processes by spawning JVMs.
- * Initially, it starts a JobManager in process and two TaskManagers JVMs with
- * 2 task slots each.
- * It submits a program with parallelism 4 and waits until all tasks are 
brought up.
- * Coordination between the test and the tasks happens via checking for the
- * existence of temporary files. It then starts another TaskManager, which is
- * guaranteed to remain empty (all tasks are already deployed) and kills one of
- * the original task managers. The recovery should restart the tasks on the 
new TaskManager.
- */
-public abstract class AbstractProcessFailureRecoveryTest extends TestLogger {
-
-       protected static final String READY_MARKER_FILE_PREFIX = "ready_";
-       protected static final String PROCEED_MARKER_FILE = "proceed";
-
-       protected static final int PARALLELISM = 4;
-
-       /**
-        * Starts a JobManager inside this JVM and two TaskManager JVMs, then runs the test
-        * program in a separate thread. Once all tasks are up, it starts a third TaskManager
-        * JVM, kills the first one and checks that the program still completes without error.
-        *
-        * <p>Returns early (skips) if no java executable can be found.
-        */
-       @Test
-       public void testTaskManagerProcessFailure() {
-
-               final StringWriter processOutput1 = new StringWriter();
-               final StringWriter processOutput2 = new StringWriter();
-               final StringWriter processOutput3 = new StringWriter();
-
-               ActorSystem jmActorSystem = null;
-               Process taskManagerProcess1 = null;
-               Process taskManagerProcess2 = null;
-               Process taskManagerProcess3 = null;
-
-               File coordinateTempDir = null;
-
-               try {
-                       // check that we run this test only if the java command
-                       // is available on this machine
-                       String javaCommand = getJavaCommandPath();
-                       if (javaCommand == null) {
-                               System.out.println("---- Skipping Process 
Failure test : Could not find java executable ----");
-                               return;
-                       }
-
-                       // create a logging file for the process
-                       File tempLogFile = 
File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
-                       tempLogFile.deleteOnExit();
-                       CommonTestUtils.printLog4jDebugConfig(tempLogFile);
-
-                       // coordination between the processes goes through a 
directory
-                       coordinateTempDir = createTempDirectory();
-
-                       // find a free port to start the JobManager
-                       final int jobManagerPort = NetUtils.getAvailablePort();
-
-                       // start a JobManager
-                       Tuple2<String, Object> localAddress = new 
Tuple2<String, Object>("localhost", jobManagerPort);
-
-                       Configuration jmConfig = new Configuration();
-                       
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
-                       
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
-                       
jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
-                       
jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
-
-                       jmActorSystem = AkkaUtils.createActorSystem(jmConfig, 
new Some<Tuple2<String, Object>>(localAddress));
-                       ActorRef jmActor = JobManager.startJobManagerActors(
-                               jmConfig,
-                               jmActorSystem,
-                               StreamingMode.STREAMING,
-                               JobManager.class,
-                               MemoryArchivist.class)._1();
-
-                       // the TaskManager java command
-                       String[] command = new String[] {
-                                       javaCommand,
-                                       "-Dlog.level=DEBUG",
-                                       "-Dlog4j.configuration=file:" + 
tempLogFile.getAbsolutePath(),
-                                       "-Xms80m", "-Xmx80m",
-                                       "-classpath", getCurrentClasspath(),
-                                       
TaskManagerProcessEntryPoint.class.getName(),
-                                       String.valueOf(jobManagerPort)
-                       };
-
-                       // start the first two TaskManager processes
-                       taskManagerProcess1 = new 
ProcessBuilder(command).start();
-                       new PipeForwarder(taskManagerProcess1.getErrorStream(), 
processOutput1);
-                       taskManagerProcess2 = new 
ProcessBuilder(command).start();
-                       new PipeForwarder(taskManagerProcess2.getErrorStream(), 
processOutput2);
-
-                       // we wait for the JobManager to have the two 
TaskManagers available
-                       // since some of the CI environments are very hostile, 
we need to give this a lot of time (2 minutes)
-                       waitUntilNumTaskManagersAreRegistered(jmActor, 2, 
120000);
-
-                       // the program will set a marker file in each of its 
parallel tasks once they are ready, so that
-                       // this coordinating code is aware of this.
-                       // the program will very slowly consume elements until 
the marker file (later created by the
-                       // test driver code) is present
-                       final File coordinateDirClosure = coordinateTempDir;
-                       final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
-
-                       // we trigger program execution in a separate thread
-                       Thread programTrigger = new Thread("Program Trigger") {
-                               @Override
-                               public void run() {
-                                       try {
-                                               testProgram(jobManagerPort, 
coordinateDirClosure);
-                                       }
-                                       catch (Throwable t) {
-                                               t.printStackTrace();
-                                               errorRef.set(t);
-                                       }
-                               }
-                       };
-
-                       //start the test program
-                       programTrigger.start();
-
-                       // wait until all marker files are in place, indicating 
that all tasks have started
-                       // max 2 minutes (120000 ms)
-                       if (!waitForMarkerFiles(coordinateTempDir, PARALLELISM, 
120000)) {
-                               // check if the program failed for some reason
-                               if (errorRef.get() != null) {
-                                       Throwable error = errorRef.get();
-                                       error.printStackTrace();
-                                       fail("The program encountered a " + 
error.getClass().getSimpleName() + " : " + error.getMessage());
-                               }
-                               else {
-                                       // no error occurred, simply a timeout
-                                       fail("The tasks were not started within 
time (" + 120000 + "msecs)");
-                               }
-                       }
-
-                       // start the third TaskManager
-                       taskManagerProcess3 = new 
ProcessBuilder(command).start();
-                       new PipeForwarder(taskManagerProcess3.getErrorStream(), 
processOutput3);
-
-                       // we wait for the third TaskManager to register
-                       // since some of the CI environments are very hostile, 
we need to give this a lot of time (2 minutes)
-                       waitUntilNumTaskManagersAreRegistered(jmActor, 3, 
120000);
-
-                       // kill one of the previous TaskManagers, triggering a 
failure and recovery
-                       taskManagerProcess1.destroy();
-                       taskManagerProcess1 = null;
-
-                       // we create the marker file which signals the program 
functions tasks that they can complete
-                       touchFile(new File(coordinateTempDir, 
PROCEED_MARKER_FILE));
-
-                       // wait for at most 5 minutes for the program to 
complete
-                       programTrigger.join(300000);
-
-                       // check that the program really finished
-                       assertFalse("The program did not finish in time", 
programTrigger.isAlive());
-
-                       // check whether the program encountered an error
-                       if (errorRef.get() != null) {
-                               Throwable error = errorRef.get();
-                               error.printStackTrace();
-                               fail("The program encountered a " + 
error.getClass().getSimpleName() + " : " + error.getMessage());
-                       }
-
-                       // all seems well :-)
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       printProcessLog("TaskManager 1", 
processOutput1.toString());
-                       printProcessLog("TaskManager 2", 
processOutput2.toString());
-                       printProcessLog("TaskManager 3", 
processOutput3.toString());
-                       fail(e.getMessage());
-               }
-               catch (Error e) {
-                       // assertion failures land here: dump the TaskManager output, then rethrow
-                       e.printStackTrace();
-                       printProcessLog("TaskManager 1", 
processOutput1.toString());
-                       printProcessLog("TaskManager 2", 
processOutput2.toString());
-                       printProcessLog("TaskManager 3", 
processOutput3.toString());
-                       throw e;
-               }
-               finally {
-                       if (taskManagerProcess1 != null) {
-                               taskManagerProcess1.destroy();
-                       }
-                       if (taskManagerProcess2 != null) {
-                               taskManagerProcess2.destroy();
-                       }
-                       if (taskManagerProcess3 != null) {
-                               taskManagerProcess3.destroy();
-                       }
-                       if (jmActorSystem != null) {
-                               jmActorSystem.shutdown();
-                       }
-                       if (coordinateTempDir != null) {
-                               try {
-                                       
FileUtils.deleteDirectory(coordinateTempDir);
-                               }
-                               catch (Throwable t) {
-                                       // we can ignore this
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Runs the test program against the JobManager listening on the given port.
-        *
-        * <p>The driver calls this method from a separate "Program Trigger" thread, so that it
-        * can check whether the program has terminated.
-        *
-        * @param jobManagerPort The port for submitting the topology to the local cluster
-        * @param coordinateDir TaskManager failure will be triggered only after the processes
-        *                      have successfully created their marker files under this directory
-        * @throws Exception if the program fails; the driver records it and fails the test
-        */
-       public abstract void testProgram(int jobManagerPort, File 
coordinateDir) throws Exception;
-
-
-       /**
-        * Repeatedly asks the JobManager for its number of registered TaskManagers until it
-        * equals {@code numExpected}, failing the test once {@code maxDelay} ms have elapsed.
-        *
-        * @param jobManager the JobManager actor to query
-        * @param numExpected the exact number of TaskManagers to wait for
-        * @param maxDelay the maximum time to wait, in milliseconds
-        * @throws Exception if asking the JobManager fails with anything other than a timeout
-        */
-       protected void waitUntilNumTaskManagersAreRegistered(ActorRef 
jobManager, int numExpected, long maxDelay)
-                       throws Exception
-       {
-               final long deadline = System.currentTimeMillis() + maxDelay;
-               while (true) {
-                       long remaining = deadline - System.currentTimeMillis();
-                       if (remaining <= 0) {
-                               fail("The TaskManagers did not register within 
the expected time (" + maxDelay + "msecs)");
-                       }
-
-                       // each ask may use all of the remaining time
-                       FiniteDuration timeout = new FiniteDuration(remaining, 
TimeUnit.MILLISECONDS);
-
-                       // NOTE(review): there is no pause between polls; when the count does not match
-                       // yet, the next ask is sent immediately.
-                       try {
-                               Future<?> result = Patterns.ask(jobManager,
-                                               
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-                                               new Timeout(timeout));
-                               Integer numTMs = (Integer) Await.result(result, 
timeout);
-                               if (numTMs == numExpected) {
-                                       break;
-                               }
-                       }
-                       catch (TimeoutException e) {
-                               // ignore and retry
-                       }
-                       catch (ClassCastException e) {
-                               fail("Wrong response: " + e.getMessage());
-                       }
-               }
-       }
-
-       /**
-        * Prints a spawned process's captured output to stdout between banner lines.
-        * Does nothing if the log is {@code null} or empty.
-        *
-        * @param processName name shown in the banner
-        * @param log the captured process output
-        */
-       protected static void printProcessLog(String processName, String log) {
-               if (log == null || log.length() == 0) {
-                       return;
-               }
-
-               System.out.println("-----------------------------------------");
-               System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + 
processName);
-               System.out.println("-----------------------------------------");
-               System.out.println(log);
-               System.out.println("-----------------------------------------");
-               System.out.println("            END SPAWNED PROCESS LOG");
-               System.out.println("-----------------------------------------");
-       }
-
-       /**
-        * Creates a new, randomly named (UUID) directory under {@code java.io.tmpdir}.
-        * Makes up to 10 attempts.
-        *
-        * @return the created directory
-        * @throws IOException if no attempt succeeded
-        */
-       protected static File createTempDirectory() throws IOException {
-               File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
-               for (int i = 0; i < 10; i++) {
-                       File dir = new File(tempDir, 
UUID.randomUUID().toString());
-                       if (!dir.exists() && dir.mkdirs()) {
-                               return dir;
-                       }
-                       System.err.println("Could not use temporary directory " 
+ dir.getAbsolutePath());
-               }
-
-               throw new IOException("Could not create temporary file 
directory");
-       }
-
-       /**
-        * Creates the file if it does not exist and sets its modification time to now.
-        *
-        * @param file the file to touch
-        * @throws IOException if the file cannot be created or its timestamp cannot be set
-        */
-       protected static void touchFile(File file) throws IOException {
-               if (!file.exists()) {
-                       new FileOutputStream(file).close();
-               }
-               if (!file.setLastModified(System.currentTimeMillis())) {
-                       throw new IOException("Could not touch the file.");
-               }
-       }
-
-       /**
-        * Polls every 10 ms until files {@code READY_MARKER_FILE_PREFIX + i} exist in
-        * {@code basedir} for every {@code i} in {@code [0, num)}.
-        *
-        * @param basedir directory in which the marker files are expected
-        * @param num number of marker files to wait for
-        * @param timeout maximum time to wait, in milliseconds
-        * @return {@code true} if all marker files appeared in time, {@code false} on timeout
-        */
-       protected static boolean waitForMarkerFiles(File basedir, int num, long 
timeout) {
-               long now = System.currentTimeMillis();
-               final long deadline = now + timeout;
-
-
-               while (now < deadline) {
-                       boolean allFound = true;
-
-                       for (int i = 0; i < num; i++) {
-                               File nextToCheck = new File(basedir, 
READY_MARKER_FILE_PREFIX + i);
-                               if (!nextToCheck.exists()) {
-                                       allFound = false;
-                                       break;
-                               }
-                       }
-
-                       if (allFound) {
-                               return true;
-                       }
-                       else {
-                               // not all found, wait for a bit
-                               try {
-                                       Thread.sleep(10);
-                               }
-                               catch (InterruptedException e) {
-                                       // NOTE(review): the interrupt flag is not restored before wrapping
-                                       throw new RuntimeException(e);
-                               }
-
-                               now = System.currentTimeMillis();
-                       }
-               }
-
-               return false;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
-        */
-       public static class TaskManagerProcessEntryPoint {
-
-               private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
-
-               /**
-                * Starts a TaskManager that connects to a JobManager on localhost, then blocks.
-                *
-                * @param args {@code args[0]} is the JobManager's IPC port.
-                */
-               public static void main(String[] args) {
-                       try {
-                               int jobManagerPort = Integer.parseInt(args[0]);
-
-                               // the JobManager runs on localhost at the port passed by the spawning test;
-                               // the TaskManager gets 2 task slots and a small memory / network buffer budget
-                               Configuration cfg = new Configuration();
-                               
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-                               
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-                               
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-                               
cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
-                               
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-
-                               
TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, 
StreamingMode.STREAMING, TaskManager.class);
-
-                               // wait forever
-                               // NOTE(review): wait() is not called in a loop; a spurious wake-up would let main() return
-                               Object lock = new Object();
-                               synchronized (lock) {
-                                       lock.wait();
-                               }
-                       }
-                       catch (Throwable t) {
-                               // any failure (including a missing or malformed port argument) ends the JVM with exit code 1
-                               LOG.error("Failed to start TaskManager 
process", t);
-                               System.exit(1);
-                       }
-               }
-       }
-
-       /**
-        * Utility class to read the output of a process stream and forward it into a StringWriter.
-        *
-        * <p>The forwarding runs in a daemon thread that is started by the constructor and that ends
-        * when the source stream reaches end-of-file or throws an {@link IOException}.
-        */
-       protected static class PipeForwarder extends Thread {
-
-               private final StringWriter target;
-               private final InputStream source;
-
-               /**
-                * Creates the forwarder and immediately starts it.
-                *
-                * @param source The stream to read from (e.g. a process' stdout or stderr).
-                * @param target The writer that receives everything read from {@code source}.
-                */
-               public PipeForwarder(InputStream source, StringWriter target) {
-                       super("Pipe Forwarder");
-                       // daemon, so a still-open process stream cannot keep the JVM alive
-                       setDaemon(true);
-
-                       this.source = source;
-                       this.target = target;
-
-                       start();
-               }
-
-               @Override
-               public void run() {
-                       try {
-                               int next;
-                               // NOTE(review): each byte is written as one char, so multi-byte (non-ASCII)
-                               // output is not decoded correctly
-                               while ((next = source.read()) != -1) {
-                                       target.write(next);
-                               }
-                       }
-                       catch (IOException e) {
-                               // terminate
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
new file mode 100644
index 0000000..c02fa6c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Abstract base for tests verifying the behavior of the recovery in the
+ * case when a TaskManager fails (process is killed) in the middle of a job execution.
+ *
+ * <p>The test works with multiple TaskManager processes by spawning JVMs.
+ * Initially, it starts a JobManager in-process and two TaskManager JVMs with
+ * 2 task slots each. It submits a program with parallelism 4 and waits until
+ * all tasks are brought up. Coordination between the test and the tasks happens
+ * via checking for the existence of marker files in a temporary directory.
+ * It then starts another TaskManager, which is guaranteed to remain empty
+ * (all tasks are already deployed), and kills one of the original TaskManagers.
+ * The recovery should restart the tasks on the new TaskManager.
+ */
+public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends TestLogger {
+
+       protected static final String READY_MARKER_FILE_PREFIX = "ready_";
+       protected static final String PROCEED_MARKER_FILE = "proceed";
+       protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
+
+       protected static final int PARALLELISM = 4;
+
+       /** Time (ms) granted for TaskManager registration and task start-up; some CI environments are very slow. */
+       private static final long STARTUP_TIMEOUT_MILLIS = 120000;
+
+       /** Time (ms) granted to the test program to finish after the TaskManager failure was injected. */
+       private static final long PROGRAM_COMPLETION_TIMEOUT_MILLIS = 300000;
+
+       /** Pause (ms) between two polls of the JobManager for the number of registered TaskManagers. */
+       private static final long REGISTRATION_POLL_INTERVAL_MILLIS = 50;
+
+       @Test
+       public void testTaskManagerProcessFailure() {
+
+               final StringWriter processOutput1 = new StringWriter();
+               final StringWriter processOutput2 = new StringWriter();
+               final StringWriter processOutput3 = new StringWriter();
+
+               ActorSystem jmActorSystem = null;
+               Process taskManagerProcess1 = null;
+               Process taskManagerProcess2 = null;
+               Process taskManagerProcess3 = null;
+               Thread programTrigger = null;
+
+               File coordinateTempDir = null;
+
+               try {
+                       // check that we run this test only if the java command
+                       // is available on this machine
+                       String javaCommand = getJavaCommandPath();
+                       if (javaCommand == null) {
+                               System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
+                               return;
+                       }
+
+                       // create a logging file for the process
+                       File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
+                       tempLogFile.deleteOnExit();
+                       CommonTestUtils.printLog4jDebugConfig(tempLogFile);
+
+                       // coordination between the processes goes through a directory
+                       coordinateTempDir = CommonTestUtils.createTempDirectory();
+
+                       // find a free port to start the JobManager
+                       final int jobManagerPort = NetUtils.getAvailablePort();
+
+                       // start a JobManager
+                       Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+
+                       Configuration jmConfig = new Configuration();
+                       jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+                       jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
+                       jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+                       jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
+
+                       jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
+                       ActorRef jmActor = JobManager.startJobManagerActors(
+                               jmConfig,
+                               jmActorSystem,
+                               StreamingMode.STREAMING,
+                               JobManager.class,
+                               MemoryArchivist.class)._1();
+
+                       // the TaskManager java command
+                       String[] command = new String[] {
+                                       javaCommand,
+                                       "-Dlog.level=DEBUG",
+                                       "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
+                                       "-Xms80m", "-Xmx80m",
+                                       "-classpath", getCurrentClasspath(),
+                                       TaskManagerProcessEntryPoint.class.getName(),
+                                       String.valueOf(jobManagerPort)
+                       };
+
+                       // start the first two TaskManager processes; their stderr is captured so it can be printed on failure
+                       taskManagerProcess1 = new ProcessBuilder(command).start();
+                       new CommonTestUtils.PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
+                       taskManagerProcess2 = new ProcessBuilder(command).start();
+                       new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
+
+                       // we wait for the JobManager to have the two TaskManagers available
+                       // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
+                       waitUntilNumTaskManagersAreRegistered(jmActor, 2, STARTUP_TIMEOUT_MILLIS);
+
+                       // the program will set a marker file in each of its parallel tasks once they are ready, so that
+                       // this coordinating code is aware of this.
+                       // the program will very slowly consume elements until the marker file (later created by the
+                       // test driver code) is present
+                       final File coordinateDirClosure = coordinateTempDir;
+                       final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+                       // we trigger program execution in a separate thread
+                       programTrigger = new Thread("Program Trigger") {
+                               @Override
+                               public void run() {
+                                       try {
+                                               testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
+                                       }
+                                       catch (Throwable t) {
+                                               t.printStackTrace();
+                                               errorRef.set(t);
+                                       }
+                               }
+                       };
+
+                       // start the test program
+                       programTrigger.start();
+
+                       // wait until all marker files are in place, indicating that all tasks have started
+                       // (max 2 minutes, since some CI environments are very slow)
+                       if (!waitForMarkerFiles(coordinateTempDir, READY_MARKER_FILE_PREFIX, PARALLELISM, STARTUP_TIMEOUT_MILLIS)) {
+                               // check if the program failed for some reason
+                               if (errorRef.get() != null) {
+                                       Throwable error = errorRef.get();
+                                       error.printStackTrace();
+                                       fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+                               }
+                               else {
+                                       // no error occurred, simply a timeout
+                                       fail("The tasks were not started within time (" + STARTUP_TIMEOUT_MILLIS + "msecs)");
+                               }
+                       }
+
+                       // start the third TaskManager
+                       taskManagerProcess3 = new ProcessBuilder(command).start();
+                       new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
+
+                       // we wait for the third TaskManager to register
+                       // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
+                       waitUntilNumTaskManagersAreRegistered(jmActor, 3, STARTUP_TIMEOUT_MILLIS);
+
+                       // kill one of the previous TaskManagers, triggering a failure and recovery
+                       taskManagerProcess1.destroy();
+                       taskManagerProcess1 = null;
+
+                       // we create the marker file which signals the program functions tasks that they can complete
+                       touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
+
+                       // wait for at most 5 minutes for the program to complete
+                       programTrigger.join(PROGRAM_COMPLETION_TIMEOUT_MILLIS);
+
+                       // check that the program really finished
+                       assertFalse("The program did not finish in time", programTrigger.isAlive());
+
+                       // check whether the program encountered an error
+                       if (errorRef.get() != null) {
+                               Throwable error = errorRef.get();
+                               error.printStackTrace();
+                               fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
+                       }
+
+                       // all seems well :-)
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       printProcessLog("TaskManager 1", processOutput1.toString());
+                       printProcessLog("TaskManager 2", processOutput2.toString());
+                       printProcessLog("TaskManager 3", processOutput3.toString());
+                       fail(e.getMessage());
+               }
+               catch (Error e) {
+                       e.printStackTrace();
+                       printProcessLog("TaskManager 1", processOutput1.toString());
+                       printProcessLog("TaskManager 2", processOutput2.toString());
+                       printProcessLog("TaskManager 3", processOutput3.toString());
+                       throw e;
+               }
+               finally {
+                       // on an early failure the program thread may still be blocked; do not leave it running
+                       if (programTrigger != null && programTrigger.isAlive()) {
+                               programTrigger.interrupt();
+                       }
+                       if (taskManagerProcess1 != null) {
+                               taskManagerProcess1.destroy();
+                       }
+                       if (taskManagerProcess2 != null) {
+                               taskManagerProcess2.destroy();
+                       }
+                       if (taskManagerProcess3 != null) {
+                               taskManagerProcess3.destroy();
+                       }
+                       if (jmActorSystem != null) {
+                               jmActorSystem.shutdown();
+                       }
+                       if (coordinateTempDir != null) {
+                               try {
+                                       FileUtils.deleteDirectory(coordinateTempDir);
+                               }
+                               catch (Throwable t) {
+                                       // best-effort cleanup of the coordination directory; we can ignore this
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Runs the test program against the local cluster. The base class invokes this method from a
+        * separate thread, so that it can check whether the program terminated in time.
+        *
+        * @param jobManagerPort The port for submitting the topology to the local cluster
+        * @param coordinateDir TaskManager failure will be triggered only after the program's tasks
+        *                      have successfully created their ready marker files in this directory
+        */
+       public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception;
+
+       /**
+        * Polls the JobManager until exactly {@code numExpected} TaskManagers are registered, failing
+        * the test if that does not happen within {@code maxDelay} milliseconds.
+        *
+        * @param jobManager The JobManager actor to query.
+        * @param numExpected The number of registered TaskManagers to wait for.
+        * @param maxDelay The maximum time to wait, in milliseconds.
+        * @throws Exception If querying the JobManager fails with anything other than a timeout.
+        */
+       protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay)
+                       throws Exception
+       {
+               final long deadline = System.currentTimeMillis() + maxDelay;
+               while (true) {
+                       long remaining = deadline - System.currentTimeMillis();
+                       if (remaining <= 0) {
+                               fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)");
+                       }
+
+                       FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
+
+                       try {
+                               Future<?> result = Patterns.ask(jobManager,
+                                               JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+                                               new Timeout(timeout));
+                               Integer numTMs = (Integer) Await.result(result, timeout);
+                               if (numTMs == numExpected) {
+                                       break;
+                               }
+                       }
+                       catch (TimeoutException e) {
+                               // ignore and retry
+                       }
+                       catch (ClassCastException e) {
+                               fail("Wrong response: " + e.getMessage());
+                       }
+
+                       // not there yet: back off briefly instead of flooding the JobManager with requests
+                       Thread.sleep(REGISTRATION_POLL_INTERVAL_MILLIS);
+               }
+       }
+
+       /**
+        * Prints the captured output of a spawned process to stdout, framed by banner lines.
+        * Does nothing if the log is null or empty.
+        *
+        * @param processName The name under which the process is announced in the banner.
+        * @param log The captured output of the process.
+        */
+       protected static void printProcessLog(String processName, String log) {
+               if (log == null || log.length() == 0) {
+                       return;
+               }
+
+               System.out.println("-----------------------------------------");
+               System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
+               System.out.println("-----------------------------------------");
+               System.out.println(log);
+               System.out.println("-----------------------------------------");
+               System.out.println("            END SPAWNED PROCESS LOG");
+               System.out.println("-----------------------------------------");
+       }
+
+       /**
+        * Creates the given file if it does not exist yet and sets its last-modified time to now.
+        *
+        * @param file The file to touch.
+        * @throws IOException If the file cannot be created or its modification time cannot be set.
+        */
+       protected static void touchFile(File file) throws IOException {
+               if (!file.exists()) {
+                       // opening and immediately closing the stream creates an empty file
+                       new FileOutputStream(file).close();
+               }
+               if (!file.setLastModified(System.currentTimeMillis())) {
+                       throw new IOException("Could not touch the file.");
+               }
+       }
+
+       /**
+        * Polls {@code basedir} every 10 ms until the files {@code prefix + 0} to
+        * {@code prefix + (num - 1)} all exist, or until the timeout elapses.
+        *
+        * @param basedir The directory in which to look for the marker files.
+        * @param prefix The common name prefix of the marker files.
+        * @param num The number of marker files to wait for.
+        * @param timeout The maximum time to wait, in milliseconds.
+        * @return True if all marker files were found in time, false on timeout.
+        */
+       protected static boolean waitForMarkerFiles(File basedir, String prefix, int num, long timeout) {
+               long now = System.currentTimeMillis();
+               final long deadline = now + timeout;
+
+               while (now < deadline) {
+                       boolean allFound = true;
+
+                       // stop at the first missing marker; the next poll re-checks from the start
+                       for (int i = 0; i < num; i++) {
+                               File nextToCheck = new File(basedir, prefix + i);
+                               if (!nextToCheck.exists()) {
+                                       allFound = false;
+                                       break;
+                               }
+                       }
+
+                       if (allFound) {
+                               return true;
+                       }
+                       else {
+                               // not all found, wait for a bit
+                               try {
+                                       Thread.sleep(10);
+                               }
+                               catch (InterruptedException e) {
+                                       // restore the interrupt status so that callers can still observe it
+                                       Thread.currentThread().interrupt();
+                                       throw new RuntimeException(e);
+                               }
+
+                               now = System.currentTimeMillis();
+                       }
+               }
+
+               return false;
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * The entry point for the TaskManager JVM. Simply configures and runs a TaskManager.
+        */
+       public static class TaskManagerProcessEntryPoint {
+
+               private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+               /**
+                * Starts a TaskManager that connects to a JobManager on localhost, then blocks forever.
+                * Any failure terminates the JVM with exit code 1.
+                *
+                * @param args {@code args[0]} is the JobManager's IPC port.
+                */
+               public static void main(String[] args) {
+                       try {
+                               int jobManagerPort = Integer.parseInt(args[0]);
+
+                               Configuration cfg = new Configuration();
+                               cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+                               cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+                               cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+                               cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+                               cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+
+                               TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg, StreamingMode.STREAMING, TaskManager.class);
+
+                               // wait forever; loop because Object.wait() may return on a spurious wake-up
+                               Object lock = new Object();
+                               synchronized (lock) {
+                                       while (true) {
+                                               lock.wait();
+                                       }
+                               }
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Failed to start TaskManager process", t);
+                               System.exit(1);
+                       }
+               }
+       }
+
+}

Reply via email to