Repository: flink
Updated Branches:
  refs/heads/master 5671c77c3 -> a6890b284


http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
new file mode 100644
index 0000000..a0c8312
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.TaskManagerProcess;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.junit.AfterClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@Ignore
+public class ChaosMonkeyITCase {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ChaosMonkeyITCase.class);
+
+       private final static ZooKeeperTestEnvironment ZooKeeper = new 
ZooKeeperTestEnvironment(1);
+
+       private final static File FileStateBackendBasePath;
+
+       private final static File CheckpointCompletedCoordination;
+
+       private final static File ProceedCoordination;
+
+       private final static String COMPLETED_PREFIX = "completed_";
+
+       private final static long LastElement = -1;
+
+       private final Random rand = new Random();
+
+       private int jobManagerPid;
+       private int taskManagerPid;
+
+       static {
+               try {
+                       FileStateBackendBasePath = 
CommonTestUtils.createTempDirectory();
+                       CheckpointCompletedCoordination = new 
File(FileStateBackendBasePath, COMPLETED_PREFIX);
+                       ProceedCoordination = new 
File(FileStateBackendBasePath, "proceed");
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Error in test setup. Could 
not create directory.", e);
+               }
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (ZooKeeper != null) {
+                       ZooKeeper.shutdown();
+               }
+
+               if (FileStateBackendBasePath != null) {
+                       FileUtils.deleteDirectory(FileStateBackendBasePath);
+               }
+       }
+
+       @Test
+       public void testChaosMonkey() throws Exception {
+               // Test config
+               final int numberOfJobManagers = 3;
+               final int numberOfTaskManagers = 3;
+               final int numberOfSlotsPerTaskManager = 2;
+
+               // The final count each source is counting to: 1...n
+               final int n = 5000;
+
+               // Parallelism for the program
+               final int parallelism = numberOfTaskManagers * 
numberOfSlotsPerTaskManager;
+
+               // The test should not run longer than this
+               final FiniteDuration testDuration = new FiniteDuration(10, 
TimeUnit.MINUTES);
+
+               // Every x seconds a random job or task manager is killed
+               //
+               // The job will be running for $killEvery seconds and then a random Job/TaskManager
+               // will be killed. On recovery (which takes some time to bring up the new process etc.),
+               // this test will wait for the task managers to reconnect before starting the next countdown.
+               // Therefore the delay between retries is not important in this setup.
+               final FiniteDuration killEvery = new FiniteDuration(30, 
TimeUnit.SECONDS);
+
+               // Trigger a checkpoint every checkpointingIntervalMs milliseconds
+               final int checkpointingIntervalMs = 2000;
+
+               // Total number of kills
+               final int totalNumberOfKills = 5;
+
+               // 
-----------------------------------------------------------------------------------------
+
+               // Setup
+               Configuration config = 
ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+                               ZooKeeper.getConnectString(), 
FileStateBackendBasePath.getPath());
+
+               // Akka and restart timeouts
+               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, 
"1000 ms");
+               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 
s");
+               config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+
+               if (checkpointingIntervalMs >= killEvery.toMillis()) {
+                       throw new IllegalArgumentException("Relax! You want to 
kill processes every " +
+                                       killEvery + ", but the checkpointing 
interval is " +
+                                       checkpointingIntervalMs / 1000 + " 
seconds. Either decrease the interval or " +
+                                       "increase the kill interval. Otherwise, 
the program will not complete any " +
+                                       "checkpoint.");
+               }
+
+               // Task manager
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numberOfSlotsPerTaskManager);
+
+               ActorSystem testActorSystem = null;
+               LeaderRetrievalService leaderRetrievalService = null;
+               List<JobManagerProcess> jobManagerProcesses = new ArrayList<>();
+               List<TaskManagerProcess> taskManagerProcesses = new 
ArrayList<>();
+
+               try {
+                       // Initial state
+                       for (int i = 0; i < numberOfJobManagers; i++) {
+                               
jobManagerProcesses.add(createAndStartJobManagerProcess(config));
+                       }
+
+                       for (int i = 0; i < numberOfTaskManagers; i++) {
+                               
taskManagerProcesses.add(createAndStartTaskManagerProcess(config));
+                       }
+
+                       testActorSystem = AkkaUtils.createDefaultActorSystem();
+
+                       // Leader listener
+                       leaderRetrievalService = 
ZooKeeperUtils.createLeaderRetrievalService(config);
+                       TestingListener leaderListener = new TestingListener();
+                       leaderRetrievalService.start(leaderListener);
+
+                       Deadline deadline = testDuration.fromNow();
+
+                       // Wait for the new leader
+                       int leaderIndex = waitForNewLeader(
+                                       leaderListener, jobManagerProcesses, 
deadline.timeLeft());
+
+                       // Wait for the task managers to connect
+                       waitForTaskManagers(
+                                       numberOfTaskManagers,
+                                       jobManagerProcesses.get(leaderIndex),
+                                       testActorSystem,
+                                       deadline.timeLeft());
+
+                       // The job
+                       JobGraph jobGraph = createJobGraph(n, 
CheckpointCompletedCoordination.getPath(),
+                                       ProceedCoordination.getPath(), 
parallelism, checkpointingIntervalMs);
+
+                       LOG.info("Submitting job {}", jobGraph.getJobID());
+                       submitJobGraph(jobGraph, 
jobManagerProcesses.get(leaderIndex), leaderListener,
+                                       testActorSystem, deadline.timeLeft());
+
+                       LOG.info("Waiting for a checkpoint to complete before 
kicking off chaos");
+
+                       // Wait for a checkpoint to complete
+                       
TestJvmProcess.waitForMarkerFiles(FileStateBackendBasePath, COMPLETED_PREFIX,
+                                       parallelism, 
deadline.timeLeft().toMillis());
+
+                       LOG.info("Checkpoint completed... ready for chaos");
+
+                       int currentKillNumber = 1;
+                       int currentJobManagerKills = 0;
+                       int currentTaskManagerKills = 0;
+
+                       for (int i = 0; i < totalNumberOfKills; i++) {
+                               LOG.info("Waiting for {} before next kill 
({}/{})", killEvery, currentKillNumber++, totalNumberOfKills);
+                               Thread.sleep(killEvery.toMillis());
+
+                               LOG.info("Checking job status...");
+
+                               JobStatus jobStatus = 
requestJobStatus(jobGraph.getJobID(),
+                                               
jobManagerProcesses.get(leaderIndex), testActorSystem, deadline.timeLeft());
+
+                               if (jobStatus != JobStatus.RUNNING && jobStatus 
!= JobStatus.FINISHED) {
+                                       // Wait for it to run
+                                       LOG.info("Waiting for job status {}", 
JobStatus.RUNNING);
+                                       waitForJobRunning(jobGraph.getJobID(), 
jobManagerProcesses.get(leaderIndex),
+                                                       testActorSystem, 
deadline.timeLeft());
+                               }
+                               else if (jobStatus == JobStatus.FINISHED) {
+                                       // Early finish
+                                       LOG.info("Job finished");
+                                       return;
+                               }
+                               else {
+                                       LOG.info("Job status is {}", jobStatus);
+                               }
+
+                               if (rand.nextBoolean()) {
+                                       LOG.info("Killing the leading 
JobManager");
+
+                                       JobManagerProcess newJobManager = 
createAndStartJobManagerProcess(config);
+
+                                       JobManagerProcess leader = 
jobManagerProcesses.remove(leaderIndex);
+                                       leader.destroy();
+                                       currentJobManagerKills++;
+
+                                       LOG.info("Killed {}", leader);
+
+                                       // Make sure to add the new job manager 
before looking for a new leader
+                                       jobManagerProcesses.add(newJobManager);
+
+                                       // Wait for the new leader
+                                       leaderIndex = waitForNewLeader(
+                                                       leaderListener, 
jobManagerProcesses, deadline.timeLeft());
+
+                                       // Wait for the task managers to connect
+                                       waitForTaskManagers(
+                                                       numberOfTaskManagers,
+                                                       
jobManagerProcesses.get(leaderIndex),
+                                                       testActorSystem,
+                                                       deadline.timeLeft());
+                               }
+                               else {
+                                       LOG.info("Killing a random 
TaskManager");
+                                       TaskManagerProcess newTaskManager = 
createAndStartTaskManagerProcess(config);
+
+                                       // Wait for this new task manager to be 
connected
+                                       waitForTaskManagers(
+                                                       numberOfTaskManagers + 
1,
+                                                       
jobManagerProcesses.get(leaderIndex),
+                                                       testActorSystem,
+                                                       deadline.timeLeft());
+
+                                       // Now it's safe to kill a process
+                                       int next = 
rand.nextInt(numberOfTaskManagers);
+                                       TaskManagerProcess taskManager = 
taskManagerProcesses.remove(next);
+
+                                       LOG.info("{} has been chosen. Killing 
process...", taskManager);
+
+                                       taskManager.destroy();
+                                       currentTaskManagerKills++;
+
+                                       // Add the new task manager after 
killing an old one
+                                       
taskManagerProcesses.add(newTaskManager);
+                               }
+                       }
+
+                       LOG.info("Chaos is over. Total kills: {} ({} job managers + {} task managers). " +
+                                       "Checking job status...",
+                                       totalNumberOfKills, currentJobManagerKills, currentTaskManagerKills);
+
+                       // Signal the job to speed up (if it is not done yet)
+                       TestJvmProcess.touchFile(ProceedCoordination);
+
+                       // Wait for the job to finish
+                       LOG.info("Waiting for job status {}", 
JobStatus.FINISHED);
+                       waitForJobFinished(jobGraph.getJobID(), 
jobManagerProcesses.get(leaderIndex),
+                                       testActorSystem, deadline.timeLeft());
+
+                       LOG.info("Job finished");
+
+                       LOG.info("Waiting for job removal");
+                       waitForJobRemoved(jobGraph.getJobID(), 
jobManagerProcesses.get(leaderIndex),
+                                       testActorSystem, deadline.timeLeft());
+                       LOG.info("Job removed");
+
+                       LOG.info("Checking clean recovery state...");
+                       checkCleanRecoveryState(config);
+                       LOG.info("Recovery state clean");
+               }
+               catch (Throwable t) {
+                       
System.out.println("#################################################");
+                       System.out.println(" TASK MANAGERS");
+                       
System.out.println("#################################################");
+
+                       for (TaskManagerProcess taskManagerProcess : 
taskManagerProcesses) {
+                               taskManagerProcess.printProcessLog();
+                       }
+
+                       
System.out.println("#################################################");
+                       System.out.println(" JOB MANAGERS");
+                       
System.out.println("#################################################");
+
+                       for (JobManagerProcess jobManagerProcess : 
jobManagerProcesses) {
+                               jobManagerProcess.printProcessLog();
+                       }
+
+                       t.printStackTrace();
+               }
+               finally {
+                       for (JobManagerProcess jobManagerProcess : 
jobManagerProcesses) {
+                               if (jobManagerProcess != null) {
+                                       jobManagerProcess.destroy();
+                               }
+                       }
+
+                       if (leaderRetrievalService != null) {
+                               leaderRetrievalService.stop();
+                       }
+
+                       if (testActorSystem != null) {
+                               testActorSystem.shutdown();
+                       }
+               }
+       }
+
+       // - The test program 
--------------------------------------------------------------------------
+
+       private JobGraph createJobGraph(
+                       int n,
+                       String completedCheckpointMarker,
+                       String proceedMarker,
+                       int parallelism,
+                       int checkpointingIntervalMs) {
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(parallelism);
+               env.enableCheckpointing(checkpointingIntervalMs);
+
+               int expectedResult = parallelism * n * (n + 1) / 2;
+
+               env.addSource(new CheckpointedSequenceSource(n, 
completedCheckpointMarker, proceedMarker))
+                               .addSink(new CountingSink(parallelism, 
expectedResult))
+                               .setParallelism(1);
+
+               return env.getStreamGraph().getJobGraph();
+       }
+
+       public static class CheckpointedSequenceSource extends 
RichParallelSourceFunction<Long>
+                       implements Checkpointed<Long>, CheckpointNotifier {
+
+               private static final long serialVersionUID = 0L;
+
+               private final long end;
+
+               private final String completedCheckpointMarkerFilePath;
+
+               private final File proceedFile;
+
+               private long current = 0;
+
+               private volatile boolean isRunning = true;
+
+               public CheckpointedSequenceSource(long end, String 
completedCheckpointMarkerFilePath, String proceedMarkerFilePath) {
+                       checkArgument(end >= 0, "Negative final count");
+
+                       this.end = end;
+                       this.completedCheckpointMarkerFilePath = 
completedCheckpointMarkerFilePath;
+                       this.proceedFile = new File(proceedMarkerFilePath);
+               }
+
+               @Override
+               public void run(SourceContext<Long> ctx) throws Exception {
+                       while (isRunning) {
+
+                               if (!proceedFile.exists()) {
+                                       Thread.sleep(50);
+                               }
+
+                               synchronized (ctx.getCheckpointLock()) {
+                                       if (current <= end) {
+                                               ctx.collect(current++);
+                                       }
+                                       else {
+                                               ctx.collect(LastElement);
+                                               return;
+                                       }
+                               }
+                       }
+               }
+
+               @Override
+               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+                       LOG.info("Snapshotting state {} @ ID {}.", current, 
checkpointId);
+                       return current;
+               }
+
+               @Override
+               public void restoreState(Long state) {
+                       LOG.info("Restoring state {}/{}", state, end);
+                       current = state;
+               }
+
+
+               @Override
+               public void cancel() {
+                       isRunning = false;
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                       LOG.info("Checkpoint {} completed.", checkpointId);
+
+                       int taskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+                       TestJvmProcess.touchFile(new 
File(completedCheckpointMarkerFilePath + taskIndex));
+               }
+       }
+
+       public static class CountingSink extends RichSinkFunction<Long>
+                       implements Checkpointed<CountingSink>, 
CheckpointNotifier {
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(CountingSink.class);
+
+               private static final long serialVersionUID = 0L;
+
+               private final int parallelism;
+
+               private final long expectedFinalCount;
+
+               private long current;
+
+               private int numberOfReceivedLastElements;
+
+
+               public CountingSink(int parallelism, long expectedFinalCount) {
+                       this.expectedFinalCount = expectedFinalCount;
+                       this.parallelism = parallelism;
+               }
+
+               @Override
+               public void invoke(Long value) throws Exception {
+                       if (value == LastElement) {
+                               numberOfReceivedLastElements++;
+
+                               if (numberOfReceivedLastElements == 
parallelism) {
+                                       if (current != expectedFinalCount) {
+                                               throw new Exception("Unexpected 
final result " + current);
+                                       }
+                                       else {
+                                               LOG.info("Final result " + 
current);
+                                       }
+                               }
+                               else if (numberOfReceivedLastElements > 
parallelism) {
+                                       throw new 
IllegalStateException("Received more elements than parallelism.");
+                               }
+                       }
+                       else {
+                               current += value;
+                       }
+               }
+
+               @Override
+               public CountingSink snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+                       LOG.info("Snapshotting state {}:{} @ ID {}.", current, 
numberOfReceivedLastElements, checkpointId);
+                       return this;
+               }
+
+               @Override
+               public void restoreState(CountingSink state) {
+                       LOG.info("Restoring state {}:{}", state.current, 
state.numberOfReceivedLastElements);
+                       this.current = state.current;
+                       this.numberOfReceivedLastElements = 
state.numberOfReceivedLastElements;
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                       LOG.info("Checkpoint {} completed.", checkpointId);
+               }
+       }
+
+       // - Utilities 
---------------------------------------------------------------------------------
+
+       private void submitJobGraph(
+                       JobGraph jobGraph,
+                       JobManagerProcess jobManager,
+                       TestingListener leaderListener,
+                       ActorSystem actorSystem,
+                       FiniteDuration timeout) throws Exception {
+
+               ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, 
timeout);
+               UUID jobManagerLeaderId = leaderListener.getLeaderSessionID();
+               AkkaActorGateway jobManagerGateway = new 
AkkaActorGateway(jobManagerRef, jobManagerLeaderId);
+
+               jobManagerGateway.tell(new 
JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
+       }
+
+       private void checkCleanRecoveryState(Configuration config) throws 
Exception {
+               LOG.info("Checking " + ZooKeeper.getClient().getNamespace() +
+                               
ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+               List<String> jobGraphs = ZooKeeper.getClient().getChildren()
+                               
.forPath(ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+               assertEquals("Unclean job graphs: " + jobGraphs, 0, 
jobGraphs.size());
+
+               LOG.info("Checking " + ZooKeeper.getClient().getNamespace() +
+                               
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
+               List<String> checkpoints = ZooKeeper.getClient().getChildren()
+                               
.forPath(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
+               assertEquals("Unclean checkpoints: " + checkpoints, 0, 
checkpoints.size());
+
+               LOG.info("Checking " + ZooKeeper.getClient().getNamespace() +
+                               
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+               List<String> checkpointCounter = 
ZooKeeper.getClient().getChildren()
+                               
.forPath(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+               assertEquals("Unclean checkpoint counter: " + 
checkpointCounter, 0, checkpointCounter.size());
+
+               LOG.info("ZooKeeper state is clean");
+
+               LOG.info("Checking file system backend state...");
+
+               File fsCheckpoints = new 
File(config.getString(ConfigConstants.STATE_BACKEND_FS_DIR, ""));
+
+               LOG.info("Checking " + fsCheckpoints);
+
+               File[] files = fsCheckpoints.listFiles();
+               if (files == null) {
+                       fail(fsCheckpoints + " does not exist: " + 
Arrays.toString(FileStateBackendBasePath.listFiles()));
+               }
+               else {
+                       assertEquals("Unclean file system checkpoints: " + 
Arrays.toString(fsCheckpoints.listFiles()),
+                                       0, files.length);
+               }
+
+               File fsRecovery = new 
File(config.getString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""));
+
+               LOG.info("Checking " + fsRecovery);
+
+               files = fsRecovery.listFiles();
+               if (files == null) {
+                       fail(fsRecovery + " does not exist: " + 
Arrays.toString(FileStateBackendBasePath.listFiles()));
+               }
+               else {
+                       assertEquals("Unclean file system checkpoints: " + 
Arrays.toString(fsRecovery.listFiles()),
+                                       0, files.length);
+               }
+       }
+
+       private void waitForJobRemoved(
+                       JobID jobId, JobManagerProcess jobManager, ActorSystem 
actorSystem, FiniteDuration timeout)
+                       throws Exception {
+
+               ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, 
timeout);
+               AkkaActorGateway jobManagerGateway = new 
AkkaActorGateway(jobManagerRef, null);
+
+               Future<Object> archiveFuture = 
jobManagerGateway.ask(JobManagerMessages.getRequestArchive(), timeout);
+
+               ActorRef archive = ((JobManagerMessages.ResponseArchive) 
Await.result(archiveFuture, timeout)).actor();
+
+               AkkaActorGateway archiveGateway = new AkkaActorGateway(archive, 
null);
+
+               Deadline deadline = timeout.fromNow();
+
+               while (deadline.hasTimeLeft()) {
+                       JobManagerMessages.JobStatusResponse resp = 
JobManagerActorTestUtils
+                                       .requestJobStatus(jobId, 
archiveGateway, deadline.timeLeft());
+
+                       if (resp instanceof JobManagerMessages.JobNotFound) {
+                               Thread.sleep(100);
+                       }
+                       else {
+                               return;
+                       }
+               }
+       }
+
+       private JobStatus requestJobStatus(
+                       JobID jobId, JobManagerProcess jobManager, ActorSystem 
actorSystem, FiniteDuration timeout)
+                       throws Exception {
+
+               ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, 
timeout);
+               AkkaActorGateway jobManagerGateway = new 
AkkaActorGateway(jobManagerRef, null);
+
+               JobManagerMessages.JobStatusResponse resp = 
JobManagerActorTestUtils
+                               .requestJobStatus(jobId, jobManagerGateway, 
timeout);
+
+               if (resp instanceof JobManagerMessages.CurrentJobStatus) {
+                       JobManagerMessages.CurrentJobStatus jobStatusResponse = 
(JobManagerMessages
+                                       .CurrentJobStatus) resp;
+
+                       return jobStatusResponse.status();
+               }
+               else if (resp instanceof JobManagerMessages.JobNotFound) {
+                       return JobStatus.RESTARTING;
+               }
+
+               throw new IllegalStateException("Unexpected response from 
JobManager");
+       }
+
+       private void waitForJobRunning(
+                       JobID jobId, JobManagerProcess jobManager, ActorSystem 
actorSystem, FiniteDuration timeout)
+                       throws Exception {
+
+               ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, 
timeout);
+               AkkaActorGateway jobManagerGateway = new 
AkkaActorGateway(jobManagerRef, null);
+
+               JobManagerActorTestUtils.waitForJobStatus(jobId, 
JobStatus.RUNNING, jobManagerGateway, timeout);
+       }
+
+       private void waitForJobFinished(
+                       JobID jobId, JobManagerProcess jobManager, ActorSystem 
actorSystem, FiniteDuration timeout)
+                       throws Exception {
+
+               ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, 
timeout);
+               AkkaActorGateway jobManagerGateway = new 
AkkaActorGateway(jobManagerRef, null);
+
+               JobManagerActorTestUtils.waitForJobStatus(jobId, 
JobStatus.FINISHED, jobManagerGateway, timeout);
+       }
+
+       private void waitForTaskManagers(
+                       int minimumNumberOfTaskManagers,
+                       JobManagerProcess jobManager,
+                       ActorSystem actorSystem,
+                       FiniteDuration timeout) throws Exception {
+
+               LOG.info("Waiting for {} task managers to connect to leading 
{}",
+                               minimumNumberOfTaskManagers, jobManager);
+
+               ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, 
timeout);
+               AkkaActorGateway jobManagerGateway = new 
AkkaActorGateway(jobManagerRef, null);
+
+               JobManagerActorTestUtils.waitForTaskManagers(
+                               minimumNumberOfTaskManagers, jobManagerGateway, 
timeout);
+
+               LOG.info("All task managers connected");
+       }
+
+       private int waitForNewLeader(
+                       TestingListener leaderListener,
+                       List<JobManagerProcess> jobManagerProcesses,
+                       FiniteDuration timeout) throws Exception {
+
+               LOG.info("Waiting for new leader notification");
+               leaderListener.waitForNewLeader(timeout.toMillis());
+
+               LOG.info("Leader: {}:{}", leaderListener.getAddress(), 
leaderListener.getLeaderSessionID());
+
+               String currentLeader = leaderListener.getAddress();
+
+               int leaderIndex = -1;
+
+               for (int i = 0; i < jobManagerProcesses.size(); i++) {
+                       JobManagerProcess jobManager = 
jobManagerProcesses.get(i);
+                       if 
(jobManager.getJobManagerAkkaURL().equals(currentLeader)) {
+                               leaderIndex = i;
+                               break;
+                       }
+               }
+
+               if (leaderIndex == -1) {
+                       throw new IllegalStateException("Failed to determine 
which process is leader");
+               }
+
+               return leaderIndex;
+       }
+
+       private JobManagerProcess createAndStartJobManagerProcess(Configuration 
config)
+                       throws Exception {
+
+               JobManagerProcess jobManager = new 
JobManagerProcess(jobManagerPid++, config);
+               jobManager.createAndStart();
+               LOG.info("Created and started {}.", jobManager);
+
+               return jobManager;
+       }
+
+       private TaskManagerProcess 
createAndStartTaskManagerProcess(Configuration config)
+                       throws Exception {
+
+               TaskManagerProcess taskManager = new 
TaskManagerProcess(taskManagerPid++, config);
+               taskManager.createAndStart();
+               LOG.info("Created and started {}.", taskManager);
+
+               return taskManager;
+       }
+
+}
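
Note (not part of the commit above): the CheckpointedSequenceSource/CountingSink pair can also be
run locally without the chaos harness. The following is a minimal sketch; the class name
ChaosMonkeyProgramSketch, the parallelism of 2, and the temporary marker directory are illustrative
assumptions, while the Flink API calls are the same ones used in the diff.

    package org.apache.flink.test.recovery;

    import java.io.File;
    import java.nio.file.Files;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ChaosMonkeyProgramSketch {

        public static void main(String[] args) throws Exception {
            // Illustrative marker files; the ITCase uses them to coordinate with the test process.
            File base = Files.createTempDirectory("chaos-monkey-sketch").toFile();
            String completedMarker = new File(base, "completed_").getPath();
            File proceedMarker = new File(base, "proceed");

            // Create the proceed marker right away so the sources run at full speed
            // (the ITCase touches it via TestJvmProcess.touchFile after the chaos phase).
            proceedMarker.createNewFile();

            int n = 5000;        // every source counts up to n
            int parallelism = 2; // illustrative value

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(parallelism);
            env.enableCheckpointing(2000);

            env.addSource(new ChaosMonkeyITCase.CheckpointedSequenceSource(
                            n, completedMarker, proceedMarker.getPath()))
                    .addSink(new ChaosMonkeyITCase.CountingSink(parallelism, parallelism * n * (n + 1) / 2))
                    .setParallelism(1);

            env.execute("Checkpointed sequence sum");
        }
    }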

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
new file mode 100644
index 0000000..54ddf7e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
+import org.apache.flink.runtime.testutils.JobManagerProcess;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class JobManagerCheckpointRecoveryITCase {
+
+       private final static ZooKeeperTestEnvironment ZooKeeper = new 
ZooKeeperTestEnvironment(1);
+
+       private final static FiniteDuration TestTimeOut = new FiniteDuration(5, 
TimeUnit.MINUTES);
+
+       private static final File FileStateBackendBasePath;
+
+       static {
+               try {
+                       FileStateBackendBasePath = 
CommonTestUtils.createTempDirectory();
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Error in test setup. Could 
not create directory.", e);
+               }
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               ZooKeeper.shutdown();
+
+               if (FileStateBackendBasePath != null) {
+                       FileUtils.deleteDirectory(FileStateBackendBasePath);
+               }
+       }
+
+       @Before
+       public void cleanUp() throws Exception {
+               if (FileStateBackendBasePath != null) {
+                       FileUtils.cleanDirectory(FileStateBackendBasePath);
+               }
+
+               ZooKeeper.deleteAll();
+       }
+
+       // 
---------------------------------------------------------------------------------------------
+
+       private static final int Parallelism = 8;
+
+       private static final CountDownLatch CompletedCheckpointsLatch = new 
CountDownLatch(2);
+
+       private static final AtomicLongArray RecoveredStates = new 
AtomicLongArray(Parallelism);
+
+       private static final CountDownLatch FinalCountLatch = new 
CountDownLatch(1);
+
+       private static final AtomicReference<Long> FinalCount = new 
AtomicReference<>();
+
+       private static final long LastElement = -1;
+
+       /**
+        * Simple checkpointed streaming sum.
+        *
+        * <p>The sources (Parallelism) count until sequenceEnd. The sink (1) sums up all counts and
+        * returns the result to the main thread via a static variable. We wait until some checkpoints are
+        * completed and sanity-check that the sources recover with an updated state to make sure that
+        * this test actually tests something.
+        */
+       @Test
+       public void testCheckpointedStreamingSumProgram() throws Exception {
+               // Config
+               final int checkpointingInterval = 200;
+               final int sequenceEnd = 5000;
+               final long expectedSum = Parallelism * sequenceEnd * 
(sequenceEnd + 1) / 2;
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
+               env.setParallelism(Parallelism);
+               env.enableCheckpointing(checkpointingInterval);
+
+               env
+                               .addSource(new 
CheckpointedSequenceSource(sequenceEnd))
+                               .addSink(new CountingSink())
+                               .setParallelism(1);
+
+               JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+               Configuration config = 
ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper
+                               .getConnectString(), 
FileStateBackendBasePath.getPath());
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
Parallelism);
+
+               ActorSystem testSystem = null;
+               JobManagerProcess[] jobManagerProcess = new 
JobManagerProcess[2];
+               LeaderRetrievalService leaderRetrievalService = null;
+               ActorSystem taskManagerSystem = null;
+
+               try {
+                       final Deadline deadline = TestTimeOut.fromNow();
+
+                       // Test actor system
+                       testSystem = AkkaUtils.createActorSystem(new 
Configuration(),
+                                       new Some<>(new Tuple2<String, 
Object>("localhost", 0)));
+
+                       // The job managers
+                       jobManagerProcess[0] = new JobManagerProcess(0, config);
+                       jobManagerProcess[1] = new JobManagerProcess(1, config);
+
+                       jobManagerProcess[0].createAndStart();
+                       jobManagerProcess[1].createAndStart();
+
+                       // Leader listener
+                       TestingListener leaderListener = new TestingListener();
+                       leaderRetrievalService = 
ZooKeeperUtils.createLeaderRetrievalService(config);
+                       leaderRetrievalService.start(leaderListener);
+
+                       // The task manager
+                       taskManagerSystem = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+                       TaskManager.startTaskManagerComponentsAndActor(
+                                       config, taskManagerSystem, "localhost",
+                                       Option.<String>empty(), 
Option.<LeaderRetrievalService>empty(),
+                                       false, StreamingMode.STREAMING, 
TaskManager.class);
+
+                       {
+                               // Initial submission
+                               
leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+                               String leaderAddress = 
leaderListener.getAddress();
+                               UUID leaderId = 
leaderListener.getLeaderSessionID();
+
+                               // Get the leader ref
+                               ActorRef leaderRef = AkkaUtils.getActorRef(
+                                               leaderAddress, testSystem, 
deadline.timeLeft());
+                               ActorGateway leader = new 
AkkaActorGateway(leaderRef, leaderId);
+
+                               // Submit the job in detached mode
+                               leader.tell(new SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED));
+
+                               JobManagerActorTestUtils.waitForJobStatus(
+                                               jobGraph.getJobID(), 
JobStatus.RUNNING, leader, deadline.timeLeft());
+                       }
+
+                       // Who's the boss?
+                       JobManagerProcess leadingJobManagerProcess;
+                       if (jobManagerProcess[0].getJobManagerAkkaURL().equals(leaderListener.getAddress())) {
+                               leadingJobManagerProcess = jobManagerProcess[0];
+                       }
+                       else {
+                               leadingJobManagerProcess = jobManagerProcess[1];
+                       }
+
+                       CompletedCheckpointsLatch.await();
+
+                       // Kill the leading job manager process
+                       leadingJobManagerProcess.destroy();
+
+                       {
+                               // Recovery by the standby JobManager
+                               
leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+                               String leaderAddress = 
leaderListener.getAddress();
+                               UUID leaderId = 
leaderListener.getLeaderSessionID();
+
+                               ActorRef leaderRef = AkkaUtils.getActorRef(
+                                               leaderAddress, testSystem, 
deadline.timeLeft());
+                               ActorGateway leader = new 
AkkaActorGateway(leaderRef, leaderId);
+
+                               
JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), 
JobStatus.RUNNING,
+                                               leader, deadline.timeLeft());
+                       }
+
+                       // Wait to finish
+                       FinalCountLatch.await();
+
+                       assertEquals(expectedSum, (long) FinalCount.get());
+
+                       for (int i = 0; i < Parallelism; i++) {
+                               assertNotEquals(0, RecoveredStates.get(i));
+                       }
+               }
+               catch (Throwable t) {
+                       // In case of an error, print the job manager process 
logs.
+                       if (jobManagerProcess[0] != null) {
+                               jobManagerProcess[0].printProcessLog();
+                       }
+
+                       if (jobManagerProcess[1] != null) {
+                               jobManagerProcess[1].printProcessLog();
+                       }
+
+                       t.printStackTrace();
+               }
+               finally {
+                       if (jobManagerProcess[0] != null) {
+                               jobManagerProcess[0].destroy();
+                       }
+
+                       if (jobManagerProcess[1] != null) {
+                               jobManagerProcess[1].destroy();
+                       }
+
+                       if (leaderRetrievalService != null) {
+                               leaderRetrievalService.stop();
+                       }
+
+                       if (taskManagerSystem != null) {
+                               taskManagerSystem.shutdown();
+                       }
+
+                       if (testSystem != null) {
+                               testSystem.shutdown();
+                       }
+               }
+       }
+
+       // 
---------------------------------------------------------------------------------------------
+
+       /**
+        * A checkpointed source, which emits elements from 0 to a configured 
number.
+        */
+       public static class CheckpointedSequenceSource extends 
RichParallelSourceFunction<Long>
+                       implements Checkpointed<Long> {
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointedSequenceSource.class);
+
+               private static final long serialVersionUID = 0L;
+
+               private static final CountDownLatch sync = new 
CountDownLatch(Parallelism);
+
+               private final long end;
+
+               private long current = 0;
+
+               private volatile boolean isRunning = true;
+
+               public CheckpointedSequenceSource(long end) {
+                       checkArgument(end >= 0, "Negative final count");
+                       this.end = end;
+               }
+
+               @Override
+               public void run(SourceContext<Long> ctx) throws Exception {
+                       while (isRunning) {
+                               synchronized (ctx.getCheckpointLock()) {
+                                       if (current <= end) {
+                                               ctx.collect(current++);
+                                       }
+                                       else {
+                                               ctx.collect(LastElement);
+                                               return;
+                                       }
+                               }
+
+                               // Slow down until some checkpoints are 
completed
+                               if (sync.getCount() != 0) {
+                                       Thread.sleep(100);
+                               }
+                       }
+               }
+
+               @Override
+               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+                       LOG.debug("Snapshotting state {} @ ID {}.", current, 
checkpointId);
+                       return current;
+               }
+
+               @Override
+               public void restoreState(Long state) {
+                       LOG.debug("Restoring state {}", state);
+
+                       // This is necessary to make sure that something is 
recovered at all. Otherwise it
+                       // might happen that the job is restarted from the 
beginning.
+                       
RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), state);
+
+                       sync.countDown();
+
+                       current = state;
+               }
+
+               @Override
+               public void cancel() {
+                       isRunning = false;
+               }
+       }
+
+       /**
+        * A checkpointed sink, which sums up its input and notifies the main 
thread after all inputs
+        * are exhausted.
+        */
+       public static class CountingSink implements SinkFunction<Long>, 
Checkpointed<CountingSink>,
+                       CheckpointNotifier {
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(CountingSink.class);
+
+               private static final long serialVersionUID = 
1436484290453629091L;
+
+               private long current = 0;
+
+               private int numberOfReceivedLastElements;
+
+               @Override
+               public void invoke(Long value) throws Exception {
+                       if (value == LastElement) {
+                               numberOfReceivedLastElements++;
+
+                               if (numberOfReceivedLastElements == 
Parallelism) {
+                                       FinalCount.set(current);
+                                       FinalCountLatch.countDown();
+                               }
+                               else if (numberOfReceivedLastElements > 
Parallelism) {
+                                       throw new 
IllegalStateException("Received more elements than parallelism.");
+                               }
+                       }
+                       else {
+                               current += value;
+                       }
+               }
+
+               @Override
+               public CountingSink snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+                       LOG.debug("Snapshotting state {}:{} @ ID {}.", current, 
numberOfReceivedLastElements, checkpointId);
+                       return this;
+               }
+
+               @Override
+               public void restoreState(CountingSink state) {
+                       LOG.debug("Restoring state {}:{}", state.current, 
state.numberOfReceivedLastElements);
+                       this.current = state.current;
+                       this.numberOfReceivedLastElements = 
state.numberOfReceivedLastElements;
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                       LOG.debug("Checkpoint {} completed.", checkpointId);
+                       CompletedCheckpointsLatch.countDown();
+               }
+       }
+}
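
Note (not part of the commit above): the static fields CompletedCheckpointsLatch, RecoveredStates,
FinalCountLatch and FinalCount only work as a hand-back channel because the TaskManager, and with
it the user code, runs inside the test JVM, while only the JobManagers run as separate processes.
Stripped of Flink, the coordination pattern is just the following self-contained sketch; the class
and variable names are illustrative.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicReference;

    public class StaticHandBackSketch {

        // Visible to both the "job" thread and the test thread because they share one JVM,
        // just like CountingSink and the test method above.
        private static final CountDownLatch finalCountLatch = new CountDownLatch(1);
        private static final AtomicReference<Long> finalCount = new AtomicReference<>();

        public static void main(String[] args) throws Exception {
            Thread job = new Thread(() -> {
                long sum = 0;
                for (long i = 1; i <= 5000; i++) {
                    sum += i;
                }
                finalCount.set(sum);         // publish the result, like CountingSink.invoke
                finalCountLatch.countDown(); // wake the waiting test thread, like FinalCountLatch
            });
            job.start();

            finalCountLatch.await();         // the test blocks here, like FinalCountLatch.await()
            System.out.println("Final count: " + finalCount.get()); // prints 12502500
        }
    }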

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerProcessFailureBatchRecoveryITCase.java
new file mode 100644
index 0000000..66565dd
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerProcessFailureBatchRecoveryITCase.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test the recovery of a simple batch program in the case of JobManager 
process failure.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class JobManagerProcessFailureBatchRecoveryITCase extends 
AbstractJobManagerProcessFailureRecoveryITCase {
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Parametrization (run pipelined and batch)
+       // 
--------------------------------------------------------------------------------------------
+
+       private final ExecutionMode executionMode;
+
+       public JobManagerProcessFailureBatchRecoveryITCase(ExecutionMode 
executionMode) {
+               this.executionMode = executionMode;
+       }
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> executionMode() {
+               return Arrays.asList(new Object[][]{
+                               {ExecutionMode.PIPELINED},
+                               {ExecutionMode.BATCH}});
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Test the program
+       // 
--------------------------------------------------------------------------------------------
+
+       // This is a slightly modified copy of the task manager process failure program.
+       @Override
+       public void testJobManagerFailure(String zkQuorum, final File 
coordinateDir) throws Exception {
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+               config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
zkQuorum);
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
+                               "leader", 1, config);
+               env.setParallelism(PARALLELISM);
+               env.setNumberOfExecutionRetries(1);
+               env.getConfig().setExecutionMode(executionMode);
+               env.getConfig().disableSysoutLogging();
+
+               final long NUM_ELEMENTS = 100000L;
+               final DataSet<Long> result = env.generateSequence(1, 
NUM_ELEMENTS)
+                               // make sure every mapper is involved (no one 
is skipped because of lazy split assignment)
+                               .rebalance()
+                               // the majority of the behavior is in the 
MapFunction
+                               .map(new RichMapFunction<Long, Long>() {
+
+                                       private final File proceedFile = new 
File(coordinateDir, PROCEED_MARKER_FILE);
+
+                                       private boolean markerCreated = false;
+                                       private boolean checkForProceedFile = 
true;
+
+                                       @Override
+                                       public Long map(Long value) throws 
Exception {
+                                               if (!markerCreated) {
+                                                       int taskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+                                                       
AbstractTaskManagerProcessFailureRecoveryTest.touchFile(
+                                                                       new 
File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
+                                                       markerCreated = true;
+                                               }
+
+                                               // check if the proceed file 
exists
+                                               if (checkForProceedFile) {
+                                                       if 
(proceedFile.exists()) {
+                                                               
checkForProceedFile = false;
+                                                       }
+                                                       else {
+                                                               // otherwise 
wait so that we make slow progress
+                                                               
Thread.sleep(100);
+                                                       }
+                                               }
+                                               return value;
+                                       }
+                               })
+                               .reduce(new ReduceFunction<Long>() {
+                                       @Override
+                                       public Long reduce(Long value1, Long 
value2) {
+                                               return value1 + value2;
+                                       }
+                               })
+                               // The check is done in the mapper, because the client
+                               // cannot currently handle JobManager losses/reconnects.
+                               .flatMap(new RichFlatMapFunction<Long, Long>() {
+                                       @Override
+                                       public void flatMap(Long value, 
Collector<Long> out) throws Exception {
+                                               assertEquals(NUM_ELEMENTS * 
(NUM_ELEMENTS + 1L) / 2L, (long) value);
+
+                                               int taskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+                                               
AbstractTaskManagerProcessFailureRecoveryTest.touchFile(
+                                                               new 
File(coordinateDir, FINISH_MARKER_FILE_PREFIX + taskIndex));
+                                       }
+                               });
+
+               result.output(new DiscardingOutputFormat<Long>());
+
+               env.execute();
+       }
+
+}
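
The RichMapFunction above implements a small handshake with the test driver through marker files in a shared coordination directory: each subtask touches a READY marker once it is running, then deliberately makes slow progress until the driver creates the PROCEED marker, so that part of the work is only done after the failure has been injected. A self-contained sketch of that handshake, assuming simple file names ("ready_<i>" and "proceed"; the actual tests use the READY_MARKER_FILE_PREFIX / PROCEED_MARKER_FILE constants and a touchFile(...) helper):

import java.io.File;
import java.io.IOException;

class MarkerFileHandshake {

    // Roughly what the tests' touchFile(...) helper does: create the file,
    // or bump its timestamp if it already exists.
    static void touch(File file) throws IOException {
        if (!file.createNewFile()) {
            file.setLastModified(System.currentTimeMillis());
        }
    }

    // Called from inside the user function: announce readiness, then make
    // slow progress until the driver creates the proceed marker.
    static void signalReadyAndAwaitProceed(File coordinateDir, int taskIndex) throws Exception {
        touch(new File(coordinateDir, "ready_" + taskIndex));
        File proceed = new File(coordinateDir, "proceed");
        while (!proceed.exists()) {
            Thread.sleep(100);
        }
    }
}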

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
deleted file mode 100644
index f2b8c31..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recovery;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test the recovery of a simple batch program in the case of TaskManager 
process failure.
- */
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class ProcessFailureBatchRecoveryITCase extends 
AbstractProcessFailureRecoveryTest {
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Parametrization (run pipelined and batch)
-       // 
--------------------------------------------------------------------------------------------
-
-       private final ExecutionMode executionMode;
-
-       public ProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) {
-               this.executionMode = executionMode;
-       }
-
-       @Parameterized.Parameters
-       public static Collection<Object[]> executionMode() {
-               return Arrays.asList(new Object[][]{
-                               {ExecutionMode.PIPELINED},
-                               {ExecutionMode.BATCH}});
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Test the program
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void testProgram(int jobManagerPort, final File coordinateDir) 
throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
-               env.setParallelism(PARALLELISM);
-               env.setNumberOfExecutionRetries(1);
-               env.getConfig().setExecutionMode(executionMode);
-               env.getConfig().disableSysoutLogging();
-
-               final long NUM_ELEMENTS = 100000L;
-               final DataSet<Long> result = env.generateSequence(1, 
NUM_ELEMENTS)
-
-                               // make sure every mapper is involved (no one 
is skipped because of lazy split assignment)
-                               .rebalance()
-                                               // the majority of the behavior 
is in the MapFunction
-                               .map(new RichMapFunction<Long, Long>() {
-
-                                       private final File proceedFile = new 
File(coordinateDir, PROCEED_MARKER_FILE);
-
-                                       private boolean markerCreated = false;
-                                       private boolean checkForProceedFile = 
true;
-
-                                       @Override
-                                       public Long map(Long value) throws 
Exception {
-                                               if (!markerCreated) {
-                                                       int taskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
-                                                       touchFile(new 
File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
-                                                       markerCreated = true;
-                                               }
-
-                                               // check if the proceed file 
exists
-                                               if (checkForProceedFile) {
-                                                       if 
(proceedFile.exists()) {
-                                                               
checkForProceedFile = false;
-                                                       } else {
-                                                               // otherwise 
wait so that we make slow progress
-                                                               
Thread.sleep(100);
-                                                       }
-                                               }
-                                               return value;
-                                       }
-                               })
-                               .reduce(new ReduceFunction<Long>() {
-                                       @Override
-                                       public Long reduce(Long value1, Long 
value2) {
-                                               return value1 + value2;
-                                       }
-                               });
-
-               long sum = result.collect().get(0);
-               assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 945a78c..6dce370 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -115,13 +115,13 @@ public class ProcessFailureCancelingITCase {
                                        "-Dlog4j.configuration=file:" + 
tempLogFile.getAbsolutePath(),
                                        "-Xms80m", "-Xmx80m",
                                        "-classpath", getCurrentClasspath(),
-                                       
AbstractProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(),
+                                       
AbstractTaskManagerProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(),
                                        String.valueOf(jobManagerPort)
                        };
 
                        // start the first two TaskManager processes
                        taskManagerProcess = new 
ProcessBuilder(command).start();
-                       new 
AbstractProcessFailureRecoveryTest.PipeForwarder(taskManagerProcess.getErrorStream(),
 processOutput);
+                       new 
CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), 
processOutput);
                        
                        // we wait for the JobManager to have the two 
TaskManagers available
                        // since some of the CI environments are very hostile, 
we need to give this a lot of time (2 minutes)
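
For reference, the TaskManager processes in these tests are plain JVMs launched via ProcessBuilder, with their stderr drained on a separate thread so the child can never block on a full pipe (the refactored test delegates that to CommonTestUtils.PipeForwarder). A rough sketch, with the entry point class and port passed in as assumed parameters:

import java.io.IOException;
import java.io.InputStream;

class TaskManagerLauncherSketch {

    static Process launch(String entryPointClass, int jobManagerPort) throws IOException {
        String[] command = new String[] {
                "java", "-Xms80m", "-Xmx80m",
                "-classpath", System.getProperty("java.class.path"),
                entryPointClass,                      // e.g. the entry point class named above
                String.valueOf(jobManagerPort)
        };
        final Process process = new ProcessBuilder(command).start();

        // Drain stderr so the child process is never blocked writing to it.
        final InputStream err = process.getErrorStream();
        new Thread(new Runnable() {
            @Override
            public void run() {
                byte[] buffer = new byte[4096];
                try {
                    while (err.read(buffer) != -1) {
                        // discard (or append to a StringBuilder for diagnostics)
                    }
                }
                catch (IOException ignored) {
                }
            }
        }).start();

        return process;
    }
}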

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
deleted file mode 100644
index 054b321..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recovery;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
-
-import org.junit.Assert;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test for streaming program behaviour in case of TaskManager failure
- * based on {@link AbstractProcessFailureRecoveryTest}.
- *
- * The logic in this test is as follows:
- *  - The source slowly emits records (every 10 msecs) until the test driver
- *    gives the "go" for regular execution
- *  - The "go" is given after the first taskmanager has been killed, so it can 
only
- *    happen in the recovery run
- *  - The mapper must not be slow, because otherwise the checkpoint barrier 
cannot pass
- *    the mapper and no checkpoint will be completed before the killing of the 
first
- *    TaskManager.
- */
-@SuppressWarnings("serial")
-public class ProcessFailureStreamingRecoveryITCase extends 
AbstractProcessFailureRecoveryTest {
-
-       private static final int DATA_COUNT = 10000;
-
-       @Override
-       public void testProgram(int jobManagerPort, final File coordinateDir) 
throws Exception {
-               
-               final File tempCheckpointDir = new File(new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
-                               UUID.randomUUID().toString());
-
-               assertTrue("Cannot create directory for checkpoints", 
tempCheckpointDir.mkdirs());
-
-               StreamExecutionEnvironment env = StreamExecutionEnvironment
-                                                                       
.createRemoteEnvironment("localhost", jobManagerPort);
-               env.setParallelism(PARALLELISM);
-               env.getConfig().disableSysoutLogging();
-               env.setNumberOfExecutionRetries(1);
-               env.enableCheckpointing(200);
-               
-               env.setStateBackend(new 
FsStateBackend(tempCheckpointDir.getAbsoluteFile().toURI()));
-
-               DataStream<Long> result = env.addSource(new 
SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
-                               // add a non-chained no-op map to test the 
chain state restore logic
-                               .map(new MapFunction<Long, Long>() {
-                                       @Override
-                                       public Long map(Long value) throws 
Exception {
-                                               return value;
-                                       }
-                               }).startNewChain()
-                               // populate the coordinate directory so we can 
proceed to TaskManager failure
-                               .map(new Mapper(coordinateDir));
-
-               //write result to temporary file
-               result.addSink(new CheckpointedSink(DATA_COUNT));
-
-               try {
-                       // blocking call until execution is done
-                       env.execute();
-
-                       // TODO: Figure out why this fails when ran with other 
tests
-                       // Check whether checkpoints have been cleaned up 
properly
-                       // assertDirectoryEmpty(tempCheckpointDir);
-               }
-               finally {
-                       // clean up
-                       if (tempCheckpointDir.exists()) {
-                               FileUtils.deleteDirectory(tempCheckpointDir);
-                       }
-               }
-       }
-
-       public static class SleepyDurableGenerateSequence extends 
RichParallelSourceFunction<Long> 
-                       implements Checkpointed<Long> {
-
-               private static final long SLEEP_TIME = 50;
-
-               private final File coordinateDir;
-               private final long end;
-
-               private volatile boolean isRunning = true;
-               
-               private long collected;
-
-               public SleepyDurableGenerateSequence(File coordinateDir, long 
end) {
-                       this.coordinateDir = coordinateDir;
-                       this.end = end;
-               }
-
-               @Override
-               public void run(SourceContext<Long> sourceCtx) throws Exception 
{
-                       final Object checkpointLock = 
sourceCtx.getCheckpointLock();
-
-                       RuntimeContext runtimeCtx = getRuntimeContext();
-
-                       final long stepSize = 
runtimeCtx.getNumberOfParallelSubtasks();
-                       final long congruence = 
runtimeCtx.getIndexOfThisSubtask();
-                       final long toCollect = (end % stepSize > congruence) ? 
(end / stepSize + 1) : (end / stepSize);
-
-                       final File proceedFile = new File(coordinateDir, 
PROCEED_MARKER_FILE);
-                       boolean checkForProceedFile = true;
-
-                       while (isRunning && collected < toCollect) {
-                               // check if the proceed file exists (then we go 
full speed)
-                               // if not, we always recheck and sleep
-                               if (checkForProceedFile) {
-                                       if (proceedFile.exists()) {
-                                               checkForProceedFile = false;
-                                       } else {
-                                               // otherwise wait so that we 
make slow progress
-                                               Thread.sleep(SLEEP_TIME);
-                                       }
-                               }
-
-                               synchronized (checkpointLock) {
-                                       sourceCtx.collect(collected * stepSize 
+ congruence);
-                                       collected++;
-                               }
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       isRunning = false;
-               }
-
-               @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return collected;
-               }
-
-               @Override
-               public void restoreState(Long state) {
-                       collected = state;
-               }
-       }
-       
-       public static class Mapper extends RichMapFunction<Long, Long> {
-               private boolean markerCreated = false;
-               private File coordinateDir;
-
-               public Mapper(File coordinateDir) {
-                       this.coordinateDir = coordinateDir;
-               }
-
-               @Override
-               public Long map(Long value) throws Exception {
-                       if (!markerCreated) {
-                               int taskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
-                               touchFile(new File(coordinateDir, 
READY_MARKER_FILE_PREFIX + taskIndex));
-                               markerCreated = true;
-                       }
-                       return value;
-               }
-       }
-
-       private static class CheckpointedSink extends RichSinkFunction<Long> 
implements Checkpointed<Long> {
-
-               private long stepSize;
-               private long congruence;
-               private long toCollect;
-               private Long collected = 0L;
-               private long end;
-
-               public CheckpointedSink(long end) {
-                       this.end = end;
-               }
-
-               @Override
-               public void open(Configuration parameters) throws IOException {
-                       stepSize = 
getRuntimeContext().getNumberOfParallelSubtasks();
-                       congruence = 
getRuntimeContext().getIndexOfThisSubtask();
-                       toCollect = (end % stepSize > congruence) ? (end / 
stepSize + 1) : (end / stepSize);
-               }
-
-               @Override
-               public void invoke(Long value) throws Exception {
-                       long expected = collected * stepSize + congruence;
-
-                       Assert.assertTrue("Value did not match expected value. 
" + expected + " != " + value, value.equals(expected));
-
-                       collected++;
-
-                       if (collected > toCollect) {
-                               Assert.fail("Collected <= toCollect: " + 
collected + " > " + toCollect);
-                       }
-
-               }
-
-               @Override
-               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-                       return collected;
-               }
-
-               @Override
-               public void restoreState(Long state) {
-                       collected = state;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
new file mode 100644
index 0000000..173c8ea
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test the recovery of a simple batch program in the case of TaskManager 
process failure.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class TaskManagerProcessFailureBatchRecoveryITCase extends 
AbstractTaskManagerProcessFailureRecoveryTest {
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Parametrization (run pipelined and batch)
+       // 
--------------------------------------------------------------------------------------------
+
+       private final ExecutionMode executionMode;
+
+       public TaskManagerProcessFailureBatchRecoveryITCase(ExecutionMode 
executionMode) {
+               this.executionMode = executionMode;
+       }
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> executionMode() {
+               return Arrays.asList(new Object[][]{
+                               {ExecutionMode.PIPELINED},
+                               {ExecutionMode.BATCH}});
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Test the program
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void testTaskManagerFailure(int jobManagerPort, final File 
coordinateDir) throws Exception {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
+               env.setParallelism(PARALLELISM);
+               env.setNumberOfExecutionRetries(1);
+               env.getConfig().setExecutionMode(executionMode);
+               env.getConfig().disableSysoutLogging();
+
+               final long NUM_ELEMENTS = 100000L;
+               final DataSet<Long> result = env.generateSequence(1, 
NUM_ELEMENTS)
+
+                               // make sure every mapper is involved (no one 
is skipped because of lazy split assignment)
+                               .rebalance()
+                                               // the majority of the behavior 
is in the MapFunction
+                               .map(new RichMapFunction<Long, Long>() {
+
+                                       private final File proceedFile = new 
File(coordinateDir, PROCEED_MARKER_FILE);
+
+                                       private boolean markerCreated = false;
+                                       private boolean checkForProceedFile = 
true;
+
+                                       @Override
+                                       public Long map(Long value) throws 
Exception {
+                                               if (!markerCreated) {
+                                                       int taskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+                                                       touchFile(new 
File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
+                                                       markerCreated = true;
+                                               }
+
+                                               // check if the proceed file 
exists
+                                               if (checkForProceedFile) {
+                                                       if 
(proceedFile.exists()) {
+                                                               
checkForProceedFile = false;
+                                                       } else {
+                                                               // otherwise 
wait so that we make slow progress
+                                                               
Thread.sleep(100);
+                                                       }
+                                               }
+                                               return value;
+                                       }
+                               })
+                               .reduce(new ReduceFunction<Long>() {
+                                       @Override
+                                       public Long reduce(Long value1, Long 
value2) {
+                                               return value1 + value2;
+                                       }
+                               });
+
+               long sum = result.collect().get(0);
+               assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum);
+       }
+}
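
As a quick sanity check of the assertion above: the job reduces the sequence 1..NUM_ELEMENTS to its sum, so the expected value follows from the Gauss formula, and for the NUM_ELEMENTS = 100000 used here it still fits comfortably into a long:

// Gauss sum of 1..100000, the value asserted after collect():
long expected = 100000L * (100000L + 1L) / 2L;   // = 5000050000L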

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
new file mode 100644
index 0000000..aa634f0
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+
+import org.junit.Assert;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for streaming program behaviour in case of TaskManager failure
+ * based on {@link AbstractTaskManagerProcessFailureRecoveryTest}.
+ *
+ * The logic in this test is as follows:
+ *  - The source slowly emits records (one every 50 msecs) until the test driver
+ *    gives the "go" for regular execution
+ *  - The "go" is given after the first taskmanager has been killed, so it can 
only
+ *    happen in the recovery run
+ *  - The mapper must not be slow, because otherwise the checkpoint barrier 
cannot pass
+ *    the mapper and no checkpoint will be completed before the killing of the 
first
+ *    TaskManager.
+ */
+@SuppressWarnings("serial")
+public class TaskManagerProcessFailureStreamingRecoveryITCase extends 
AbstractTaskManagerProcessFailureRecoveryTest {
+
+       private static final int DATA_COUNT = 10000;
+
+       @Override
+       public void testTaskManagerFailure(int jobManagerPort, final File 
coordinateDir) throws Exception {
+
+               final File tempCheckpointDir = new File(new 
File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH),
+                               UUID.randomUUID().toString());
+
+               assertTrue("Cannot create directory for checkpoints", 
tempCheckpointDir.mkdirs());
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment
+                               .createRemoteEnvironment("localhost", 
jobManagerPort);
+               env.setParallelism(PARALLELISM);
+               env.getConfig().disableSysoutLogging();
+               env.setNumberOfExecutionRetries(1);
+               env.enableCheckpointing(200);
+               
+               env.setStateBackend(new 
FsStateBackend(tempCheckpointDir.getAbsoluteFile().toURI()));
+
+               DataStream<Long> result = env.addSource(new 
SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
+                               // add a non-chained no-op map to test the 
chain state restore logic
+                               .map(new MapFunction<Long, Long>() {
+                                       @Override
+                                       public Long map(Long value) throws 
Exception {
+                                               return value;
+                                       }
+                               }).startNewChain()
+                                               // populate the coordinate 
directory so we can proceed to TaskManager failure
+                               .map(new Mapper(coordinateDir));
+
+               //write result to temporary file
+               result.addSink(new CheckpointedSink(DATA_COUNT));
+
+               try {
+                       // blocking call until execution is done
+                       env.execute();
+
+                       // TODO: Figure out why this fails when ran with other 
tests
+                       // Check whether checkpoints have been cleaned up 
properly
+                       // assertDirectoryEmpty(tempCheckpointDir);
+               }
+               finally {
+                       // clean up
+                       if (tempCheckpointDir.exists()) {
+                               FileUtils.deleteDirectory(tempCheckpointDir);
+                       }
+               }
+       }
+
+       public static class SleepyDurableGenerateSequence extends 
RichParallelSourceFunction<Long> 
+                       implements Checkpointed<Long> {
+
+               private static final long SLEEP_TIME = 50;
+
+               private final File coordinateDir;
+               private final long end;
+
+               private volatile boolean isRunning = true;
+
+               private long collected;
+
+               public SleepyDurableGenerateSequence(File coordinateDir, long 
end) {
+                       this.coordinateDir = coordinateDir;
+                       this.end = end;
+               }
+
+               @Override
+               public void run(SourceContext<Long> sourceCtx) throws Exception 
{
+                       final Object checkpointLock = 
sourceCtx.getCheckpointLock();
+
+                       RuntimeContext runtimeCtx = getRuntimeContext();
+
+                       final long stepSize = 
runtimeCtx.getNumberOfParallelSubtasks();
+                       final long congruence = 
runtimeCtx.getIndexOfThisSubtask();
+                       final long toCollect = (end % stepSize > congruence) ? 
(end / stepSize + 1) : (end / stepSize);
+
+                       final File proceedFile = new File(coordinateDir, 
PROCEED_MARKER_FILE);
+                       boolean checkForProceedFile = true;
+
+                       while (isRunning && collected < toCollect) {
+                               // check if the proceed file exists (then we go 
full speed)
+                               // if not, we always recheck and sleep
+                               if (checkForProceedFile) {
+                                       if (proceedFile.exists()) {
+                                               checkForProceedFile = false;
+                                       } else {
+                                               // otherwise wait so that we 
make slow progress
+                                               Thread.sleep(SLEEP_TIME);
+                                       }
+                               }
+
+                               synchronized (checkpointLock) {
+                                       sourceCtx.collect(collected * stepSize 
+ congruence);
+                                       collected++;
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       isRunning = false;
+               }
+
+               @Override
+               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) {
+                       return collected;
+               }
+
+               @Override
+               public void restoreState(Long state) {
+                       collected = state;
+               }
+       }
+
+       public static class Mapper extends RichMapFunction<Long, Long> {
+               private boolean markerCreated = false;
+               private File coordinateDir;
+
+               public Mapper(File coordinateDir) {
+                       this.coordinateDir = coordinateDir;
+               }
+
+               @Override
+               public Long map(Long value) throws Exception {
+                       if (!markerCreated) {
+                               int taskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+                               touchFile(new File(coordinateDir, 
READY_MARKER_FILE_PREFIX + taskIndex));
+                               markerCreated = true;
+                       }
+                       return value;
+               }
+       }
+
+       private static class CheckpointedSink extends RichSinkFunction<Long> 
implements Checkpointed<Long> {
+
+               private long stepSize;
+               private long congruence;
+               private long toCollect;
+               private Long collected = 0L;
+               private long end;
+
+               public CheckpointedSink(long end) {
+                       this.end = end;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws IOException {
+                       stepSize = 
getRuntimeContext().getNumberOfParallelSubtasks();
+                       congruence = 
getRuntimeContext().getIndexOfThisSubtask();
+                       toCollect = (end % stepSize > congruence) ? (end / 
stepSize + 1) : (end / stepSize);
+               }
+
+               @Override
+               public void invoke(Long value) throws Exception {
+                       long expected = collected * stepSize + congruence;
+
+                       Assert.assertTrue("Value did not match expected value. 
" + expected + " != " + value, value.equals(expected));
+
+                       collected++;
+
+                       if (collected > toCollect) {
+                               Assert.fail("Collected more than expected: " + collected + " > " + toCollect);
+                       }
+
+               }
+
+               @Override
+               public Long snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+                       return collected;
+               }
+
+               @Override
+               public void restoreState(Long state) {
+                       collected = state;
+               }
+       }
+}
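
The source and the sink above rely on the same partitioning arithmetic: subtask i of n handles exactly the values v with v % n == i, in increasing order, and toCollect is the number of such values in [0, end). A tiny standalone check of that arithmetic (the small end and stepSize values are chosen only for illustration):

class PartitioningCheck {
    public static void main(String[] args) {
        long end = 10;        // assumed element count, small for illustration
        long stepSize = 3;    // number of parallel subtasks
        for (long congruence = 0; congruence < stepSize; congruence++) {
            long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
            for (long collected = 0; collected < toCollect; collected++) {
                // prints 0,3,6,9 / 1,4,7 / 2,5,8 -- each value in [0, 10) exactly once
                System.out.println("subtask " + congruence + " emits " + (collected * stepSize + congruence));
            }
        }
    }
}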

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 6035c45..ed2113a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.runtime.leaderelection;
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.PoisonPill;
+import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
@@ -39,17 +40,41 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
 import org.junit.Test;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.junit.Assert.*;
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
        private static final FiniteDuration timeout = 
TestingUtils.TESTING_DURATION();
 
+       private static final File tempDirectory;
+
+       static {
+               try {
+                       tempDirectory = org.apache.flink.runtime.testutils
+                                       .CommonTestUtils.createTempDirectory();
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Test setup failed", e);
+               }
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (tempDirectory != null) {
+                       FileUtils.deleteDirectory(tempDirectory);
+               }
+       }
+
        /**
         * Tests that the TaskManagers successfully register at the new leader 
once the old leader
         * is terminated.
@@ -64,13 +89,15 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
                configuration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
+               configuration.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
+               
configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, 
tempDirectory.getPath());
 
                ForkableFlinkMiniCluster cluster = new 
ForkableFlinkMiniCluster(configuration);
 
                try {
                        cluster.start();
 
-                       for(int i = 0; i < numJMs; i++) {
+                       for (int i = 0; i < numJMs; i++) {
                                ActorGateway leadingJM = 
cluster.getLeaderGateway(timeout);
 
                                
cluster.waitForTaskManagersToBeRegisteredAtJobManager(leadingJM.actor());
@@ -86,7 +113,8 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
                                cluster.clearLeader();
                                leadingJM.tell(PoisonPill.getInstance());
                        }
-               } finally {
+               }
+               finally {
                        cluster.stop();
                }
        }
@@ -110,6 +138,13 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
                
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTM);
+               configuration.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
+               
configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, 
tempDirectory.getPath());
+
+               // @TODO @tillrohrmann temporarily "disable" recovery, because currently
+               // the client does not need to resubmit a failed job to a new leader. Should we
+               // keep this test and disable recovery fully, or will this be subsumed by the
+               // real client changes anyway?
+               
configuration.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, 
timeout.toString());
 
                Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
 
@@ -152,7 +187,7 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
                        thread.start();
 
                        // Kill all JobManager except for two
-                       for(int i = 0; i < numJMs - 2; i++) {
+                       for (int i = 0; i < numJMs - 2; i++) {
                                ActorGateway jm = 
cluster.getLeaderGateway(timeout);
 
                                
cluster.waitForTaskManagersToBeRegisteredAtJobManager(jm.actor());
@@ -184,17 +219,18 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
 
                        thread.join(timeout.toMillis());
 
-                       if(thread.isAlive()) {
+                       if (thread.isAlive()) {
                                jobSubmission.finished = true;
                                fail("The job submission thread did not stop (meaning it did not succeed in " +
                                                "executing the test job).");
                        }
-               } finally {
+               }
+               finally {
                        if (clientActorSystem != null) {
                                
cluster.shutdownJobClientActorSystem(clientActorSystem);
                        }
 
-                       if(thread != null && thread.isAlive() && jobSubmission 
!= null) {
+                       if (thread != null && thread.isAlive() && jobSubmission 
!= null) {
                                jobSubmission.finished = true;
                        }
                        cluster.stop();
@@ -219,7 +255,7 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
 
                @Override
                public void run() {
-                       while(!finished) {
+                       while (!finished) {
                                try {
                                        LeaderRetrievalService lrService =
                                                        
LeaderRetrievalUtils.createLeaderRetrievalService(
@@ -240,11 +276,14 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
                                                        
getClass().getClassLoader());
 
                                        finished = true;
-                               } catch (JobExecutionException e) {
+                               }
+                               catch (JobExecutionException e) {
                                        // This was expected, so just try again 
to submit the job
-                               } catch (LeaderRetrievalException e) {
+                               }
+                               catch (LeaderRetrievalException e) {
                                        // This can also happen, so just try 
again to submit the job
-                               } catch (Exception e) {
+                               }
+                               catch (Exception e) {
                                        // This was not expected... fail the 
test case
                                        e.printStackTrace();
                                        fail("Caught unexpected exception in 
job submission test case.");
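
The new configuration keys in this diff point the ZooKeeper recovery mode at a filesystem location where job and checkpoint metadata can be persisted, so that a newly elected JobManager can recover them. Condensed, the relevant cluster setup from this test looks roughly like this (tempDirectory being the statically created directory that is removed again in tearDown()):

Configuration configuration = new Configuration();
configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getPath());

ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
cluster.start();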

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 61eb6a5..4ada21e 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -29,8 +29,9 @@ import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration, 
ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, 
CurrentJobStatus,
 JobNotFound}
@@ -88,7 +89,9 @@ class YarnJobManager(
     delayBetweenRetries: Long,
     timeout: FiniteDuration,
     mode: StreamingMode,
-    leaderElectionService: LeaderElectionService)
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends JobManager(
     flinkConfiguration,
     executionContext,
@@ -100,7 +103,9 @@ class YarnJobManager(
     delayBetweenRetries,
     timeout,
     mode,
-    leaderElectionService) {
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory) {
 
   import context._
   import scala.collection.JavaConverters._
