http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index 837b643..cd40c82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CurrentJobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages.JobNotFound;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.types.IntValue;
 import org.junit.Test;
 import scala.concurrent.Await;
@@ -48,7 +49,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import static org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus;
-import static org.junit.Assert.fail;
 
 public class TaskCancelTest {
 
@@ -109,30 +109,21 @@ public class TaskCancelTest {
 
                        // Wait for the job to make some progress and then cancel
                        awaitRunning(
-                               flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-                               jobGraph.getJobID(),
-                               TestingUtils.TESTING_DURATION());
+                                       flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
+                                       jobGraph.getJobID(),
+                                       TestingUtils.TESTING_DURATION());
 
                        Thread.sleep(5000);
 
                        cancelJob(
-                               flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-                               jobGraph.getJobID(),
-                               TestingUtils.TESTING_DURATION());
+                                       flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
+                                       jobGraph.getJobID(),
+                                       TestingUtils.TESTING_DURATION());
 
                        // Wait for the job to be cancelled
-                       JobStatus status = awaitTermination(
-                               flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
-                               jobGraph.getJobID(),
-                               TestingUtils.TESTING_DURATION());
-
-                       if (status == JobStatus.CANCELED) {
-                               // Expected :-) All is swell.
-                       }
-                       else {
-                               fail("The job finished with unexpected terminal state " + status + ". "
-                                               + "This indicates that there is a bug in the task cancellation.");
-                       }
+                       JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED,
+                                       flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
+                                       TestingUtils.TESTING_DURATION());
                }
                finally {
                        if (flink != null) {
@@ -224,42 +215,6 @@ public class TaskCancelTest {
 
        }
 
-       private JobStatus awaitTermination(ActorGateway jobManager, JobID jobId, FiniteDuration timeout)
-                       throws Exception {
-
-               checkNotNull(jobManager);
-               checkNotNull(jobId);
-               checkNotNull(timeout);
-
-               while (true) {
-                       Future<Object> ask = jobManager.ask(
-                                       new RequestJobStatus(jobId),
-                                       timeout);
-
-                       Object result = Await.result(ask, timeout);
-
-                       if (result instanceof CurrentJobStatus) {
-                               // Success
-                               CurrentJobStatus status = (CurrentJobStatus) result;
-
-                               if (!status.jobID().equals(jobId)) {
-                                       throw new Exception("JobManager responded for wrong job ID. Request: "
-                                                       + jobId + ", response: " + status.jobID() + ".");
-                               }
-
-                               if (status.status().isTerminalState()) {
-                                       return status.status();
-                               }
-                       }
-                       else if (result instanceof JobNotFound) {
-                               throw new Exception("Cannot find job with ID " + jobId + ".");
-                       }
-                       else {
-                               throw new Exception("Unexpected response to cancel request: " + result);
-                       }
-               }
-       }
-
        // ---------------------------------------------------------------------------------------------
 
        public static class InfiniteSource extends AbstractInvokable {

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 61b1f7a..069b6af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -18,26 +18,17 @@
 
 package org.apache.flink.runtime.testutils;
 
-import static org.junit.Assert.fail;
+import org.apache.flink.runtime.util.FileUtils;
 
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.InputStream;
 import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import java.util.UUID;
 
 /**
  * This class contains auxiliary methods for unit tests.
@@ -77,6 +68,18 @@ public class CommonTestUtils {
        }
 
        /**
+        * Create a temporary log4j configuration for the test.
+        */
+       public static File createTemporaryLog4JProperties() throws IOException {
+               File log4jProps = File.createTempFile(FileUtils.getRandomFilename(""), "-log4j" + ".properties");
+               log4jProps.deleteOnExit();
+               CommonTestUtils.printLog4jDebugConfig(log4jProps);
+
+               return log4jProps;
+       }
+
+       /**
         * Tries to get the java executable command with which the current JVM was started.
         * Returns null, if the command could not be found.
         *
@@ -152,4 +155,50 @@ public class CommonTestUtils {
                        fw.close();
                }
        }
+
+       public static File createTempDirectory() throws IOException {
+               File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+               for (int i = 0; i < 10; i++) {
+                       File dir = new File(tempDir, UUID.randomUUID().toString());
+                       if (!dir.exists() && dir.mkdirs()) {
+                               return dir;
+                       }
+                       System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
+               }
+
+               throw new IOException("Could not create temporary file directory");
+       }
+
+       /**
+        * Utility class to read the output of a process stream and forward it into a StringWriter.
+        */
+       public static class PipeForwarder extends Thread {
+
+               private final StringWriter target;
+               private final InputStream source;
+
+               public PipeForwarder(InputStream source, StringWriter target) {
+                       super("Pipe Forwarder");
+                       setDaemon(true);
+
+                       this.source = source;
+                       this.target = target;
+
+                       start();
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               int next;
+                               while ((next = source.read()) != -1) {
+                                       target.write(next);
+                               }
+                       }
+                       catch (IOException e) {
+                               // terminate
+                       }
+               }
+       }
 }
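
The helpers above are the building blocks for the process-based tests added in this change. A rough usage sketch (not taken from the commit; "java -version" is only a placeholder for a real spawned test process):

    import java.io.File;
    import java.io.StringWriter;

    import org.apache.flink.runtime.testutils.CommonTestUtils;

    public class CommonTestUtilsSketch {

        public static void main(String[] args) throws Exception {
            // Isolated scratch directory and log4j config for a spawned process
            File tempDir = CommonTestUtils.createTempDirectory();
            File log4jProps = CommonTestUtils.createTemporaryLog4JProperties();

            // Spawn a process (placeholder command) and drain its stderr in the background
            Process process = new ProcessBuilder("java", "-version").start();
            StringWriter stderr = new StringWriter();
            new CommonTestUtils.PipeForwarder(process.getErrorStream(), stderr);

            process.waitFor();
            System.out.println("stderr of spawned process:\n" + stderr);
            System.out.println("temp dir: " + tempDir + ", log4j config: " + log4jProps);
        }
    }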

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
new file mode 100644
index 0000000..66e1d9b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerActorTestUtils.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.CurrentJobStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.JobStatusResponse;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.messages.JobManagerMessages.JobNotFound;
+import static org.apache.flink.runtime.messages.JobManagerMessages.RequestJobStatus;
+import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestJobStatus;
+import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestNumberRegisteredTaskManager;
+
+/**
+ * JobManager actor test utilities.
+ *
+ * <p>If you are using a {@link TestingJobManager} most of these are not needed.
+ */
+public class JobManagerActorTestUtils {
+
+       /**
+        * Waits for the expected {@link JobStatus}.
+        *
+        * <p>Repeatedly queries the JobManager via {@link RequestJobStatus} messages.
+        *
+        * @param jobId             Job ID of the job to wait for
+        * @param expectedJobStatus Expected job status
+        * @param jobManager        Job manager actor to ask
+        * @param timeout           Timeout after which the operation fails
+        * @throws Exception If the job is not found within the timeout or the job is in another state.
+        */
+       public static void waitForJobStatus(
+                       JobID jobId,
+                       JobStatus expectedJobStatus,
+                       ActorGateway jobManager,
+                       FiniteDuration timeout) throws Exception {
+
+               checkNotNull(jobId, "Job ID");
+               checkNotNull(expectedJobStatus, "Expected job status");
+               checkNotNull(jobManager, "Job manager");
+               checkNotNull(timeout, "Timeout");
+
+               final Deadline deadline = timeout.fromNow();
+
+               while (deadline.hasTimeLeft()) {
+                       // Request the job status
+                       JobStatusResponse response = requestJobStatus(jobId, jobManager, deadline.timeLeft());
+
+                       // Found the job
+                       if (response instanceof CurrentJobStatus) {
+                               JobStatus jobStatus = ((CurrentJobStatus) response).status();
+
+                               // OK, that's what we were waiting for
+                               if (jobStatus == expectedJobStatus) {
+                                       return;
+                               }
+                               else if (jobStatus.isTerminalState()) {
+                                       throw new IllegalStateException("Job is in terminal state " + jobStatus + ", "
+                                                       + "but was waiting for " + expectedJobStatus + ".");
+                               }
+                       }
+                       // Did not find the job... retry
+                       else if (response instanceof JobNotFound) {
+                               Thread.sleep(Math.min(100, deadline.timeLeft().toMillis()));
+                       }
+                       else {
+                               throw new IllegalStateException("Unexpected response.");
+                       }
+               }
+
+               throw new IllegalStateException("Job not found within deadline.");
+       }
+
+       /**
+        * Request a {@link JobStatusResponse}.
+        *
+        * @param jobId      Job ID of the job to request the status of
+        * @param jobManager Job manager actor to ask
+        * @param timeout    Timeout after which the operation fails
+        * @return The {@link JobStatusResponse} from the job manager
+        * @throws Exception If there is no answer within the timeout.
+        */
+       public static JobStatusResponse requestJobStatus(
+                       JobID jobId,
+                       ActorGateway jobManager,
+                       FiniteDuration timeout) throws Exception {
+
+               checkNotNull(jobId, "Job ID");
+               checkNotNull(jobManager, "Job manager");
+               checkNotNull(timeout, "Timeout");
+
+               // Ask the JobManager
+               RequestJobStatus request = (RequestJobStatus) getRequestJobStatus(jobId);
+               Future<Object> ask = jobManager.ask(request, timeout);
+               Object response = Await.result(ask, timeout);
+
+               if (response instanceof JobStatusResponse) {
+                       return (JobStatusResponse) response;
+               }
+
+               throw new IllegalStateException("Unexpected response.");
+       }
+
+       /**
+        * Waits for a minimum number of task managers to connect to the job manager.
+        *
+        * @param minimumNumberOfTaskManagers Minimum number of task managers to wait for
+        * @param jobManager                  Job manager actor to ask
+        * @param timeout                     Timeout after which the operation fails
+        * @throws Exception If the task managers don't connect within the timeout.
+        */
+       public static void waitForTaskManagers(
+                       int minimumNumberOfTaskManagers,
+                       ActorGateway jobManager,
+                       FiniteDuration timeout) throws Exception {
+
+               checkArgument(minimumNumberOfTaskManagers >= 1);
+               checkNotNull(jobManager, "Job manager");
+               checkNotNull(timeout, "Timeout");
+
+               final Deadline deadline = timeout.fromNow();
+
+               while (deadline.hasTimeLeft()) {
+                       Future<Object> ask = jobManager.ask(getRequestNumberRegisteredTaskManager(),
+                                       deadline.timeLeft());
+
+                       Integer response = (Integer) Await.result(ask, deadline.timeLeft());
+
+                       // All are connected. We are done.
+                       if (response >= minimumNumberOfTaskManagers) {
+                               return;
+                       }
+                       // Waiting for more... retry
+                       else {
+                               Thread.sleep(Math.min(100, deadline.timeLeft().toMillis()));
+                       }
+               }
+
+               throw new IllegalStateException("Task managers not connected within deadline.");
+       }
+}
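
To show how these helpers are meant to be used, here is a small fragment (not taken from the commit; "flink" is assumed to be a started TestingCluster and "jobGraph" a job submitted by the surrounding test, as in TaskCancelTest above):

    // Wait until at least two TaskManagers have registered at the JobManager
    ActorGateway jobManager = flink.getLeaderGateway(TestingUtils.TESTING_DURATION());
    JobManagerActorTestUtils.waitForTaskManagers(2, jobManager, TestingUtils.TESTING_DURATION());

    // ... submit the job and trigger cancellation ...

    // Block until the job reports the expected terminal state (or fail after the timeout)
    JobManagerActorTestUtils.waitForJobStatus(
            jobGraph.getJobID(), JobStatus.CANCELED, jobManager, TestingUtils.TESTING_DURATION());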

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
new file mode 100644
index 0000000..85b768d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.JobManagerMode;
+import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link JobManager} instance running in a separate JVM.
+ */
+public class JobManagerProcess extends TestJvmProcess {
+
+       private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class);
+
+       /** ID for this JobManager */
+       private final int id;
+
+       /** The port the JobManager listens on */
+       private final int jobManagerPort;
+
+       /** The configuration for the JobManager */
+       private final Configuration config;
+
+       /** Configuration parsed as args for {@link JobManagerProcess.JobManagerProcessEntryPoint} */
+       private final String[] jvmArgs;
+
+       private ActorRef jobManagerRef;
+
+       /**
+        * Creates a {@link JobManager} running in a separate JVM.
+        *
+        * <p>See {@link #JobManagerProcess(int, Configuration, int)} for a more detailed description.
+        *
+        * @param config Configuration for the job manager process
+        * @throws Exception
+        */
+       public JobManagerProcess(int id, Configuration config) throws Exception {
+               this(id, config, 0);
+       }
+
+       /**
+        * Creates a {@link JobManager} running in a separate JVM.
+        *
+        * @param id             ID for the JobManager
+        * @param config         Configuration for the job manager process
+        * @param jobManagerPort Job manager port (if <code>0</code>, pick any available port)
+        * @throws Exception
+        */
+       public JobManagerProcess(int id, Configuration config, int jobManagerPort) throws Exception {
+               checkArgument(id >= 0, "Negative ID");
+               this.id = id;
+               this.config = checkNotNull(config, "Configuration");
+               this.jobManagerPort = jobManagerPort <= 0 ? NetUtils.getAvailablePort() : jobManagerPort;
+
+               ArrayList<String> args = new ArrayList<>();
+               args.add("--port");
+               args.add(String.valueOf(this.jobManagerPort));
+
+               for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+                       args.add("--" + entry.getKey());
+                       args.add(entry.getValue());
+               }
+
+               this.jvmArgs = new String[args.size()];
+               args.toArray(jvmArgs);
+       }
+
+       @Override
+       public String getName() {
+               return "JobManager " + id;
+       }
+
+       @Override
+       public String[] getJvmArgs() {
+               return jvmArgs;
+       }
+
+       @Override
+       public String getEntryPointClassName() {
+               return JobManagerProcessEntryPoint.class.getName();
+       }
+
+       public int getJobManagerPort() {
+               return jobManagerPort;
+       }
+
+       public Configuration getConfig() {
+               return config;
+       }
+
+       /**
+        * Returns the Akka URL of this JobManager.
+        */
+       public String getJobManagerAkkaURL() {
+               return JobManager.getRemoteJobManagerAkkaURL(
+                               new InetSocketAddress("localhost", jobManagerPort),
+                               Option.<String>empty());
+       }
+
+       @Override
+       public String toString() {
+               return String.format("JobManagerProcess(id=%d, port=%d)", id, jobManagerPort);
+       }
+
+       /**
+        * Waits for the job manager to be reachable.
+        *
+        * <p><strong>Important:</strong> Make sure to set the timeout larger than Akka's gating
+        * time. Otherwise, this will effectively not wait for the JobManager to start up, because the
+        * timeout will fire immediately.
+        *
+        * @param actorSystem Actor system to be used to resolve JobManager address.
+        * @param timeout     Timeout (make sure to set larger than Akka's gating time).
+        */
+       public ActorRef getActorRef(ActorSystem actorSystem, FiniteDuration timeout)
+                       throws Exception {
+
+               if (jobManagerRef != null) {
+                       return jobManagerRef;
+               }
+
+               checkNotNull(actorSystem, "Actor system");
+
+               // Deadline passes timeout ms
+               Deadline deadline = timeout.fromNow();
+
+               while (deadline.hasTimeLeft()) {
+                       try {
+                               // If the Actor is not reachable yet, this throws an Exception. Retry until the
+                               // deadline passes.
+                               this.jobManagerRef = AkkaUtils.getActorRef(
+                                               getJobManagerAkkaURL(),
+                                               actorSystem,
+                                               deadline.timeLeft());
+
+                               return jobManagerRef;
+                       }
+                       catch (Throwable ignored) {
+                               // Retry
+                               Thread.sleep(Math.min(100, deadline.timeLeft().toMillis()));
+                       }
+               }
+
+               throw new IllegalStateException("JobManager did not start up within " + timeout + ".");
+       }
+
+       /**
+        * Entry point for the JobManager process.
+        */
+       public static class JobManagerProcessEntryPoint {
+
+               private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcessEntryPoint.class);
+
+               /**
+                * Runs the JobManager process in {@link JobManagerMode#CLUSTER} and {@link
+                * StreamingMode#STREAMING} (can handle both batch and streaming jobs).
+                *
+                * <p><strong>Required argument</strong>: <code>port</code>. Start the process with
+                * <code>--port PORT</code>.
+                *
+                * <p>Other arguments are parsed to a {@link Configuration} and passed to the
+                * JobManager, for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum
+                * "xyz:123:456"</code>.
+                */
+               public static void main(String[] args) {
+                       try {
+                               ParameterTool params = ParameterTool.fromArgs(args);
+                               final int port = Integer.valueOf(params.getRequired("port"));
+                               LOG.info("Running on port {}.", port);
+
+                               Configuration config = params.getConfiguration();
+                               LOG.info("Configuration: {}.", config);
+
+                               // Run the JobManager
+                               JobManager.runJobManager(config, JobManagerMode.CLUSTER, StreamingMode.STREAMING,
+                                               "localhost", port);
+
+                               // Run forever. Forever, ever? Forever, ever!
+                               new CountDownLatch(1).await();
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Failed to start JobManager process", t);
+                               System.exit(1);
+                       }
+               }
+       }
+
+}
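
A minimal sketch of driving this class from a test (not taken from the commit; the sleep stands in for the actual test logic, and a real test would additionally resolve the JobManager actor via getActorRef() with a remoting-enabled ActorSystem):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.testutils.JobManagerProcess;

    public class JobManagerProcessSketch {

        public static void main(String[] args) throws Exception {
            JobManagerProcess jobManagerProcess = new JobManagerProcess(0, new Configuration());
            try {
                jobManagerProcess.createAndStart();

                // TaskManagers and clients connect via this address
                System.out.println("JobManager Akka URL: " + jobManagerProcess.getJobManagerAkkaURL());

                Thread.sleep(5000); // placeholder for the actual test scenario
            }
            finally {
                jobManagerProcess.printProcessLog();
                jobManagerProcess.destroy();
            }
        }
    }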

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
new file mode 100644
index 0000000..f683c55
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link TaskManager} instance running in a separate JVM.
+ */
+public class TaskManagerProcess extends TestJvmProcess {
+
+       /** ID for this TaskManager */
+       private final int id;
+
+       /** The configuration for the TaskManager */
+       private final Configuration config;
+
+       /** Configuration parsed as args for {@link TaskManagerProcess.TaskManagerProcessEntryPoint} */
+       private final String[] jvmArgs;
+
+       public TaskManagerProcess(int id, Configuration config) throws Exception {
+               checkArgument(id >= 0, "Negative ID");
+               this.id = id;
+               this.config = checkNotNull(config, "Configuration");
+
+               ArrayList<String> args = new ArrayList<>();
+
+               for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+                       args.add("--" + entry.getKey());
+                       args.add(entry.getValue());
+               }
+
+               this.jvmArgs = new String[args.size()];
+               args.toArray(jvmArgs);
+       }
+
+       @Override
+       public String getName() {
+               return "TaskManager " + id;
+       }
+
+       @Override
+       public String[] getJvmArgs() {
+               return jvmArgs;
+       }
+
+       @Override
+       public String getEntryPointClassName() {
+               return TaskManagerProcessEntryPoint.class.getName();
+       }
+
+       public int getId() {
+               return id;
+       }
+
+       @Override
+       public String toString() {
+               return String.format("TaskManagerProcess(id=%d)", id);
+       }
+
+       /**
+        * Entry point for the TaskManager process.
+        */
+       public static class TaskManagerProcessEntryPoint {
+
+               private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);
+
+               /**
+                * Runs the TaskManager process in {@link StreamingMode#STREAMING} (can handle both batch
+                * and streaming jobs).
+                *
+                * <p>All arguments are parsed to a {@link Configuration} and passed to the TaskManager,
+                * for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum "xyz:123:456"</code>.
+                */
+               public static void main(String[] args) throws Exception {
+                       try {
+                               Configuration config = ParameterTool.fromArgs(args).getConfiguration();
+
+                               if (!config.containsKey(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)) {
+                                       config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+                               }
+
+                               if (!config.containsKey(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)) {
+                                       config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+                               }
+
+
+                               LOG.info("Configuration: {}.", config);
+
+                               // Run the TaskManager
+                               TaskManager.selectNetworkInterfaceAndRunTaskManager(
+                                               config, StreamingMode.STREAMING, TaskManager.class);
+
+                               // Run forever
+                               new CountDownLatch(1).await();
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Failed to start TaskManager process", t);
+                               System.exit(1);
+                       }
+               }
+       }
+
+}
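
A corresponding sketch for the TaskManager side (not taken from the commit; the JobManager address and port are placeholders, and the sleep stands in for the actual test logic):

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.testutils.TaskManagerProcess;

    public class TaskManagerProcessSketch {

        public static void main(String[] args) throws Exception {
            // Point the spawned TaskManager at a running JobManager
            Configuration config = new Configuration();
            config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
            config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);

            TaskManagerProcess taskManagerProcess = new TaskManagerProcess(0, config);
            try {
                taskManagerProcess.createAndStart();

                // A test would now wait for registration, e.g. via
                // JobManagerActorTestUtils.waitForTaskManagers(...), and run its job.
                Thread.sleep(5000);
            }
            finally {
                taskManagerProcess.printProcessLog();
                taskManagerProcess.destroy();
            }
        }
    }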

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
new file mode 100644
index 0000000..0920b5c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.createTemporaryLog4JProperties;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.junit.Assert.fail;
+
+/**
+ * A {@link Process} running a separate JVM.
+ */
+public abstract class TestJvmProcess {
+
+       private static final Logger LOG = LoggerFactory.getLogger(TestJvmProcess.class);
+
+       /** Lock to guard {@link #createAndStart()} and {@link #destroy()} calls. */
+       private final Object createDestroyLock = new Object();
+
+       /** The java command path */
+       private final String javaCommandPath;
+
+       /** The log4j configuration path. */
+       private final String log4jConfigFilePath;
+
+       /** Shutdown hook for resource cleanup */
+       private final Thread shutdownHook;
+
+       /** JVM process memory (set for both '-Xms' and '-Xmx'). */
+       private int jvmMemoryInMb = 80;
+
+       /** The JVM process */
+       private Process process;
+
+       /** Writer for the process output */
+       private volatile StringWriter processOutput;
+
+       public TestJvmProcess() throws Exception {
+               this(getJavaCommandPath(), createTemporaryLog4JProperties().getPath());
+       }
+
+       public TestJvmProcess(String javaCommandPath, String log4jConfigFilePath) {
+               this.javaCommandPath = checkNotNull(javaCommandPath, "Java command path");
+               this.log4jConfigFilePath = checkNotNull(log4jConfigFilePath, "log4j config file path");
+
+               this.shutdownHook = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       destroy();
+                               }
+                               catch (Throwable t) {
+                                       LOG.error("Error during process cleanup shutdown hook.", t);
+                               }
+                       }
+               });
+       }
+
+       /**
+        * Returns the name of the process.
+        */
+       public abstract String getName();
+
+       /**
+        * Returns the arguments to the JVM.
+        *
+        * <p>These can be parsed by the main method of the entry point class.
+        */
+       public abstract String[] getJvmArgs();
+
+       /**
+        * Returns the name of the class to run.
+        *
+        * <p>Arguments to the main method can be specified via {@link #getJvmArgs()}.
+        */
+       public abstract String getEntryPointClassName();
+
+       // ---------------------------------------------------------------------------------------------
+
+       /**
+        * Sets the memory for the process (<code>-Xms</code> and <code>-Xmx</code> flags) (>= 80).
+        *
+        * @param jvmMemoryInMb Amount of memory in Megabytes for the JVM (>= 80).
+        */
+       public void setJVMMemory(int jvmMemoryInMb) {
+               checkArgument(jvmMemoryInMb >= 80, "The JVM requires at least 80 MB of memory.");
+               this.jvmMemoryInMb = jvmMemoryInMb;
+       }
+
+       /**
+        * Creates and starts the {@link Process}.
+        *
+        * <strong>Important:</strong> Don't forget to call {@link #destroy()} to prevent
+        * resource leaks. The created process will be a child process and is not guaranteed to
+        * terminate when the parent process terminates.
+        */
+       public void createAndStart() throws IOException {
+               String[] cmd = new String[] {
+                               javaCommandPath,
+                               "-Dlog.level=DEBUG",
+                               "-Dlog4j.configuration=file:" + log4jConfigFilePath,
+                               "-Xms" + jvmMemoryInMb + "m",
+                               "-Xmx" + jvmMemoryInMb + "m",
+                               "-classpath", getCurrentClasspath(),
+                               getEntryPointClassName() };
+
+               String[] jvmArgs = getJvmArgs();
+
+               if (jvmArgs != null && jvmArgs.length > 0) {
+                       cmd = ArrayUtils.addAll(cmd, jvmArgs);
+               }
+
+               synchronized (createDestroyLock) {
+                       if (process == null) {
+                               LOG.debug("Running command '{}'.", Arrays.toString(cmd));
+                               this.process = new ProcessBuilder(cmd).start();
+
+                               // Forward output
+                               this.processOutput = new StringWriter();
+                               new CommonTestUtils.PipeForwarder(process.getErrorStream(), processOutput);
+
+                               try {
+                                       // Add JVM shutdown hook to call shutdown of service
+                                       Runtime.getRuntime().addShutdownHook(shutdownHook);
+                               }
+                               catch (IllegalStateException ignored) {
+                                       // JVM is already shutting down. No need to do this.
+                               }
+                               catch (Throwable t) {
+                                       LOG.error("Cannot register process cleanup shutdown hook.", t);
+                               }
+                       }
+                       else {
+                               throw new IllegalStateException("Already running.");
+                       }
+               }
+       }
+
+       public void printProcessLog() {
+               if (processOutput == null) {
+                       throw new IllegalStateException("Not started");
+               }
+
+               System.out.println("-----------------------------------------");
+               System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + getName());
+               System.out.println("-----------------------------------------");
+
+               String out = processOutput.toString();
+               if (out == null || out.length() == 0) {
+                       System.out.println("(EMPTY)");
+               }
+               else {
+                       System.out.println(out);
+               }
+
+               System.out.println("-----------------------------------------");
+               System.out.println("            END SPAWNED PROCESS LOG " + getName());
+               System.out.println("-----------------------------------------");
+       }
+
+       public void destroy() {
+               synchronized (createDestroyLock) {
+                       if (process != null) {
+                               LOG.debug("Destroying " + getName() + " process.");
+
+                               try {
+                                       process.destroy();
+                               }
+                               catch (Throwable t) {
+                                       LOG.error("Error while trying to destroy process.", t);
+                               }
+                               finally {
+                                       process = null;
+
+                                       if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+                                               try {
+                                                       Runtime.getRuntime().removeShutdownHook(shutdownHook);
+                                               }
+                                               catch (IllegalStateException ignored) {
+                                                       // JVM is in shutdown already, we can safely ignore this.
+                                               }
+                                               catch (Throwable t) {
+                                                       LOG.warn("Exception while unregistering process cleanup shutdown hook.");
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+       // ---------------------------------------------------------------------------------------------
+       // File based synchronization utilities
+       // ---------------------------------------------------------------------------------------------
+
+       public static void touchFile(File file) throws IOException {
+               if (!file.exists()) {
+                       new FileOutputStream(file).close();
+               }
+               if (!file.setLastModified(System.currentTimeMillis())) {
+                       throw new IOException("Could not touch the file.");
+               }
+       }
+
+       public static void waitForMarkerFiles(File basedir, String prefix, int num, long timeout) {
+               long now = System.currentTimeMillis();
+               final long deadline = now + timeout;
+
+
+               while (now < deadline) {
+                       boolean allFound = true;
+
+                       for (int i = 0; i < num; i++) {
+                               File nextToCheck = new File(basedir, prefix + i);
+                               if (!nextToCheck.exists()) {
+                                       allFound = false;
+                                       break;
+                               }
+                       }
+
+                       if (allFound) {
+                               return;
+                       }
+                       else {
+                               // not all found, wait for a bit
+                               try {
+                                       Thread.sleep(10);
+                               }
+                               catch (InterruptedException e) {
+                                       throw new RuntimeException(e);
+                               }
+
+                               now = System.currentTimeMillis();
+                       }
+               }
+
+               fail("The tasks were not started within time (" + timeout + "msecs)");
+       }
+
+}
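
The file-based synchronization utilities at the end are the intended way for a test to wait for spawned processes. A self-contained sketch (the marker names and the in-process loop are illustrative only; normally each spawned process touches its own marker once it is ready):

    import java.io.File;

    import org.apache.flink.runtime.testutils.CommonTestUtils;
    import org.apache.flink.runtime.testutils.TestJvmProcess;

    public class MarkerFileSyncSketch {

        public static void main(String[] args) throws Exception {
            File markerDir = CommonTestUtils.createTempDirectory();
            int numberOfWorkers = 2;

            // Each worker would normally call touchFile() from its own JVM
            for (int i = 0; i < numberOfWorkers; i++) {
                TestJvmProcess.touchFile(new File(markerDir, "ready-" + i));
            }

            // Blocks until all markers exist, or fails (via JUnit) after 30 seconds
            TestJvmProcess.waitForMarkerFiles(markerDir, "ready-", numberOfWorkers, 30000);
        }
    }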

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
new file mode 100644
index 0000000..d2e5b6a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * ZooKeeper test utilities.
+ */
+public class ZooKeeperTestUtils {
+
+       /**
+        * Creates a configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+        *
+        * @param zooKeeperQuorum   ZooKeeper quorum to connect to
+        * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
+        *                          recovery)
+        * @return A new configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+        */
+       public static Configuration createZooKeeperRecoveryModeConfig(
+                       String zooKeeperQuorum, String fsStateHandlePath) {
+
+               return setZooKeeperRecoveryMode(new Configuration(), zooKeeperQuorum, fsStateHandlePath);
+       }
+
+       /**
+        * Sets all necessary configuration keys to operate in {@link RecoveryMode#ZOOKEEPER}.
+        *
+        * @param config            Configuration to use
+        * @param zooKeeperQuorum   ZooKeeper quorum to connect to
+        * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
+        *                          recovery)
+        * @return The modified configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+        */
+       public static Configuration setZooKeeperRecoveryMode(
+                       Configuration config,
+                       String zooKeeperQuorum,
+                       String fsStateHandlePath) {
+
+               checkNotNull(config, "Configuration");
+               checkNotNull(zooKeeperQuorum, "ZooKeeper quorum");
+               checkNotNull(fsStateHandlePath, "File state handle backend path");
+
+               // Web frontend, you have been dismissed. Sorry.
+               config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
+
+               // ZooKeeper recovery mode
+               config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+               config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
+
+               int connTimeout = 5000;
+               if (System.getenv().get("CI") != null) {
+                       // The regular timeout is too aggressive for Travis and connections are often lost.
+                       connTimeout = 20000;
+               }
+
+               config.setInteger(ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
+               config.setInteger(ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
+
+               // File system state backend
+               config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+               config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, fsStateHandlePath + "/checkpoints");
+               config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery");
+
+               // Akka failure detection and execution retries
+               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
+               config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+               config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
+
+               return config;
+       }
+
+}
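
A short sketch of how such a configuration is typically assembled (not taken from the commit; the quorum address is a placeholder, tests usually obtain it from an embedded ZooKeeper TestingServer):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.testutils.CommonTestUtils;
    import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;

    public class ZooKeeperConfigSketch {

        public static void main(String[] args) throws Exception {
            Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
                    "localhost:2181",
                    CommonTestUtils.createTempDirectory().getPath());

            // Such a config can then drive the process-based utilities above,
            // e.g. new JobManagerProcess(0, config) or new TaskManagerProcess(0, config).
            System.out.println(config);
        }
    }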

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
new file mode 100644
index 0000000..f0130ec
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for basic {@link ZooKeeperStateHandleStore} behaviour.
+ *
+ * <p> Tests include:
+ * <ul>
+ * <li>Expected usage of operations</li>
+ * <li>Correct ordering of ZooKeeper and state handle operations</li>
+ * </ul>
+ */
+public class ZooKeeperStateHandleStoreITCase extends TestLogger {
+
+       private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (ZooKeeper != null) {
+                       ZooKeeper.shutdown();
+               }
+       }
+
+       @Before
+       public void cleanUp() throws Exception {
+               ZooKeeper.deleteAll();
+       }
+
+       /**
+        * Tests add operation with default {@link CreateMode}.
+        */
+       @Test
+       public void testAdd() throws Exception {
+               // Setup
+               LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               // Config
+               final String pathInZooKeeper = "/testAdd";
+               final Long state = 1239712317L;
+
+               // Test
+               store.add(pathInZooKeeper, state);
+
+               // Verify
+               // State handle created
+               assertEquals(1, stateHandleProvider.getStateHandles().size());
+               assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null));
+
+               // Path created and is persistent
+               Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
+               assertNotNull(stat);
+               assertEquals(0, stat.getEphemeralOwner());
+
+               // Data is equal
+               @SuppressWarnings("unchecked")
+               Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+                               ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+                               ClassLoader.getSystemClassLoader())).getState(null);
+
+               assertEquals(state, actual);
+       }
+
+       /**
+        * Tests that {@link CreateMode} is respected.
+        */
+       @Test
+       public void testAddWithCreateMode() throws Exception {
+               LongStateHandleProvider stateHandleProvider = new LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               // Config
+               Long state = 3457347234L;
+
+               CreateMode[] modes = CreateMode.values();
+               for (int i = 0; i < modes.length; i++) {
+                       CreateMode mode = modes[i];
+                       state += i;
+
+                       String pathInZooKeeper = "/testAddWithCreateMode" + 
mode.name();
+
+                       // Test
+                       store.add(pathInZooKeeper, state, mode);
+
+                       if (mode.isSequential()) {
+                               // Figure out the sequential ID
+                               List<String> paths = 
ZooKeeper.getClient().getChildren().forPath("/");
+                               for (String p : paths) {
+                                       if 
(p.startsWith("testAddWithCreateMode" + mode.name())) {
+                                               pathInZooKeeper = "/" + p;
+                                               break;
+                                       }
+                               }
+                       }
+
+                       // Verify
+                       // State handle created
+                       assertEquals(i + 1, 
stateHandleProvider.getStateHandles().size());
+                       assertEquals(state, 
stateHandleProvider.getStateHandles().get(i).getState(null));
+
+                       // Path created
+                       Stat stat = 
ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
+
+                       assertNotNull(stat);
+
+                       // Is ephemeral or persistent
+                       if (mode.isEphemeral()) {
+                               assertTrue(stat.getEphemeralOwner() != 0);
+                       }
+                       else {
+                               assertEquals(0, stat.getEphemeralOwner());
+                       }
+
+                       // Data is equal
+                       @SuppressWarnings("unchecked")
+                       Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+                                       ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+                                       ClassLoader.getSystemClassLoader())).getState(null);
+
+                       assertEquals(state, actual);
+               }
+       }
+
+       /**
+        * Tests that adding to an already existing path throws an Exception.
+        */
+       @Test(expected = Exception.class)
+       public void testAddAlreadyExistingPath() throws Exception {
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath");
+
+               store.add("/testAddAlreadyExistingPath", 1L);
+       }
+
+       /**
+        * Tests that the created state handle is discarded if ZooKeeper create fails.
+        */
+       @Test
+       public void testAddDiscardStateHandleAfterFailure() throws Exception {
+               // Setup
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               CuratorFramework client = spy(ZooKeeper.getClient());
+               when(client.create()).thenThrow(new RuntimeException("Expected test Exception."));
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               client, stateHandleProvider);
+
+               // Config
+               final String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
+               final Long state = 81282227L;
+
+               try {
+                       // Test
+                       store.add(pathInZooKeeper, state);
+                       fail("Did not throw expected exception");
+               }
+               catch (Exception ignored) {
+               }
+
+               // Verify
+               // State handle created and discarded
+               assertEquals(1, stateHandleProvider.getStateHandles().size());
+               assertEquals(state, 
stateHandleProvider.getStateHandles().get(0).getState(null));
+               assertEquals(1, 
stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
+       }
+
+       /**
+        * Tests that a state handle is replaced.
+        */
+       @Test
+       public void testReplace() throws Exception {
+               // Setup
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               // Config
+               final String pathInZooKeeper = "/testReplace";
+               final Long initialState = 30968470898L;
+               final Long replaceState = 88383776661L;
+
+               // Test
+               store.add(pathInZooKeeper, initialState);
+               store.replace(pathInZooKeeper, 0, replaceState);
+
+               // Verify
+               // State handles created
+               assertEquals(2, stateHandleProvider.getStateHandles().size());
+               assertEquals(initialState, 
stateHandleProvider.getStateHandles().get(0).getState(null));
+               assertEquals(replaceState, 
stateHandleProvider.getStateHandles().get(1).getState(null));
+
+               // Path created and is persistent
+               Stat stat = 
ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
+               assertNotNull(stat);
+               assertEquals(0, stat.getEphemeralOwner());
+
+               // Data is equal
+               @SuppressWarnings("unchecked")
+               Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+                               ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+                               ClassLoader.getSystemClassLoader())).getState(null);
+
+               assertEquals(replaceState, actual);
+       }
+
+       /**
+        * Tests that replacing a non-existing path throws an Exception.
+        */
+       @Test(expected = Exception.class)
+       public void testReplaceNonExistingPath() throws Exception {
+               StateHandleProvider<Long> stateHandleProvider = new 
LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               store.replace("/testReplaceNonExistingPath", 0, 1L);
+       }
+
+       /**
+        * Tests that the replacement state handle is discarded if ZooKeeper setData fails.
+        */
+       @Test
+       public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
+               // Setup
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               CuratorFramework client = spy(ZooKeeper.getClient());
+               when(client.setData()).thenThrow(new RuntimeException("Expected test Exception."));
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               client, stateHandleProvider);
+
+               // Config
+               final String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
+               final Long initialState = 30968470898L;
+               final Long replaceState = 88383776661L;
+
+               // Test
+               store.add(pathInZooKeeper, initialState);
+
+               try {
+                       store.replace(pathInZooKeeper, 0, replaceState);
+                       fail("Did not throw expected exception");
+               }
+               catch (Exception ignored) {
+               }
+
+               // Verify
+               // State handle created and discarded
+               assertEquals(2, stateHandleProvider.getStateHandles().size());
+               assertEquals(initialState, 
stateHandleProvider.getStateHandles().get(0).getState(null));
+               assertEquals(replaceState, 
stateHandleProvider.getStateHandles().get(1).getState(null));
+               assertEquals(1, 
stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls());
+
+               // Initial value
+               @SuppressWarnings("unchecked")
+               Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject(
+                               ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
+                               ClassLoader.getSystemClassLoader())).getState(null);
+
+               assertEquals(initialState, actual);
+       }
+
+       /**
+        * Tests the get and exists operations.
+        */
+       @Test
+       public void testGetAndExists() throws Exception {
+               // Setup
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               // Config
+               final String pathInZooKeeper = "/testGetAndExists";
+               final Long state = 311222268470898L;
+
+               // Test
+               assertEquals(-1, store.exists(pathInZooKeeper));
+
+               store.add(pathInZooKeeper, state);
+               StateHandle<Long> actual = store.get(pathInZooKeeper);
+
+               // Verify
+               assertEquals(state, actual.getState(null));
+               assertTrue(store.exists(pathInZooKeeper) >= 0);
+       }
+
+       /**
+        * Tests that getting a non-existing path throws an Exception.
+        */
+       @Test(expected = Exception.class)
+       public void testGetNonExistingPath() throws Exception {
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               store.get("/testGetNonExistingPath");
+       }
+
+       /**
+        * Tests that all added state is returned.
+        */
+       @Test
+       public void testGetAll() throws Exception {
+               // Setup
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               // Config
+               final String pathInZooKeeper = "/testGetAll";
+
+               final Set<Long> expected = new HashSet<>();
+               expected.add(311222268470898L);
+               expected.add(132812888L);
+               expected.add(27255442L);
+               expected.add(11122233124L);
+
+               // Test
+               for (long val : expected) {
+                       store.add(pathInZooKeeper, val, 
CreateMode.PERSISTENT_SEQUENTIAL);
+               }
+
+               for (Tuple2<StateHandle<Long>, String> val : store.getAll()) {
+                       assertTrue(expected.remove(val.f0.getState(null)));
+               }
+               assertEquals(0, expected.size());
+       }
+
+       /**
+        * Tests that the state is returned sorted.
+        */
+       @Test
+       public void testGetAllSortedByName() throws Exception {
+               // Setup
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               // Config
+               final String pathInZooKeeper = "/testGetAllSortedByName";
+
+               final Long[] expected = new Long[] {
+                               311222268470898L, 132812888L, 27255442L, 
11122233124L };
+
+               // Test
+               for (long val : expected) {
+                       store.add(pathInZooKeeper, val, 
CreateMode.PERSISTENT_SEQUENTIAL);
+               }
+
+               List<Tuple2<StateHandle<Long>, String>> actual = 
store.getAllSortedByName();
+               assertEquals(expected.length, actual.size());
+
+               for (int i = 0; i < expected.length; i++) {
+                       assertEquals(expected[i], 
actual.get(i).f0.getState(null));
+               }
+       }
+
+       /**
+        * Tests that state handles are correctly removed.
+        */
+       @Test
+       public void testRemove() throws Exception {
+               // Setup
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               // Config
+               final String pathInZooKeeper = "/testRemove";
+               final Long state = 27255442L;
+
+               store.add(pathInZooKeeper, state);
+
+               // Test
+               store.remove(pathInZooKeeper);
+
+               // Verify discarded
+               assertEquals(0, 
ZooKeeper.getClient().getChildren().forPath("/").size());
+       }
+
+       /**
+        * Tests that state handles are correctly removed with a callback.
+        */
+       @Test
+       public void testRemoveWithCallback() throws Exception {
+               // Setup
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               // Config
+               final String pathInZooKeeper = "/testRemoveWithCallback";
+               final Long state = 27255442L;
+
+               store.add(pathInZooKeeper, state);
+
+               final CountDownLatch sync = new CountDownLatch(1);
+               BackgroundCallback callback = mock(BackgroundCallback.class);
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) throws Throwable {
+                               sync.countDown();
+                               return null;
+                       }
+               }).when(callback).processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
+
+               // Test
+               store.remove(pathInZooKeeper, callback);
+
+               // Verify discarded and callback called
+               assertEquals(0, 
ZooKeeper.getClient().getChildren().forPath("/").size());
+
+               sync.await();
+
+               verify(callback, times(1))
+                               .processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class));
+       }
+
+       /**
+        * Tests that state handles are correctly discarded.
+        */
+       @Test
+       public void testRemoveAndDiscardState() throws Exception {
+               // Setup
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               // Config
+               final String pathInZooKeeper = "/testDiscard";
+               final Long state = 27255442L;
+
+               store.add(pathInZooKeeper, state);
+
+               // Test
+               store.removeAndDiscardState(pathInZooKeeper);
+
+               // Verify discarded
+               assertEquals(0, 
ZooKeeper.getClient().getChildren().forPath("/").size());
+       }
+
+       /** Tests that all state handles are correctly discarded. */
+       @Test
+       public void testRemoveAndDiscardAllState() throws Exception {
+               // Setup
+               LongStateHandleProvider stateHandleProvider = new 
LongStateHandleProvider();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZooKeeper.getClient(), stateHandleProvider);
+
+               // Config
+               final String pathInZooKeeper = "/testDiscardAll";
+
+               final Set<Long> expected = new HashSet<>();
+               expected.add(311222268470898L);
+               expected.add(132812888L);
+               expected.add(27255442L);
+               expected.add(11122233124L);
+
+               // Test
+               for (long val : expected) {
+                       store.add(pathInZooKeeper, val, 
CreateMode.PERSISTENT_SEQUENTIAL);
+               }
+
+               store.removeAndDiscardAllState();
+
+               // Verify all discarded
+               assertEquals(0, 
ZooKeeper.getClient().getChildren().forPath("/").size());
+       }
+
+       // ---------------------------------------------------------------------------------------------
+       // Simple test helpers
+       // ---------------------------------------------------------------------------------------------
+
+       private static class LongStateHandleProvider implements 
StateHandleProvider<Long> {
+
+               private static final long serialVersionUID = 
4572084854499402276L;
+
+               private final List<LongStateHandle> stateHandles = new 
ArrayList<>();
+
+               @Override
+               public StateHandle<Long> createStateHandle(Long state) {
+                       LongStateHandle stateHandle = new 
LongStateHandle(state);
+                       stateHandles.add(stateHandle);
+
+                       return stateHandle;
+               }
+
+               public List<LongStateHandle> getStateHandles() {
+                       return stateHandles;
+               }
+       }
+
+       private static class LongStateHandle implements StateHandle<Long> {
+
+               private static final long serialVersionUID = 
-3555329254423838912L;
+
+               private final Long state;
+
+               private int numberOfDiscardCalls;
+
+               public LongStateHandle(Long state) {
+                       this.state = state;
+               }
+
+               @Override
+               public Long getState(ClassLoader ignored) throws Exception {
+                       return state;
+               }
+
+               @Override
+               public void discardState() throws Exception {
+                       numberOfDiscardCalls++;
+               }
+
+               public int getNumberOfDiscardCalls() {
+                       return numberOfDiscardCalls;
+               }
+       }
+}

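For readers skimming the diff, the usage pattern the ITCase above exercises boils down to the
following sketch. It is illustrative only and not part of this commit; it assumes the
ZooKeeperStateHandleStore, StateHandle and StateHandleProvider signatures visible in the diff,
and a provider comparable to the test's LongStateHandleProvider.

import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.zookeeper.CreateMode;

public class StateHandleStoreUsageSketch {

        // Hypothetical helper; the client and provider are assumed to be supplied by the caller.
        static void example(CuratorFramework client, StateHandleProvider<Long> provider) throws Exception {
                ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>(client, provider);

                // add() first creates a state handle via the provider, then writes the
                // serialized handle (not the state itself) to the ZNode.
                store.add("/checkpoint-0001", 42L, CreateMode.PERSISTENT);

                // get() deserializes the handle stored in the ZNode; the handle yields the state.
                StateHandle<Long> handle = store.get("/checkpoint-0001");
                Long value = handle.getState(ClassLoader.getSystemClassLoader());

                // replace() writes a new handle for the expected version; if the ZooKeeper
                // setData call fails, the new handle is discarded instead
                // (see testReplaceDiscardStateHandleAfterFailure above).
                store.replace("/checkpoint-0001", 0, value + 1);

                // removeAndDiscardState() deletes the ZNode and discards the referenced state.
                store.removeAndDiscardState("/checkpoint-0001");
        }
}
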
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
new file mode 100644
index 0000000..7ae89d1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+/**
+ * Simple ZooKeeper and CuratorFramework setup for tests.
+ */
+public class ZooKeeperTestEnvironment {
+
+       private final TestingServer zooKeeperServer;
+
+       private final TestingCluster zooKeeperCluster;
+
+       private final CuratorFramework client;
+
+       /**
+        * Starts a ZooKeeper quorum with the given number of peers and a client.
+        *
+        * @param numberOfZooKeeperQuorumPeers Starts a {@link TestingServer}, if <code>1</code>.
+        *                                     Starts a {@link TestingCluster}, if greater than <code>1</code>.
+        */
+       public ZooKeeperTestEnvironment(int numberOfZooKeeperQuorumPeers) {
+               if (numberOfZooKeeperQuorumPeers <= 0) {
+                       throw new IllegalArgumentException("Number of peers needs to be >= 1.");
+               }
+
+               final Configuration conf = new Configuration();
+
+               try {
+                       if (numberOfZooKeeperQuorumPeers == 1) {
+                               zooKeeperServer = new TestingServer(true);
+                               zooKeeperCluster = null;
+
+                               conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+                                               zooKeeperServer.getConnectString());
+                       }
+                       else {
+                               zooKeeperServer = null;
+                               zooKeeperCluster = new TestingCluster(numberOfZooKeeperQuorumPeers);
+
+                               zooKeeperCluster.start();
+
+                               conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+                                               zooKeeperCluster.getConnectString());
+                       }
+
+                       client = ZooKeeperUtils.startCuratorFramework(conf);
+
+                       client.newNamespaceAwareEnsurePath("/")
+                                       .ensure(client.getZookeeperClient());
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Error setting up 
ZooKeeperTestEnvironment", e);
+               }
+       }
+
+       /**
+        * Shutdown the client and ZooKeeper server/cluster.
+        */
+       public void shutdown() throws Exception {
+               if (client != null) {
+                       client.close();
+               }
+
+               if (zooKeeperServer != null) {
+                       zooKeeperServer.close();
+               }
+
+               if (zooKeeperCluster != null) {
+                       zooKeeperCluster.close();
+               }
+       }
+
+       public String getConnectString() {
+               if (zooKeeperServer != null) {
+                       return zooKeeperServer.getConnectString();
+               }
+               else {
+                       return zooKeeperCluster.getConnectString();
+               }
+       }
+
+       /**
+        * Returns a client for the started ZooKeeper server/cluster.
+        */
+       public CuratorFramework getClient() {
+               return client;
+       }
+
+       /**
+        * Creates a new client for the started ZooKeeper server/cluster.
+        */
+       public CuratorFramework createClient() {
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
getConnectString());
+               return ZooKeeperUtils.startCuratorFramework(config);
+       }
+
+       /**
+        * Deletes all ZNodes under the root node.
+        *
+        * @throws Exception If the ZooKeeper operation fails
+        */
+       public void deleteAll() throws Exception {
+               final String path = "/" + client.getNamespace();
+               ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, false);
+       }
+
+}

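As a quick orientation, the environment above is meant to be used with the lifecycle the ITCase
follows: one static environment per test class, deleteAll() before each test, shutdown() after the
class. A minimal, hypothetical JUnit skeleton (not part of this commit) would look like this:

import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;

public class MyZooKeeperBackedTest {

        // One quorum peer starts a lightweight TestingServer; a value greater than
        // one starts a Curator TestingCluster instead.
        private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);

        @AfterClass
        public static void tearDown() throws Exception {
                // Closes the Curator client and the embedded server or cluster.
                ZOOKEEPER.shutdown();
        }

        @Before
        public void cleanUp() throws Exception {
                // Removes all ZNodes under the root so individual tests stay independent.
                ZOOKEEPER.deleteAll();
        }

        @Test
        public void testAgainstZooKeeper() throws Exception {
                CuratorFramework client = ZOOKEEPER.getClient();
                client.create().forPath("/my-test-node");
        }
}
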
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index d1b8fac..9a1cde0 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -18,9 +18,12 @@
 
 package org.apache.flink.runtime.executiongraph
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.execution.ExecutionState
 import 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
@@ -32,6 +35,7 @@ import org.scalatest.junit.JUnitRunner
 import org.scalatest.{Matchers, WordSpecLike}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.FiniteDuration
 
 @RunWith(classOf[JUnitRunner])
 class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
@@ -126,8 +130,23 @@ class ExecutionGraphRestartTest extends WordSpecLike with 
Matchers {
         for (vertex <- eg.getAllExecutionVertices.asScala) {
           vertex.getCurrentExecutionAttempt().cancelingComplete()
         }
-        
+
+        val timeout = new FiniteDuration(2, TimeUnit.MINUTES)
+
+        // Wait for async restart
+        var deadline = timeout.fromNow
+        while (deadline.hasTimeLeft() && eg.getState != JobStatus.RUNNING) {
+          Thread.sleep(100)
+        }
+
         eg.getState should equal(JobStatus.RUNNING)
+
+        // Wait for deploying after async restart
+        deadline = timeout.fromNow
+        while (deadline.hasTimeLeft() && eg.getAllExecutionVertices.asScala.exists(
+          _.getCurrentExecutionAttempt.getState != ExecutionState.DEPLOYING)) {
+          Thread.sleep(100)
+        }
         
         for (vertex <- eg.getAllExecutionVertices.asScala) {
           vertex.getCurrentExecutionAttempt().markFinished()

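The loops added above poll the execution graph against a deadline instead of asserting immediately
after the asynchronous restart. The same idea, rendered as a small self-contained Java helper for
illustration only (hypothetical, not part of this commit):

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class PollingUtil {

        private PollingUtil() {}

        // Polls the condition every 100 ms until it holds or the timeout elapses,
        // mirroring the deadline.hasTimeLeft() loops added to the Scala test above.
        static boolean awaitCondition(BooleanSupplier condition, long timeout, TimeUnit unit)
                        throws InterruptedException {
                final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
                while (System.nanoTime() < deadlineNanos) {
                        if (condition.getAsBoolean()) {
                                return true;
                        }
                        Thread.sleep(100);
                }
                return condition.getAsBoolean();
        }
}

A caller would, for example, wait for the job status to become RUNNING before asserting, so the
assertion no longer races the restart that happens asynchronously on another thread.
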
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c9ae1e4..703d7bf 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -43,28 +43,28 @@ import scala.concurrent.{Await, Future}
  *                          otherwise false
  */
 class TestingCluster(
-    userConfiguration: Configuration,
-    singleActorSystem: Boolean,
-    synchronousDispatcher: Boolean,
-    streamingMode: StreamingMode)
+                      userConfiguration: Configuration,
+                      singleActorSystem: Boolean,
+                      synchronousDispatcher: Boolean,
+                      streamingMode: StreamingMode)
   extends FlinkMiniCluster(
     userConfiguration,
     singleActorSystem,
     streamingMode) {
-  
+
 
   def this(userConfiguration: Configuration,
            singleActorSystem: Boolean,
            synchronousDispatcher: Boolean)
-       = this(userConfiguration, singleActorSystem, synchronousDispatcher, 
StreamingMode.BATCH_ONLY)
+  = this(userConfiguration, singleActorSystem, synchronousDispatcher, 
StreamingMode.BATCH_ONLY)
 
   def this(userConfiguration: Configuration, singleActorSystem: Boolean)
-       = this(userConfiguration, singleActorSystem, false)
+  = this(userConfiguration, singleActorSystem, false)
 
   def this(userConfiguration: Configuration) = this(userConfiguration, true, 
false)
-  
+
   // --------------------------------------------------------------------------
-  
+
   override def generateConfiguration(userConfig: Configuration): Configuration 
= {
     val cfg = new Configuration()
     cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
@@ -100,16 +100,18 @@ class TestingCluster(
     }
 
     val (executionContext,
-      instanceManager,
-      scheduler,
-      libraryCacheManager,
-      executionRetries,
-      delayBetweenRetries,
-      timeout,
-      archiveCount,
-      leaderElectionService) = JobManager.createJobManagerComponents(
-        config,
-        createLeaderElectionService())
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    executionRetries,
+    delayBetweenRetries,
+    timeout,
+    archiveCount,
+    leaderElectionService,
+    submittedJobsGraphs,
+    checkpointRecoveryFactory) = JobManager.createJobManagerComponents(
+      config,
+      createLeaderElectionService())
 
     val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount))
     val archive = actorSystem.actorOf(testArchiveProps, archiveName)
@@ -126,7 +128,9 @@ class TestingCluster(
         delayBetweenRetries,
         timeout,
         streamingMode,
-        leaderElectionService))
+        leaderElectionService,
+        submittedJobsGraphs,
+        checkpointRecoveryFactory))
 
     val dispatcherJobManagerProps = if (synchronousDispatcher) {
       // disable asynchronous futures (e.g. accumulator update in Heartbeat)

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 312a1e5..be72003 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -18,32 +18,18 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import akka.actor.{Cancellable, Terminated, ActorRef}
-import akka.pattern.pipe
-import akka.pattern.ask
-import org.apache.flink.api.common.JobID
+import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.{StreamingMode, FlinkActor}
 import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
-import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import 
org.apache.flink.runtime.testingUtils.TestingMessages.{CheckIfJobRemoved, Alive,
-DisableDisconnect}
-import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
 
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
-
 import scala.language.postfixOps
 
 /** JobManager implementation extended by testing messages
@@ -70,7 +56,9 @@ class TestingJobManager(
     delayBetweenRetries: Long,
     timeout: FiniteDuration,
     mode: StreamingMode,
-    leaderElectionService: LeaderElectionService)
+    leaderElectionService: LeaderElectionService,
+    submittedJobGraphs : SubmittedJobGraphStore,
+    checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends JobManager(
     flinkConfiguration,
     executionContext,
@@ -82,5 +70,7 @@ class TestingJobManager(
     delayBetweenRetries,
     timeout,
     mode,
-    leaderElectionService)
+    leaderElectionService,
+    submittedJobGraphs,
+    checkpointRecoveryFactory)
   with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b607433..72a8c25 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -541,7 +542,6 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                        }
                        }
                }
-       }
 
        /**
         * Registers a timer.

http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index c7f7698..11eb174 100644
--- 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -20,22 +20,20 @@ package org.apache.flink.test.util
 
 import java.util.concurrent.TimeoutException
 
-import akka.pattern.ask
-import akka.actor.{Props, ActorRef, ActorSystem}
+import akka.actor.{ActorRef, ActorSystem}
 import akka.pattern.Patterns._
+import akka.pattern.ask
 import org.apache.curator.test.TestingCluster
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobmanager.{RecoveryMode, JobManager}
+import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages
-.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager,
-TestingJobManager, TestingMemoryArchivist}
+import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.testingUtils.{TestingJobManager, 
TestingMemoryArchivist, TestingTaskManager, TestingUtils}
 
-import scala.concurrent.{Future, Await}
+import scala.concurrent.{Await, Future}
 
 /**
  * A forkable mini cluster is a special case of the mini cluster, used for 
parallel test execution
@@ -47,20 +45,20 @@ import scala.concurrent.{Future, Await}
  *                          same [[ActorSystem]], otherwise false.
  */
 class ForkableFlinkMiniCluster(
-    userConfiguration: Configuration,
-    singleActorSystem: Boolean,
-    streamingMode: StreamingMode)
+                                userConfiguration: Configuration,
+                                singleActorSystem: Boolean,
+                                streamingMode: StreamingMode)
   extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem, 
streamingMode) {
 
-  def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
-       = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
+  def this(userConfiguration: Configuration, singleActorSystem: Boolean)
+  = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
 
   def this(userConfiguration: Configuration) = this(userConfiguration, true)
-  
+
   // --------------------------------------------------------------------------
 
   var zookeeperCluster: Option[TestingCluster] = None
-  
+
   override def generateConfiguration(userConfiguration: Configuration): 
Configuration = {
     val forNumberString = System.getProperty("forkNumber")
 
@@ -264,10 +262,10 @@ object ForkableFlinkMiniCluster {
   import 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT
 
   def startCluster(
-      numSlots: Int,
-      numTaskManagers: Int,
-      timeout: String = DEFAULT_AKKA_ASK_TIMEOUT)
-    : ForkableFlinkMiniCluster = {
+                    numSlots: Int,
+                    numTaskManagers: Int,
+                    timeout: String = DEFAULT_AKKA_ASK_TIMEOUT)
+  : ForkableFlinkMiniCluster = {
 
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
