[FLINK-4273] Modify JobClient to attach to running jobs

These changes are required for FLINK-4272 (introduce a JobClient class
for job control). Essentially, we want to be able to re-attach to a
running job and monitor it. It shouldn't make any difference whether we
just submitted the job or we recover it from an existing JobID.

This PR modifies the JobClientActor to support two different operation
modes: a) submitJob and monitor b) re-attach to job and monitor

The JobClient class has been updated with methods to access this
functionality. Previously, the class only had `submitJobAndWait` and
`submitJobDetached`. Now, it has the additional methods `submitJob`,
`attachToRunningJob`, and `awaitJobResult`.

The job submission has been split up in two phases:

1a. submitJob(..)
Submit job and return a future which can be completed to
get the result with `awaitJobResult`

1b. attachToRunningJob(..)
Re-attach to a running job, reconstruct its class loader, and return a
future which can be completed with `awaitJobResult`

2. awaitJobResult(..)
Blocks until the returned future from either `submitJob` or
`attachToRunningJob` has been completed

- split up JobClientActor into a base class and two implementations
- JobClient: on waiting check JobClientActor liveness
- lazily reconstruct user class loader
- add additional tests for JobClientActor
- add test case to test resuming of jobs

This closes #2313


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/259a3a55
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/259a3a55
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/259a3a55

Branch: refs/heads/master
Commit: 259a3a5569952458140afc8e9ad96eac0c330162
Parents: 444315a
Author: Maximilian Michels <[email protected]>
Authored: Thu Aug 18 16:04:35 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Thu Aug 25 15:46:15 2016 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     |  41 ++-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../flink/api/common/JobExecutionResult.java    |   4 +-
 .../client/JobAttachmentClientActor.java        | 171 +++++++++++
 .../apache/flink/runtime/client/JobClient.java  | 292 +++++++++++++++----
 .../flink/runtime/client/JobClientActor.java    | 281 ++++++------------
 ...ClientActorRegistrationTimeoutException.java |  35 +++
 .../runtime/client/JobListeningContext.java     | 145 +++++++++
 .../runtime/client/JobRetrievalException.java   |  42 +++
 .../client/JobSubmissionClientActor.java        | 192 ++++++++++++
 .../runtime/executiongraph/ExecutionGraph.java  |   1 +
 .../flink/runtime/jobmanager/JobInfo.scala      |  62 +++-
 .../flink/runtime/jobmanager/JobManager.scala   | 161 +++++-----
 .../runtime/messages/JobClientMessages.scala    |  23 +-
 .../runtime/messages/JobManagerMessages.scala   |  48 ++-
 .../testingUtils/TestingJobManagerLike.scala    |  12 +-
 .../TestingJobManagerMessages.scala             |   6 +
 .../runtime/client/JobClientActorTest.java      | 190 +++++++++++-
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java |   3 +-
 .../clients/examples/JobRetrievalITCase.java    | 138 +++++++++
 20 files changed, 1499 insertions(+), 350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index c3c666b..292da70 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -47,6 +47,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.client.JobRetrievalException;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -429,6 +431,39 @@ public abstract class ClusterClient {
        }
 
        /**
+        * Reattaches to a running job from the supplied job id
+        * @param jobID The job id of the job to attach to
+        * @return The JobExecutionResult for the jobID
+        * @throws JobExecutionException if an error occurs during monitoring 
the job execution
+        */
+       public JobExecutionResult retrieveJob(JobID jobID) throws 
JobExecutionException {
+               final LeaderRetrievalService leaderRetrievalService;
+               try {
+                       leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+               } catch (Exception e) {
+                       throw new JobRetrievalException(jobID, "Could not 
create the leader retrieval service", e);
+               }
+
+               ActorGateway jobManagerGateway;
+               try {
+                       jobManagerGateway = getJobManagerGateway();
+               } catch (Exception e) {
+                       throw new JobRetrievalException(jobID, "Could not 
retrieve the JobManager Gateway");
+               }
+
+               final JobListeningContext listeningContext = 
JobClient.attachToRunningJob(
+                               jobID,
+                               jobManagerGateway,
+                               flinkConfig,
+                               actorSystemLoader.get(),
+                               leaderRetrievalService,
+                               timeout,
+                               printStatusDuringExecution);
+
+               return JobClient.awaitJobResult(listeningContext);
+       }
+
+       /**
         * Cancels a job identified by the job id.
         * @param jobId the job id
         * @throws Exception In case an error occurred.
@@ -446,11 +481,11 @@ public abstract class ClusterClient {
                final Object result = Await.result(response, timeout);
 
                if (result instanceof JobManagerMessages.CancellationSuccess) {
-                       LOG.info("Job cancellation with ID " + jobId + " 
succeeded.");
+                       logAndSysout("Job cancellation with ID " + jobId + " 
succeeded.");
                } else if (result instanceof 
JobManagerMessages.CancellationFailure) {
                        final Throwable t = 
((JobManagerMessages.CancellationFailure) result).cause();
-                       LOG.info("Job cancellation with ID " + jobId + " 
failed.", t);
-                       throw new Exception("Failed to cancel the job because 
of \n" + t.getMessage());
+                       logAndSysout("Job cancellation with ID " + jobId + " 
failed because of " + t.getMessage());
+                       throw new Exception("Failed to cancel the job with id " 
+ jobId, t);
                } else {
                        throw new Exception("Unknown message received while 
cancelling: " + result.getClass().getName());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-clients/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/log4j-test.properties 
b/flink-clients/src/test/resources/log4j-test.properties
index 85897b3..5100c1f 100644
--- a/flink-clients/src/test/resources/log4j-test.properties
+++ b/flink-clients/src/test/resources/log4j-test.properties
@@ -27,4 +27,4 @@ 
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java 
b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index bc5ae09..cb4ecc5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -34,7 +34,7 @@ public class JobExecutionResult extends JobSubmissionResult {
 
        private long netRuntime;
 
-       private Map<String, Object> accumulatorResults = Collections.emptyMap();
+       private final Map<String, Object> accumulatorResults;
 
        /**
         * Creates a new JobExecutionResult.
@@ -49,6 +49,8 @@ public class JobExecutionResult extends JobSubmissionResult {
 
                if (accumulators != null) {
                        this.accumulatorResults = accumulators;
+               } else {
+                       this.accumulatorResults = Collections.emptyMap();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
new file mode 100644
index 0000000..5446002
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.dispatch.Futures;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobClientMessages;
+import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.Callable;
+
+
+/**
+ * Actor which handles Job attachment process and provides Job updates until 
completion.
+ */
+public class JobAttachmentClientActor extends JobClientActor {
+
+       /** JobID to attach to when the JobClientActor retrieves a job */
+       private JobID jobID;
+       /** true if a JobRegistrationSuccess message has been received */
+       private boolean successfullyRegisteredForJob = false;
+
+       public JobAttachmentClientActor(
+                       LeaderRetrievalService leaderRetrievalService,
+                       FiniteDuration timeout,
+                       boolean sysoutUpdates) {
+               super(leaderRetrievalService, timeout, sysoutUpdates);
+       }
+
+       @Override
+       public void connectedToJobManager() {
+               if (jobID != null && !successfullyRegisteredForJob) {
+                       tryToAttachToJob();
+               }
+       }
+
+       @Override
+       protected Class getClientMessageClass() {
+               return AttachToJobAndWait.class;
+       }
+
+       @Override
+       public void handleCustomMessage(Object message) {
+               if (message instanceof AttachToJobAndWait) {
+                       // sanity check that no job registration was 
performed through this actor before -
+                       // it is a one-shot actor after all
+                       if (this.client == null) {
+                               jobID = ((AttachToJobAndWait) message).jobID();
+                               if (jobID == null) {
+                                       LOG.error("Received null JobID");
+                                       sender().tell(
+                                               decorateMessage(new 
Status.Failure(new Exception("JobID is null"))),
+                                               getSelf());
+                               } else {
+                                       LOG.info("Received JobID {}.", jobID);
+
+                                       this.client = getSender();
+
+                                       // is only successful if we already 
know the job manager leader
+                                       if (jobManager != null) {
+                                               tryToAttachToJob();
+                                       }
+                               }
+                       } else {
+                               // repeated submission - tell failure to sender 
and kill self
+                               String msg = "Received repeated 
'AttachToJobAndWait'";
+                               LOG.error(msg);
+                               getSender().tell(
+                                       decorateMessage(new Status.Failure(new 
Exception(msg))), ActorRef.noSender());
+
+                               terminate();
+                       }
+               }
+               else if (message instanceof 
JobManagerMessages.RegisterJobClientSuccess) {
+                       // job registration was successful :o)
+                       JobManagerMessages.RegisterJobClientSuccess msg = 
((JobManagerMessages.RegisterJobClientSuccess) message);
+                       logAndPrintMessage("Successfully registered at the 
JobManager for Job " + msg.jobId());
+                       successfullyRegisteredForJob = true;
+               }
+               else if (message instanceof JobManagerMessages.JobNotFound) {
+                       LOG.info("Couldn't register JobClient for JobID {}",
+                               ((JobManagerMessages.JobNotFound) 
message).jobID());
+                       client.tell(decorateMessage(message), getSelf());
+                       terminate();
+               }
+               else if 
(JobClientMessages.getRegistrationTimeout().equals(message)) {
+                       // check if our registration for a job was successful 
in the meantime
+                       if (!successfullyRegisteredForJob) {
+                               if (isClientConnected()) {
+                                       client.tell(
+                                               decorateMessage(new 
Status.Failure(
+                                                       new 
JobClientActorRegistrationTimeoutException("Registration for Job at the 
JobManager " +
+                                                               "timed out. " + 
"You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT +
+                                                               "' in case the 
JobManager needs more time to confirm the job client registration."))),
+                                               getSelf());
+                               }
+
+                               // We haven't heard back from the job manager 
after attempting registration for a job
+                               // therefore terminate
+                               terminate();
+                       }
+               } else {
+                       LOG.error("{} received unknown message: ", getClass());
+               }
+
+       }
+
+       private void tryToAttachToJob() {
+               LOG.info("Sending message to JobManager {} to attach to job {} 
and wait for progress", jobID);
+
+               Futures.future(new Callable<Object>() {
+                       @Override
+                       public Object call() throws Exception {
+                               LOG.info("Attaching to job {} at the job 
manager {}.", jobID, jobManager.path());
+
+                               jobManager.tell(
+                                       decorateMessage(
+                                               new 
JobManagerMessages.RegisterJobClient(
+                                                       jobID,
+                                                       
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
+                                       getSelf());
+
+                               // issue a RegistrationTimeout message to check 
that we submit the job within
+                               // the given timeout
+                               getContext().system().scheduler().scheduleOnce(
+                                       timeout,
+                                       getSelf(),
+                                       
decorateMessage(JobClientMessages.getRegistrationTimeout()),
+                                       getContext().dispatcher(),
+                                       ActorRef.noSender());
+
+                               return null;
+                       }
+               }, getContext().dispatcher());
+       }
+
+       public static Props createActorProps(
+                       LeaderRetrievalService leaderRetrievalService,
+                       FiniteDuration timeout,
+                       boolean sysoutUpdates) {
+               return Props.create(
+                       JobAttachmentClientActor.class,
+                       leaderRetrievalService,
+                       timeout,
+                       sysoutUpdates);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index c0e0d08..4e916eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.client;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
+import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.pattern.Patterns;
@@ -30,6 +31,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -44,10 +47,15 @@ import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -80,28 +88,18 @@ public class JobClient {
        }
 
        /**
-        * Sends a [[JobGraph]] to the JobClient actor specified by jobClient 
which submits it then to
-        * the JobManager. The method blocks until the job has finished or the 
JobManager is no longer
-        * alive. In the former case, the [[SerializedJobExecutionResult]] is 
returned and in the latter
-        * case a [[JobExecutionException]] is thrown.
-        *
-        * @param actorSystem The actor system that performs the communication.
-        * @param leaderRetrievalService Leader retrieval service which used to 
find the current leading
-        *                               JobManager
-        * @param jobGraph    JobGraph describing the Flink job
-        * @param timeout     Timeout for futures
-        * @param sysoutLogUpdates prints log updates to system out if true
-        * @return The job execution result
-        * @throws org.apache.flink.runtime.client.JobExecutionException Thrown 
if the job
-        *                                                               
execution fails.
+        * Submits a job to a Flink cluster (non-blocking) and returns a 
JobListeningContext which can be
+        * passed to {@code awaitJobResult} to get the result of the submission.
+        * @return JobListeningContext which may be used to retrieve the 
JobExecutionResult via
+        *                      {@code awaitJobResult(JobListeningContext 
context)}.
         */
-       public static JobExecutionResult submitJobAndWait(
+       public static JobListeningContext submitJob(
                        ActorSystem actorSystem,
                        LeaderRetrievalService leaderRetrievalService,
                        JobGraph jobGraph,
                        FiniteDuration timeout,
                        boolean sysoutLogUpdates,
-                       ClassLoader classLoader) throws JobExecutionException {
+                       ClassLoader classLoader) {
 
                checkNotNull(actorSystem, "The actorSystem must not be null.");
                checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
@@ -112,29 +110,187 @@ public class JobClient {
                // the JobManager. It forwards the job submission, checks the 
success/failure responses, logs
                // update messages, watches for disconnect between client and 
JobManager, ...
 
-               Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+               Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
                        leaderRetrievalService,
                        timeout,
                        sysoutLogUpdates);
 
                ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-               
-               // first block handles errors while waiting for the result
-               Object answer;
+
+               Future<Object> submissionFuture = Patterns.ask(
+                               jobClientActor,
+                               new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+                               new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+               return new JobListeningContext(
+                               jobGraph.getJobID(),
+                               submissionFuture,
+                               jobClientActor,
+                               timeout,
+                               classLoader);
+       }
+
+
+       /**
+        * Attaches to a running Job using the JobID.
+        * Reconstructs the user class loader by downloading the jars from the 
JobManager.
+        */
+       public static JobListeningContext attachToRunningJob(
+                       JobID jobID,
+                       ActorGateway jobManagerGateWay,
+                       Configuration configuration,
+                       ActorSystem actorSystem,
+                       LeaderRetrievalService leaderRetrievalService,
+                       FiniteDuration timeout,
+                       boolean sysoutLogUpdates) {
+
+               checkNotNull(jobID, "The jobID must not be null.");
+               checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
+               checkNotNull(configuration, "The configuration must not be 
null.");
+               checkNotNull(actorSystem, "The actorSystem must not be null.");
+               checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
+               checkNotNull(timeout, "The timeout must not be null.");
+
+               // we create a proxy JobClientActor that deals with all 
communication with
+               // the JobManager. It forwards the job attachments, checks the 
success/failure responses, logs
+               // update messages, watches for disconnect between client and 
JobManager, ...
+               Props jobClientActorProps = 
JobAttachmentClientActor.createActorProps(
+                       leaderRetrievalService,
+                       timeout,
+                       sysoutLogUpdates);
+
+               ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
+
+               Future<Object> attachmentFuture = Patterns.ask(
+                               jobClientActor,
+                               new JobClientMessages.AttachToJobAndWait(jobID),
+                               new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+               return new JobListeningContext(
+                               jobID,
+                               attachmentFuture,
+                               jobClientActor,
+                               timeout,
+                               actorSystem,
+                               configuration);
+       }
+
+       /**
+        * Reconstructs the class loader by first requesting information about 
it at the JobManager
+        * and then downloading missing jar files.
+        * @param jobID id of job
+        * @param jobManager gateway to the JobManager
+        * @param config the flink configuration
+        * @return A classloader that should behave like the original 
classloader
+        * @throws JobRetrievalException if anything goes wrong
+        */
+       public static ClassLoader retrieveClassLoader(
+               JobID jobID,
+               ActorGateway jobManager,
+               Configuration config)
+               throws JobRetrievalException {
+
+               final Object jmAnswer;
                try {
-                       Future<Object> future = Patterns.ask(jobClientActor,
-                                       new 
JobClientMessages.SubmitJobAndWait(jobGraph),
-                                       new Timeout(AkkaUtils.INF_TIMEOUT()));
-                       
-                       answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
+                       jmAnswer = Await.result(
+                               jobManager.ask(
+                                       new 
JobManagerMessages.RequestClassloadingProps(jobID),
+                                       AkkaUtils.getDefaultTimeout()),
+                               AkkaUtils.getDefaultTimeout());
+               } catch (Exception e) {
+                       throw new JobRetrievalException(jobID, "Couldn't 
retrieve class loading properties from JobManager.", e);
                }
-               catch (TimeoutException e) {
-                       throw new JobTimeoutException(jobGraph.getJobID(), 
"Timeout while waiting for JobManager answer. " +
-                                       "Job time exceeded " + 
AkkaUtils.INF_TIMEOUT(), e);
+
+               if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
+                       JobManagerMessages.ClassloadingProps props = 
((JobManagerMessages.ClassloadingProps) jmAnswer);
+
+                       Option<String> jmHost = 
jobManager.actor().path().address().host();
+                       String jmHostname = jmHost.isDefined() ? jmHost.get() : 
"localhost";
+                       InetSocketAddress serverAddress = new 
InetSocketAddress(jmHostname, props.blobManagerPort());
+                       final BlobCache blobClient = new 
BlobCache(serverAddress, config);
+
+                       final List<BlobKey> requiredJarFiles = 
props.requiredJarFiles();
+                       final List<URL> requiredClasspaths = 
props.requiredClasspaths();
+
+                       final URL[] allURLs = new URL[requiredJarFiles.size() + 
requiredClasspaths.size()];
+
+                       int pos = 0;
+                       for (BlobKey blobKey : props.requiredJarFiles()) {
+                               try {
+                                       allURLs[pos++] = 
blobClient.getURL(blobKey);
+                               } catch (Exception e) {
+                                       blobClient.shutdown();
+                                       throw new JobRetrievalException(jobID, 
"Failed to download BlobKey " + blobKey);
+                               }
+                       }
+
+                       for (URL url : requiredClasspaths) {
+                               allURLs[pos++] = url;
+                       }
+
+                       return new URLClassLoader(allURLs, 
JobClient.class.getClassLoader());
+               } else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
+                       throw new JobRetrievalException(jobID, "Couldn't 
retrieve class loader. Job " + jobID + " not found");
+               } else {
+                       throw new JobRetrievalException(jobID, "Unknown 
response from JobManager: " + jmAnswer);
+               }
+       }
+
+       /**
+        * Given a JobListeningContext, awaits the result of the job execution 
that this context is bound to
+        * @param listeningContext The listening context of the job execution
+        * @return The result of the execution
+        * @throws JobExecutionException if anything goes wrong while 
monitoring the job
+        */
+       public static JobExecutionResult awaitJobResult(JobListeningContext 
listeningContext) throws JobExecutionException {
+
+               final JobID jobID = listeningContext.getJobID();
+               final ActorRef jobClientActor = 
listeningContext.getJobClientActor();
+               final Future<Object> jobSubmissionFuture = 
listeningContext.getJobResultFuture();
+               final FiniteDuration askTimeout = listeningContext.getTimeout();
+               // retrieves class loader if necessary
+               final ClassLoader classLoader = 
listeningContext.getClassLoader();
+
+               // wait for the future which holds the result to be ready
+               // ping the JobClientActor from time to time to check if it is 
still running
+               while (!jobSubmissionFuture.isCompleted()) {
+                       try {
+                               Await.ready(jobSubmissionFuture, askTimeout);
+                       } catch (InterruptedException e) {
+                               throw new JobExecutionException(
+                                       jobID,
+                                       "Interrupted while waiting for job 
completion.");
+                       } catch (TimeoutException e) {
+                               try {
+                                       Await.result(
+                                               Patterns.ask(
+                                                       jobClientActor,
+                                                       // Ping the Actor to 
see if it is alive
+                                                       new Identify(true),
+                                                       
Timeout.durationToTimeout(askTimeout)),
+                                               askTimeout);
+                                       // we got a reply, continue waiting for 
the job result
+                               } catch (Exception eInner) {
+                                       // we could have a result but the 
JobClientActor might have been killed and
+                                       // thus the health check failed
+                                       if (!jobSubmissionFuture.isCompleted()) 
{
+                                               throw new JobExecutionException(
+                                                       jobID,
+                                                       "JobClientActor seems 
to have died before the JobExecutionResult could be retrieved.",
+                                                       eInner);
+                                       }
+                               }
+                       }
+               }
+
+               final Object answer;
+               try {
+                       // we have already awaited the result, zero time to 
wait here
+                       answer = Await.result(jobSubmissionFuture, 
Duration.Zero());
                }
                catch (Throwable throwable) {
-                       throw new JobExecutionException(jobGraph.getJobID(),
-                                       "Communication with JobManager failed: 
" + throwable.getMessage(), throwable);
+                       throw new JobExecutionException(jobID,
+                               "Couldn't retrieve the JobExecutionResult from 
the JobManager.", throwable);
                }
                finally {
                        // failsafe shutdown of the client actor
@@ -149,18 +305,16 @@ public class JobClient {
                        if (result != null) {
                                try {
                                        return 
result.toJobExecutionResult(classLoader);
+                               } catch (Throwable t) {
+                                       throw new JobExecutionException(jobID,
+                                               "Job was successfully executed 
but JobExecutionResult could not be deserialized.");
                                }
-                               catch (Throwable t) {
-                                       throw new 
JobExecutionException(jobGraph.getJobID(),
-                                                       "Job was successfully 
executed but JobExecutionResult could not be deserialized.");
-                               }
-                       }
-                       else {
-                               throw new 
JobExecutionException(jobGraph.getJobID(),
-                                               "Job was successfully executed 
but result contained a null JobExecutionResult.");
+                       } else {
+                               throw new JobExecutionException(jobID,
+                                       "Job was successfully executed but 
result contained a null JobExecutionResult.");
                        }
                }
-               if (answer instanceof JobManagerMessages.JobResultFailure) {
+               else if (answer instanceof JobManagerMessages.JobResultFailure) 
{
                        LOG.info("Job execution failed");
 
                        SerializedThrowable serThrowable = 
((JobManagerMessages.JobResultFailure) answer).cause();
@@ -168,23 +322,62 @@ public class JobClient {
                                Throwable cause = 
serThrowable.deserializeError(classLoader);
                                if (cause instanceof JobExecutionException) {
                                        throw (JobExecutionException) cause;
+                               } else {
+                                       throw new JobExecutionException(jobID, 
"Job execution failed", cause);
                                }
-                               else {
-                                       throw new 
JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause);
-                               }
-                       }
-                       else {
-                               throw new 
JobExecutionException(jobGraph.getJobID(),
-                                               "Job execution failed with null 
as failure cause.");
+                       } else {
+                               throw new JobExecutionException(jobID,
+                                       "Job execution failed with null as 
failure cause.");
                        }
                }
+               else if (answer instanceof JobManagerMessages.JobNotFound) {
+                       throw new JobRetrievalException(
+                               ((JobManagerMessages.JobNotFound) 
answer).jobID(),
+                               "Couldn't retrieve Job " + jobID + " because it 
was not running.");
+               }
                else {
-                       throw new JobExecutionException(jobGraph.getJobID(),
-                                       "Unknown answer from JobManager after 
submitting the job: " + answer);
+                       throw new JobExecutionException(jobID,
+                               "Unknown answer from JobManager after 
submitting the job: " + answer);
                }
        }
 
        /**
+        * Sends a {@link JobGraph} to the JobClient actor specified by jobClient 
which then submits it to
+        * the JobManager. The method blocks until the job has finished or the 
JobManager is no longer
+        * alive. In the former case, the {@link SerializedJobExecutionResult} is 
returned and in the latter
+        * case a {@link JobExecutionException} is thrown.
+        *
+        * @param actorSystem The actor system that performs the communication.
+        * @param leaderRetrievalService Leader retrieval service which is used to 
find the current leading
+        *                               JobManager
+        * @param jobGraph    JobGraph describing the Flink job
+        * @param timeout     Timeout for futures
+        * @param sysoutLogUpdates prints log updates to system out if true
+        * @param classLoader The class loader for deserializing the results
+        * @return The job execution result
+        * @throws org.apache.flink.runtime.client.JobExecutionException Thrown 
if the job
+        *                                                               
execution fails.
+        */
+       public static JobExecutionResult submitJobAndWait(
+                       ActorSystem actorSystem,
+                       LeaderRetrievalService leaderRetrievalService,
+                       JobGraph jobGraph,
+                       FiniteDuration timeout,
+                       boolean sysoutLogUpdates,
+                       ClassLoader classLoader) throws JobExecutionException {
+
+               JobListeningContext jobListeningContext = submitJob(
+                               actorSystem,
+                               leaderRetrievalService,
+                               jobGraph,
+                               timeout,
+                               sysoutLogUpdates,
+                               classLoader);
+
+               return awaitJobResult(jobListeningContext);
+       }
+
+       /**
         * Submits a job in detached mode. The method sends the JobGraph to the
         * JobManager and waits for the answer whether the job could be started 
or not.
         *
@@ -227,7 +420,7 @@ public class JobClient {
                                        "JobManager did not respond within " + 
timeout.toString(), e);
                }
                catch (Throwable t) {
-                       throw new JobExecutionException(jobGraph.getJobID(),
+                       throw new JobSubmissionException(jobGraph.getJobID(),
                                        "Failed to send job to JobManager: " + 
t.getMessage(), t.getCause());
                }
 
@@ -258,4 +451,5 @@ public class JobClient {
                        throw new JobExecutionException(jobGraph.getJobID(), 
"Unexpected response from JobManager: " + result);
                }
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index 9379c30..1380e76 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -20,18 +20,11 @@ package org.apache.flink.runtime.client;
 
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
-import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Terminated;
-import akka.dispatch.Futures;
 import akka.dispatch.OnSuccess;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -39,47 +32,39 @@ import 
org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobClientMessages.JobManagerActorRef;
 import 
org.apache.flink.runtime.messages.JobClientMessages.JobManagerLeaderAddress;
-import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.Callable;
+
 
 /**
- * Actor which constitutes the bridge between the non-actor code and the 
JobManager. The JobClient
- * is used to submit jobs to the JobManager and to request the port of the 
BlobManager.
+ * Actor which constitutes the bridge between the non-actor code and the 
JobManager.
+ * This base class handles the connection to the JobManager and notifies in 
case of timeouts. It also
+ * receives and prints job updates until job completion.
  */
-public class JobClientActor extends FlinkUntypedActor implements 
LeaderRetrievalListener {
+public abstract class JobClientActor extends FlinkUntypedActor implements 
LeaderRetrievalListener {
 
        private final LeaderRetrievalService leaderRetrievalService;
 
        /** timeout for futures */
-       private final FiniteDuration timeout;
+       protected final FiniteDuration timeout;
 
        /** true if status messages shall be printed to sysout */
        private final boolean sysoutUpdates;
 
-       /** true if a SubmitJobSuccess message has been received */
-       private boolean jobSuccessfullySubmitted = false;
-
-       /** true if a PoisonPill was taken */
-       private boolean terminated = false;
+       /** true if a PoisonPill is about to be taken */
+       private boolean toBeTerminated = false;
 
        /** ActorRef to the current leader */
-       private ActorRef jobManager;
+       protected ActorRef jobManager;
 
        /** leader session ID of the JobManager when this actor was created */
-       private UUID leaderSessionID;
-
-       /** Actor which submits a job to the JobManager via this actor */
-       private ActorRef submitter;
+       protected UUID leaderSessionID;
 
-       /** JobGraph which shall be submitted to the JobManager */
-       private JobGraph jobGraph;
+       /** The client which the actor is responsible for */
+       protected ActorRef client;
 
        public JobClientActor(
                        LeaderRetrievalService leaderRetrievalService,
@@ -109,9 +94,27 @@ public class JobClientActor extends FlinkUntypedActor 
implements LeaderRetrieval
                }
        }
 
+       /**
+        * Hook to be called once a connection has been established with the 
JobManager.
+        */
+       protected abstract void connectedToJobManager();
+
+       /**
+        * Hook to handle custom client messages which are not handled by the 
base class.
+        * @param message The message to be handled
+        */
+       protected abstract void handleCustomMessage(Object message);
+
+       /**
+        * Hook to let the client know about messages that should start a timer 
for a timeout
+        * @return The message class after which a timeout should be started
+        */
+       protected abstract Class getClientMessageClass();
+
+
        @Override
        protected void handleMessage(Object message) {
-               
+
                // =========== State Change Messages ===============
 
                if (message instanceof 
ExecutionGraphMessages.ExecutionStateChanged) {
@@ -149,79 +152,31 @@ public class JobClientActor extends FlinkUntypedActor 
implements LeaderRetrieval
                        JobManagerActorRef msg = (JobManagerActorRef) message;
                        connectToJobManager(msg.jobManager());
 
-                       logAndPrintMessage("Connected to JobManager at " +  
msg.jobManager());
+                       logAndPrintMessage("Connected to JobManager at " + 
msg.jobManager());
 
-                       if (jobGraph != null && !jobSuccessfullySubmitted) {
-                               // if we haven't yet submitted the job 
successfully
-                               tryToSubmitJob(jobGraph);
-                       }
+                       connectedToJobManager();
                }
 
                // =========== Job Life Cycle Messages ===============
-               
-               // submit a job to the JobManager
-               else if (message instanceof SubmitJobAndWait) {
-                       // only accept SubmitJobWait messages if we're not 
about to terminate
-                       if (!terminated) {
-                               // sanity check that this no job was submitted 
through this actor before -
-                               // it is a one-shot actor after all
-                               if (this.submitter == null) {
-                                       jobGraph = ((SubmitJobAndWait) 
message).jobGraph();
-                                       if (jobGraph == null) {
-                                               LOG.error("Received null 
JobGraph");
-                                               sender().tell(
-                                                       decorateMessage(new 
Status.Failure(new Exception("JobGraph is null"))),
-                                                       getSelf());
-                                       } else {
-                                               LOG.info("Received job {} 
({}).", jobGraph.getName(), jobGraph.getJobID());
-
-                                               this.submitter = getSender();
-
-                                               // is only successful if we 
already know the job manager leader
-                                               tryToSubmitJob(jobGraph);
-                                       }
-                               } else {
-                                       // repeated submission - tell failure 
to sender and kill self
-                                       String msg = "Received repeated 
'SubmitJobAndWait'";
-                                       LOG.error(msg);
-                                       getSender().tell(
-                                               decorateMessage(new 
Status.Failure(new Exception(msg))), ActorRef.noSender());
-
-                                       terminate();
-                               }
-                       } else {
-                               // we're about to receive a PoisonPill because 
terminated == true
-                               String msg = getClass().getName() + " is about 
to be terminated. Therefore, the " +
-                                       "job submission cannot be executed.";
-                               LOG.error(msg);
-                               getSender().tell(
-                                       decorateMessage(new Status.Failure(new 
Exception(msg))), ActorRef.noSender());
-                       }
-               }
+
                // acknowledgement to submit job is only logged, our original
-               // submitter is only interested in the final job result
-               else if (message instanceof JobManagerMessages.JobResultSuccess 
||
-                               message instanceof 
JobManagerMessages.JobResultFailure) {
-                       
+               // client is only interested in the final job result
+               else if (message instanceof 
JobManagerMessages.JobResultMessage) {
+
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("Received {} message from 
JobManager", message.getClass().getSimpleName());
                        }
 
-                       // forward the success to the original job submitter
-                       if (hasJobBeenSubmitted()) {
-                               this.submitter.tell(decorateMessage(message), 
getSelf());
+                       // forward the success to the original client
+                       if (isClientConnected()) {
+                               this.client.tell(decorateMessage(message), 
getSelf());
                        }
 
                        terminate();
                }
-               else if (message instanceof 
JobManagerMessages.JobSubmitSuccess) {
-                       // job was successfully submitted :-)
-                       LOG.info("Job was successfully submitted to the 
JobManager {}.", getSender().path());
-                       jobSuccessfullySubmitted = true;
-               }
 
                // =========== Actor / Communication Failure / Timeouts 
===============
-               
+
                else if (message instanceof Terminated) {
                        ActorRef target = ((Terminated) message).getActor();
                        if (jobManager.equals(target)) {
@@ -234,7 +189,7 @@ public class JobClientActor extends FlinkUntypedActor 
implements LeaderRetrieval
                                // Important: The ConnectionTimeout message is 
filtered out in case that we are
                                // notified about a new leader by setting the 
new leader session ID, because
                                // ConnectionTimeout extends 
RequiresLeaderSessionID
-                               if (hasJobBeenSubmitted()) {
+                               if (isClientConnected()) {
                                        
getContext().system().scheduler().scheduleOnce(
                                                timeout,
                                                getSelf(),
@@ -245,49 +200,61 @@ public class JobClientActor extends FlinkUntypedActor 
implements LeaderRetrieval
                        } else {
                                LOG.warn("Received 'Terminated' for unknown 
actor " + target);
                        }
-               } else if 
(JobClientMessages.getConnectionTimeout().equals(message)) {
+               }
+               else if 
(JobClientMessages.getConnectionTimeout().equals(message)) {
                        // check if we haven't found a job manager yet
-                       if (!isConnected()) {
-                               if (hasJobBeenSubmitted()) {
-                                       submitter.tell(
-                                               decorateMessage(new 
Status.Failure(
-                                                       new 
JobClientActorConnectionTimeoutException("Lost connection to the 
JobManager."))),
+                       if (!isJobManagerConnected()) {
+                               final JobClientActorConnectionTimeoutException 
errorMessage =
+                                       new 
JobClientActorConnectionTimeoutException("Lost connection to the JobManager.");
+                               final Object replyMessage = decorateMessage(new 
Status.Failure(errorMessage));
+                               if (isClientConnected()) {
+                                       client.tell(
+                                               replyMessage,
                                                getSelf());
                                }
                                // Connection timeout reached, let's terminate
                                terminate();
                        }
-               } else if 
(JobClientMessages.getSubmissionTimeout().equals(message)) {
-                       // check if our job submission was successful in the 
meantime
-                       if (!jobSuccessfullySubmitted) {
-                               if (hasJobBeenSubmitted()) {
-                                       submitter.tell(
-                                               decorateMessage(new 
Status.Failure(
-                                                       new 
JobClientActorSubmissionTimeoutException("Job submission to the JobManager 
timed out. " +
-                                                               "You may 
increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " 
+
-                                                               "needs more 
time to configure and confirm the job submission."))),
-                                               getSelf());
-                               }
-
-                               // We haven't heard back from the job manager 
after sending the job graph to him,
-                               // therefore terminate
-                               terminate();
-                       }
                }
 
-               // =========== Unknown Messages ===============
-               
+               // =========== Message Delegation ===============
+
+               else if (!isJobManagerConnected() && 
getClientMessageClass().equals(message.getClass())) {
+                       LOG.info(
+                               "Received {} but there is no connection to a 
JobManager yet.",
+                               message);
+                       // We want to submit/attach to a job, but we haven't 
found a job manager yet.
+                       // Let's give him another chance to find a job manager 
within the given timeout.
+                       getContext().system().scheduler().scheduleOnce(
+                               timeout,
+                               getSelf(),
+                               
decorateMessage(JobClientMessages.getConnectionTimeout()),
+                               getContext().dispatcher(),
+                               ActorRef.noSender()
+                       );
+                       handleCustomMessage(message);
+               }
                else {
-                       LOG.error("JobClient received unknown message: " + 
message);
+                       if (!toBeTerminated) {
+                               handleCustomMessage(message);
+                       } else {
+                               // we're about to receive a PoisonPill because 
toBeTerminated == true
+                               String msg = getClass().getName() + " is about 
to be terminated. Therefore, the " +
+                                       "job submission cannot be executed.";
+                               LOG.error(msg);
+                               getSender().tell(
+                                       decorateMessage(new Status.Failure(new 
Exception(msg))), ActorRef.noSender());
+                       }
                }
        }
 
+
        @Override
        protected UUID getLeaderSessionID() {
                return leaderSessionID;
        }
 
-       private void logAndPrintMessage(String message) {
+       protected void logAndPrintMessage(String message) {
                LOG.info(message);
                if (sysoutUpdates) {
                        System.out.println(message);
@@ -351,97 +318,19 @@ public class JobClientActor extends FlinkUntypedActor 
implements LeaderRetrieval
                getContext().watch(jobManager);
        }
 
-       private void tryToSubmitJob(final JobGraph jobGraph) {
-               this.jobGraph = jobGraph;
-
-               if (isConnected()) {
-                       LOG.info("Sending message to JobManager {} to submit 
job {} ({}) and wait for progress",
-                               jobManager.path().toString(), 
jobGraph.getName(), jobGraph.getJobID());
-
-                       Futures.future(new Callable<Object>() {
-                               @Override
-                               public Object call() throws Exception {
-                                       ActorGateway jobManagerGateway = new 
AkkaActorGateway(jobManager, leaderSessionID);
-
-                                       LOG.info("Upload jar files to job 
manager {}.", jobManager.path());
-
-                                       try {
-                                               
jobGraph.uploadUserJars(jobManagerGateway, timeout);
-                                       } catch (IOException exception) {
-                                               getSelf().tell(
-                                                       decorateMessage(new 
JobManagerMessages.JobResultFailure(
-                                                               new 
SerializedThrowable(
-                                                                       new 
JobSubmissionException(
-                                                                               
jobGraph.getJobID(),
-                                                                               
"Could not upload the jar files to the job manager.",
-                                                                               
exception)
-                                                               )
-                                                       )),
-                                                       ActorRef.noSender()
-                                               );
-                                       }
-
-                                       LOG.info("Submit job to the job manager 
{}.", jobManager.path());
-
-                                       jobManager.tell(
-                                               decorateMessage(
-                                                       new 
JobManagerMessages.SubmitJob(
-                                                               jobGraph,
-                                                               
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
-                                               getSelf());
-
-                                       // issue a SubmissionTimeout message to 
check that we submit the job within
-                                       // the given timeout
-                                       
getContext().system().scheduler().scheduleOnce(
-                                               timeout,
-                                               getSelf(),
-                                               
decorateMessage(JobClientMessages.getSubmissionTimeout()),
-                                               getContext().dispatcher(),
-                                               ActorRef.noSender());
-
-                                       return null;
-                               }
-                       }, getContext().dispatcher());
-               } else {
-                       LOG.info("Could not submit job {} ({}), because there 
is no connection to a " +
-                                       "JobManager.",
-                               jobGraph.getName(), jobGraph.getJobID());
-
-                       // We want to submit a job, but we haven't found a job 
manager yet.
-                       // Let's give him another chance to find a job manager 
within the given timeout.
-                       getContext().system().scheduler().scheduleOnce(
-                               timeout,
-                               getSelf(),
-                               
decorateMessage(JobClientMessages.getConnectionTimeout()),
-                               getContext().dispatcher(),
-                               ActorRef.noSender()
-                       );
-               }
-       }
-
-       private void terminate() {
+       protected void terminate() {
                LOG.info("Terminate JobClientActor.");
-               terminated = true;
+               toBeTerminated = true;
                disconnectFromJobManager();
                getSelf().tell(decorateMessage(PoisonPill.getInstance()), 
ActorRef.noSender());
        }
 
-       private boolean isConnected() {
+       private boolean isJobManagerConnected() {
                return jobManager != ActorRef.noSender();
        }
 
-       private boolean hasJobBeenSubmitted() {
-               return submitter != ActorRef.noSender();
+       protected boolean isClientConnected() {
+               return client != ActorRef.noSender();
        }
 
-       public static Props createJobClientActorProps(
-                       LeaderRetrievalService leaderRetrievalService,
-                       FiniteDuration timeout,
-                       boolean sysoutUpdates) {
-               return Props.create(
-                       JobClientActor.class,
-                       leaderRetrievalService,
-                       timeout,
-                       sysoutUpdates);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java
new file mode 100644
index 0000000..e57d1b4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActorRegistrationTimeoutException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+/**
+ * Exception which is thrown by the {@link JobClientActor} if it has not heard 
back from the job
+ * manager after it has attempted to register for a job within a given timeout 
interval.
+ */
+public class JobClientActorRegistrationTimeoutException extends Exception {
+       private static final long serialVersionUID = 8762463142030454853L;
+
+       public JobClientActorRegistrationTimeoutException(String msg) {
+               super(msg);
+       }
+
+       public JobClientActorRegistrationTimeoutException(String msg, Throwable 
cause) {
+               super(msg, cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
new file mode 100644
index 0000000..b5d7cb7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.client;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The JobListeningContext holds the state necessary to monitor a running job and receive its
+ * results.
+ *
+ * <p>The class loader is either handed in at construction time or reconstructed lazily on the
+ * first call to {@link #getClassLoader()}. The lazy reconstruction is not synchronized.
+ */
+public final class JobListeningContext {
+
+       private static final Logger LOG = LoggerFactory.getLogger(JobListeningContext.class);
+
+       /** The Job id of the Job */
+       private final JobID jobID;
+       /** The Future which is completed upon job completion */
+       private final Future<Object> jobResultFuture;
+       /** The JobClientActor which handles communication and monitoring of the job */
+       private final ActorRef jobClientActor;
+       /** Timeout used for Asks */
+       private final FiniteDuration timeout;
+
+       /** ActorSystem for leader retrieval (null if the class loader was provided up front) */
+       private final ActorSystem actorSystem;
+       /** Flink configuration for initializing the BlobService (null if the class loader was provided up front) */
+       private final Configuration configuration;
+
+       /** The class loader (either provided at job submission or reconstructed when it is needed) */
+       private ClassLoader classLoader;
+
+       /**
+        * Constructor to use when the class loader is available.
+        *
+        * @param jobID the id of the job to listen to
+        * @param jobResultFuture the future which is completed upon job completion
+        * @param jobClientActor the actor which handles communication and monitoring of the job
+        * @param timeout the timeout used for asks
+        * @param classLoader the class loader returned by {@link #getClassLoader()}
+        * @throws NullPointerException if any argument is null
+        */
+       public JobListeningContext(
+               JobID jobID,
+               Future<Object> jobResultFuture,
+               ActorRef jobClientActor,
+               FiniteDuration timeout,
+               ClassLoader classLoader) {
+               this.jobID = checkNotNull(jobID);
+               this.jobResultFuture = checkNotNull(jobResultFuture);
+               this.jobClientActor = checkNotNull(jobClientActor);
+               this.timeout = checkNotNull(timeout);
+               this.classLoader = checkNotNull(classLoader);
+               // not needed: the class loader never has to be reconstructed
+               this.actorSystem = null;
+               this.configuration = null;
+       }
+
+       /**
+        * Constructor to use when the class loader is not available. The class loader is then
+        * reconstructed on the first call to {@link #getClassLoader()}.
+        *
+        * @param jobID the id of the job to listen to
+        * @param jobResultFuture the future which is completed upon job completion
+        * @param jobClientActor the actor which handles communication and monitoring of the job
+        * @param timeout the timeout used for asks
+        * @param actorSystem the actor system used to retrieve the leading JobManager
+        * @param configuration the configuration used for leader retrieval and class loader retrieval
+        * @throws NullPointerException if any argument is null
+        */
+       public JobListeningContext(
+               JobID jobID,
+               Future<Object> jobResultFuture,
+               ActorRef jobClientActor,
+               FiniteDuration timeout,
+               ActorSystem actorSystem,
+               Configuration configuration) {
+               this.jobID = checkNotNull(jobID);
+               this.jobResultFuture = checkNotNull(jobResultFuture);
+               this.jobClientActor = checkNotNull(jobClientActor);
+               this.timeout = checkNotNull(timeout);
+               this.actorSystem = checkNotNull(actorSystem);
+               this.configuration = checkNotNull(configuration);
+       }
+
+       /**
+        * @return The Job ID that this context is bound to.
+        */
+       public JobID getJobID() {
+               return jobID;
+       }
+
+       /**
+        * @return The Future that eventually holds the result of the execution.
+        */
+       public Future<Object> getJobResultFuture() {
+               return jobResultFuture;
+       }
+
+       /**
+        * @return The Job Client actor which communicates with the JobManager.
+        */
+       public ActorRef getJobClientActor() {
+               return jobClientActor;
+       }
+
+       /**
+        * @return The default timeout of Akka asks
+        */
+       public FiniteDuration getTimeout() {
+               return timeout;
+       }
+
+       /**
+        * The class loader necessary to deserialize the result of a job execution,
+        * i.e. JobExecutionResult or Exceptions
+        * @return The class loader for the job id
+        * @throws JobRetrievalException if anything goes wrong
+        */
+       public ClassLoader getClassLoader() throws JobRetrievalException {
+               if (classLoader == null) {
+                       // lazily initializes the class loader when it is needed
+                       classLoader = JobClient.retrieveClassLoader(jobID, getJobManager(), configuration);
+                       LOG.info("Reconstructed class loader for Job {}", jobID);
+               }
+               return classLoader;
+       }
+
+       /**
+        * Looks up the gateway of the current leading JobManager.
+        *
+        * @return the gateway of the leading JobManager
+        * @throws JobRetrievalException wrapping any exception raised during leader retrieval
+        */
+       private ActorGateway getJobManager() throws JobRetrievalException {
+               try {
+                       return LeaderRetrievalUtils.retrieveLeaderGateway(
+                               LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
+                               actorSystem,
+                               AkkaUtils.getLookupTimeout(configuration));
+               } catch (Exception e) {
+                       throw new JobRetrievalException(jobID, "Couldn't retrieve leading JobManager.", e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java
new file mode 100644
index 0000000..a92bddc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobRetrievalException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * Exception used to indicate that a job couldn't be retrieved from the JobManager.
+ */
+public class JobRetrievalException extends JobExecutionException {
+
+       private static final long serialVersionUID = -42L;
+
+       /**
+        * @param jobID the id of the job which couldn't be retrieved
+        * @param msg the detail message
+        * @param cause the throwable that led to this exception
+        */
+       public JobRetrievalException(JobID jobID, String msg, Throwable cause) {
+               super(jobID, msg, cause);
+       }
+
+       /**
+        * @param jobID the id of the job which couldn't be retrieved
+        * @param msg the detail message
+        */
+       public JobRetrievalException(JobID jobID, String msg) {
+               super(jobID, msg);
+       }
+
+       /**
+        * @param jobID the id of the job which couldn't be retrieved
+        * @param cause the throwable that led to this exception
+        */
+       public JobRetrievalException(JobID jobID, Throwable cause) {
+               super(jobID, cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
new file mode 100644
index 0000000..2cc4a50
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.dispatch.Futures;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobClientMessages;
+import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.util.SerializedThrowable;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+
+/**
+ * Actor which handles Job submission process and provides Job updates until completion.
+ *
+ * <p>The actor is one-shot: it accepts a single {@link SubmitJobAndWait} message, submits the
+ * contained job graph once a JobManager is known, and terminates itself if the submission is
+ * not acknowledged within the timeout.
+ */
+public class JobSubmissionClientActor extends JobClientActor {
+
+       /** JobGraph which shall be submitted to the JobManager */
+       private JobGraph jobGraph;
+       /** true if a JobSubmitSuccess message has been received */
+       private boolean jobSuccessfullySubmitted = false;
+
+       public JobSubmissionClientActor(
+                       LeaderRetrievalService leaderRetrievalService,
+                       FiniteDuration timeout,
+                       boolean sysoutUpdates) {
+               super(leaderRetrievalService, timeout, sysoutUpdates);
+       }
+
+
+       /**
+        * Submits the pending job graph, if there is one that has not been acknowledged yet.
+        */
+       @Override
+       public void connectedToJobManager() {
+               if (jobGraph != null && !jobSuccessfullySubmitted) {
+                       // if we haven't yet submitted the job successfully
+                       tryToSubmitJob();
+               }
+       }
+
+       @Override
+       protected Class getClientMessageClass() {
+               return SubmitJobAndWait.class;
+       }
+
+       /**
+        * Handles the messages specific to job submission: the initial {@link SubmitJobAndWait},
+        * the JobManager's submission acknowledgement, and the submission timeout.
+        *
+        * @param message the message to handle
+        */
+       @Override
+       public void handleCustomMessage(Object message) {
+               // submit a job to the JobManager
+               if (message instanceof SubmitJobAndWait) {
+                       // sanity check that no job was submitted through this actor before -
+                       // it is a one-shot actor after all
+                       if (this.client == null) {
+                               jobGraph = ((SubmitJobAndWait) message).jobGraph();
+                               if (jobGraph == null) {
+                                       LOG.error("Received null JobGraph");
+                                       getSender().tell(
+                                               decorateMessage(new Status.Failure(new Exception("JobGraph is null"))),
+                                               getSelf());
+                               } else {
+                                       LOG.info("Received job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
+
+                                       this.client = getSender();
+
+                                       // is only successful if we already know the job manager leader
+                                       if (jobManager != null) {
+                                               tryToSubmitJob();
+                                       }
+                               }
+                       } else {
+                               // repeated submission - tell failure to sender and kill self
+                               String msg = "Received repeated 'SubmitJobAndWait'";
+                               LOG.error(msg);
+                               getSender().tell(
+                                       decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
+
+                               terminate();
+                       }
+               } else if (message instanceof JobManagerMessages.JobSubmitSuccess) {
+                       // job was successfully submitted :-)
+                       LOG.info("Job {} was successfully submitted to the JobManager {}.",
+                               ((JobManagerMessages.JobSubmitSuccess) message).jobId(),
+                               getSender().path());
+                       jobSuccessfullySubmitted = true;
+               } else if (JobClientMessages.getSubmissionTimeout().equals(message)) {
+                       // check if our job submission was successful in the meantime
+                       if (!jobSuccessfullySubmitted) {
+                               if (isClientConnected()) {
+                                       client.tell(
+                                               decorateMessage(new Status.Failure(
+                                                       new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out. " +
+                                                               "You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " +
+                                                               "needs more time to configure and confirm the job submission."))),
+                                               getSelf());
+                               }
+
+                               // We haven't heard back from the job manager after sending the job graph to it,
+                               // therefore terminate
+                               terminate();
+                       }
+               } else {
+                       // include the offending message, otherwise the log entry is useless for debugging
+                       LOG.error("{} received unknown message: {}", getClass(), message);
+               }
+       }
+
+       /**
+        * Asynchronously uploads the user jars, sends the job graph to the JobManager and schedules
+        * a submission timeout message to this actor.
+        *
+        * <p>If the jar upload fails, a JobResultFailure is sent to this actor and the job is not
+        * submitted.
+        */
+       private void tryToSubmitJob() {
+               LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress",
+                       jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID());
+
+               // NOTE(review): the callable is executed on the dispatcher and reads actor fields
+               // as well as getContext()/getSelf() from there - confirm this is safe
+               Futures.future(new Callable<Object>() {
+                       @Override
+                       public Object call() throws Exception {
+                               ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
+
+                               LOG.info("Upload jar files to job manager {}.", jobManager.path());
+
+                               try {
+                                       jobGraph.uploadUserJars(jobManagerGateway, timeout);
+                               } catch (IOException exception) {
+                                       getSelf().tell(
+                                               decorateMessage(new JobManagerMessages.JobResultFailure(
+                                                       new SerializedThrowable(
+                                                               new JobSubmissionException(
+                                                                       jobGraph.getJobID(),
+                                                                       "Could not upload the jar files to the job manager.",
+                                                                       exception)
+                                                       )
+                                               )),
+                                               ActorRef.noSender()
+                                       );
+
+                                       // the failure has been reported; do not submit a job whose jars are missing
+                                       return null;
+                               }
+
+                               LOG.info("Submit job to the job manager {}.", jobManager.path());
+
+                               jobManager.tell(
+                                       decorateMessage(
+                                               new JobManagerMessages.SubmitJob(
+                                                       jobGraph,
+                                                       ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
+                                       getSelf());
+
+                               // issue a SubmissionTimeout message to check that we submit the job within
+                               // the given timeout
+                               getContext().system().scheduler().scheduleOnce(
+                                       timeout,
+                                       getSelf(),
+                                       decorateMessage(JobClientMessages.getSubmissionTimeout()),
+                                       getContext().dispatcher(),
+                                       ActorRef.noSender());
+
+                               return null;
+                       }
+               }, getContext().dispatcher());
+       }
+
+
+       /**
+        * Creates the Props for a JobSubmissionClientActor with the given constructor arguments.
+        *
+        * @param leaderRetrievalService passed to the actor's constructor
+        * @param timeout passed to the actor's constructor
+        * @param sysoutUpdates passed to the actor's constructor
+        * @return Props describing a JobSubmissionClientActor
+        */
+       public static Props createActorProps(
+                       LeaderRetrievalService leaderRetrievalService,
+                       FiniteDuration timeout,
+                       boolean sysoutUpdates) {
+               return Props.create(
+                       JobSubmissionClientActor.class,
+                       leaderRetrievalService,
+                       timeout,
+                       sysoutUpdates);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7a94c0f..d7e40a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -931,6 +931,7 @@ public class ExecutionGraph {
                intermediateResults.clear();
                currentExecutions.clear();
                requiredJarFiles.clear();
+               requiredClasspaths.clear();
                jobStatusListeners.clear();
                executionListeners.clear();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 67d7a06..a84650c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.ActorRef
 import org.apache.flink.runtime.akka.ListeningBehaviour
 
+
 /**
  * Utility class to store job information on the [[JobManager]]. The JobInfo 
stores which actor
  * submitted the job, when the start time and, if already terminated, the end 
time was.
@@ -37,11 +38,14 @@ import org.apache.flink.runtime.akka.ListeningBehaviour
  * @param start Starting time
  */
 class JobInfo(
-  val client: ActorRef,
-  val listeningBehaviour: ListeningBehaviour,
+  client: ActorRef,
+  listeningBehaviour: ListeningBehaviour,
   val start: Long,
   val sessionTimeout: Long) extends Serializable {
 
+  val clients = scala.collection.mutable.HashSet[(ActorRef, 
ListeningBehaviour)]()
+  clients += ((client, listeningBehaviour))
+
   var sessionAlive = sessionTimeout > 0
 
   var lastActive = 0L
@@ -58,10 +62,62 @@ class JobInfo(
     }
   }
 
-  override def toString = s"JobInfo(client: $client ($listeningBehaviour), 
start: $start)"
+
+  /**
+    * Sends the given message to every registered client, regardless of its listening behaviour.
+    *
+    * @param message the message to send
+    */
+  def notifyClients(message: Any) = {
+    for ((recipient, _) <- clients) {
+      recipient ! message
+    }
+  }
+
+  /**
+    * Sends the given message to every registered client whose listening behaviour is not
+    * [[ListeningBehaviour.DETACHED]].
+    *
+    * @param message the message to send to non-detached clients
+    */
+  def notifyNonDetachedClients(message: Any) = {
+    for ((recipient, behaviour) <- clients if behaviour != ListeningBehaviour.DETACHED) {
+      recipient ! message
+    }
+  }
+
+  /**
+    * Sends the given message only to those clients which registered with exactly the given
+    * listening behaviour.
+    *
+    * @param message the message to send to the matching clients
+    * @param listeningBehaviour the listening behaviour a client must have to be notified
+    */
+  def notifyClients(message: Any, listeningBehaviour: ListeningBehaviour) = {
+    for ((recipient, behaviour) <- clients if listeningBehaviour == behaviour) {
+      recipient ! message
+    }
+  }
 
   def setLastActive() =
     lastActive = System.currentTimeMillis()
+
+
+  /** Renders the registered clients and the start time. */
+  override def toString = s"JobInfo(clients: $clients, start: $start)"
+
+  /**
+    * Two JobInfos are equal if they have the same clients, start time and session timeout.
+    * Note that `clients` is a mutable set, so the outcome may change when clients are added.
+    */
+  override def equals(other: Any): Boolean = other match {
+    case that: JobInfo
+      if clients == that.clients &&
+        start == that.start &&
+        sessionTimeout == that.sessionTimeout => true
+    case _ => false
+  }
+
+  /**
+    * Combines the hash codes of the fields used by `equals` with the multiplier 31.
+    * Since `clients` is a mutable set, the hash code changes when clients are added.
+    */
+  override def hashCode(): Int = {
+    Seq[Any](clients, start, sessionTimeout)
+      .foldLeft(0)((acc, field) => 31 * acc + field.hashCode())
+  }
 }
 
 object JobInfo{

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0587987..d35fb0a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,18 +19,16 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.net.{BindException, ServerSocket, UnknownHostException, 
InetAddress, InetSocketAddress}
+import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, 
UnknownHostException}
 import java.lang.management.ManagementFactory
 import java.util.UUID
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
 import javax.management.ObjectName
 
-import akka.actor.Status.{Success, Failure}
+import akka.actor.Status.{Failure, Success}
 import akka.actor._
 import akka.pattern.ask
-
 import grizzled.slf4j.Logger
-
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
 import org.apache.flink.core.fs.FileSystem
@@ -41,8 +39,8 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
-import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, 
SavepointStoreFactory, SavepointStore}
-import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, 
SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker}
+import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, 
SavepointStore, SavepointStoreFactory}
+import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, 
DisabledCheckpointStatsTracker, SimpleCheckpointStatsTracker}
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -58,24 +56,22 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, 
JobStatus, JobVertexID}
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.{LeaderContender, 
LeaderElectionService, StandaloneLeaderElectionService}
-
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, 
SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, 
UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, 
AccumulatorResultStringsFound, AccumulatorResultsErroneous, 
AccumulatorResultsFound, RequestAccumulatorResults, 
RequestAccumulatorResultsStringified}
-import org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, 
AbstractCheckpointMessage, AcknowledgeCheckpoint}
-
+import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.InfoMessage
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
-import org.apache.flink.runtime.query.{UnknownKvStateLocation, KvStateMessage}
-import 
org.apache.flink.runtime.query.KvStateMessage.{NotifyKvStateUnregistered, 
LookupKvStateLocation, NotifyKvStateRegistered}
+import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
+import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, 
NotifyKvStateRegistered, NotifyKvStateUnregistered}
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -83,7 +79,6 @@ import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
 import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
-
 import org.jboss.netty.channel.ChannelException
 
 import scala.annotation.tailrec
@@ -479,6 +474,22 @@ class JobManager(
 
       submitJob(jobGraph, jobInfo)
 
+    // Registers an additional client for an already running job, or answers with JobNotFound
+    case RegisterJobClient(jobID, listeningBehaviour) =>
+      val client = sender()
+      currentJobs.get(jobID) match {
+        case Some((executionGraph, jobInfo)) =>
+          // the 's' interpolator is required, otherwise the literal "$jobID" is logged
+          log.info(s"Registering client for job $jobID")
+          jobInfo.clients += ((client, listeningBehaviour))
+          val listener = new StatusListenerMessenger(client, leaderSessionID.orNull)
+          executionGraph.registerJobStatusListener(listener)
+          // execution state changes are only forwarded if the client asked for them
+          if (listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
+            executionGraph.registerExecutionListener(listener)
+          }
+          client ! decorateMessage(RegisterJobClientSuccess(jobID))
+        case None =>
+          client ! decorateMessage(JobNotFound(jobID))
+      }
+
     case RecoverSubmittedJob(submittedJobGraph) =>
       if (!currentJobs.contains(submittedJobGraph.getJobId)) {
         submitJob(
@@ -788,50 +799,53 @@ class JobManager(
               }
 
               // is the client waiting for the job result?
-              if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
-                newJobStatus match {
-                  case JobStatus.FINISHED =>
-                  try {
-                    val accumulatorResults = 
executionGraph.getAccumulatorsSerialized()
-                    val result = new SerializedJobExecutionResult(
-                      jobID,
-                      jobInfo.duration,
-                      accumulatorResults)
-
-                    jobInfo.client ! decorateMessage(JobResultSuccess(result))
-                  } catch {
-                    case e: Exception =>
-                      log.error(s"Cannot fetch final accumulators for job 
$jobID", e)
-                      val exception = new JobExecutionException(jobID,
-                        "Failed to retrieve accumulator results.", e)
+              newJobStatus match {
+                case JobStatus.FINISHED =>
+                try {
+                  val accumulatorResults = 
executionGraph.getAccumulatorsSerialized()
+                  val result = new SerializedJobExecutionResult(
+                    jobID,
+                    jobInfo.duration,
+                    accumulatorResults)
+
+                  jobInfo.notifyNonDetachedClients(
+                    decorateMessage(JobResultSuccess(result)))
+                } catch {
+                  case e: Exception =>
+                    log.error(s"Cannot fetch final accumulators for job 
$jobID", e)
+                    val exception = new JobExecutionException(jobID,
+                      "Failed to retrieve accumulator results.", e)
 
-                      jobInfo.client ! decorateMessage(JobResultFailure(
-                        new SerializedThrowable(exception)))
-                  }
+                    jobInfo.notifyNonDetachedClients(
+                      decorateMessage(JobResultFailure(
+                        new SerializedThrowable(exception))))
+                }
 
-                  case JobStatus.CANCELED =>
-                    // the error may be packed as a serialized throwable
-                    val unpackedError = SerializedThrowable.get(
-                      error, executionGraph.getUserClassLoader())
+                case JobStatus.CANCELED =>
+                  // the error may be packed as a serialized throwable
+                  val unpackedError = SerializedThrowable.get(
+                    error, executionGraph.getUserClassLoader())
 
-                    jobInfo.client ! decorateMessage(JobResultFailure(
+                  jobInfo.notifyNonDetachedClients(
+                    decorateMessage(JobResultFailure(
                       new SerializedThrowable(
-                        new JobCancellationException(jobID, "Job was 
cancelled.", unpackedError))))
+                        new JobCancellationException(jobID, "Job was 
cancelled.", unpackedError)))))
 
-                  case JobStatus.FAILED =>
-                    val unpackedError = SerializedThrowable.get(
-                      error, executionGraph.getUserClassLoader())
+                case JobStatus.FAILED =>
+                  val unpackedError = SerializedThrowable.get(
+                    error, executionGraph.getUserClassLoader())
 
-                    jobInfo.client ! decorateMessage(JobResultFailure(
+                  jobInfo.notifyNonDetachedClients(
+                    decorateMessage(JobResultFailure(
                       new SerializedThrowable(
-                        new JobExecutionException(jobID, "Job execution 
failed.", unpackedError))))
-
-                  case x =>
-                    val exception = new JobExecutionException(jobID, s"$x is 
not a terminal state.")
-                    jobInfo.client ! decorateMessage(JobResultFailure(
-                      new SerializedThrowable(exception)))
-                    throw exception
-                }
+                        new JobExecutionException(jobID, "Job execution 
failed.", unpackedError)))))
+
+                case x =>
+                  val exception = new JobExecutionException(jobID, s"$x is not 
a terminal state.")
+                  jobInfo.notifyNonDetachedClients(
+                    decorateMessage(JobResultFailure(
+                      new SerializedThrowable(exception))))
+                  throw exception
               }
             }(context.dispatcher)
           }
@@ -919,6 +933,18 @@ class JobManager(
           archive forward decorateMessage(RequestJob(jobID))
       }
 
+    case RequestClassloadingProps(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((graph, jobInfo)) =>
+          sender() ! decorateMessage(
+            ClassloadingProps(
+              libraryCacheManager.getBlobServerPort,
+              graph.getRequiredJarFiles,
+              graph.getRequiredClasspaths))
+        case None =>
+          sender() ! decorateMessage(JobNotFound(jobID))
+      }
+
     case RequestBlobManagerPort =>
       sender ! decorateMessage(libraryCacheManager.getBlobServerPort)
 
@@ -1052,11 +1078,10 @@ class JobManager(
    */
   private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: 
Boolean = false): Unit = {
     if (jobGraph == null) {
-      jobInfo.client ! decorateMessage(JobResultFailure(
-        new SerializedThrowable(
-          new JobSubmissionException(null, "JobGraph must not be null.")
-        )
-      ))
+      jobInfo.notifyClients(
+        decorateMessage(JobResultFailure(
+          new SerializedThrowable(
+            new JobSubmissionException(null, "JobGraph must not be null.")))))
     }
     else {
       val jobId = jobGraph.getJobID
@@ -1259,13 +1284,15 @@ class JobManager(
         executionGraph.registerJobStatusListener(
           new StatusListenerMessenger(self, leaderSessionID.orNull))
 
-        if (jobInfo.listeningBehaviour == 
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
+        jobInfo.clients foreach {
           // the sender wants to be notified about state changes
-          val listener  = new StatusListenerMessenger(jobInfo.client, 
leaderSessionID.orNull)
-
-          executionGraph.registerExecutionListener(listener)
-          executionGraph.registerJobStatusListener(listener)
+          case (client, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) 
=>
+            val listener  = new StatusListenerMessenger(client, 
leaderSessionID.orNull)
+            executionGraph.registerExecutionListener(listener)
+            executionGraph.registerJobStatusListener(listener)
+          case _ => // do nothing
         }
+
       } catch {
         case t: Throwable =>
           log.error(s"Failed to submit job $jobId ($jobName)", t)
@@ -1283,7 +1310,8 @@ class JobManager(
             new JobExecutionException(jobId, s"Failed to submit job $jobId 
($jobName)", t)
           }
 
-          jobInfo.client ! decorateMessage(JobResultFailure(new 
SerializedThrowable(rt)))
+          jobInfo.notifyClients(
+            decorateMessage(JobResultFailure(new SerializedThrowable(rt))))
           return
       }
 
@@ -1338,7 +1366,8 @@ class JobManager(
             }
           }
 
-          jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
+          jobInfo.notifyClients(
+            decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))
 
           if (leaderElectionService.hasLeadership) {
             // There is a small chance that multiple job managers schedule the same job after if
@@ -1740,10 +1769,10 @@ class JobManager(
       future {
         eg.suspend(cause)
 
-        if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
-          jobInfo.client ! decorateMessage(
-            Failure(new JobExecutionException(jobID, "All jobs are cancelled 
and cleared.", cause)))
-        }
+        jobInfo.notifyNonDetachedClients(
+          decorateMessage(
+            Failure(
+              new JobExecutionException(jobID, "All jobs are cancelled and 
cleared.", cause))))
       }(context.dispatcher)
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
index a60fa7a..1f29e32 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.messages
 import java.util.UUID
 
 import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.jobgraph.JobGraph
 
 /**
@@ -29,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 object JobClientMessages {
 
   /**
-   * This message is sent to the JobClient (via ask) to submit a job and
+   * This message is sent to the JobClientActor (via ask) to submit a job and
    * get a response when the job execution has finished.
    * 
    * The response to this message is a
@@ -40,15 +41,11 @@ object JobClientMessages {
   case class SubmitJobAndWait(jobGraph: JobGraph)
 
   /**
-   * This message is sent to the JobClient (via ask) to submit a job and 
-   * return as soon as the result of the submit operation is known. 
-   *
-   * The response to this message is a
-   * [[org.apache.flink.api.common.JobSubmissionResult]]
-   *
-   * @param jobGraph The job to be executed.
-   */
-  case class SubmitJobDetached(jobGraph: JobGraph)
+    * This message is sent to the JobClientActor to ask it to register at the JobManager
+    * and then return once the job execution is complete.
+    * @param jobID The job id
+    */
+  case class AttachToJobAndWait(jobID: JobID)
 
   /** Notifies the JobClientActor about a new leader address and a leader session ID.
     *
@@ -66,9 +63,13 @@ object JobClientMessages {
   /** Message which is triggered when the submission timeout has been reached. */
   case object SubmissionTimeout extends RequiresLeaderSessionID
 
-  /** Messaeg which is triggered when the connection timeout has been reached. 
*/
+  /** Message which is triggered when the JobClient registration at the JobManager times out */
+  case object RegistrationTimeout extends RequiresLeaderSessionID
+
+  /** Message which is triggered when the connection timeout has been reached. */
   case object ConnectionTimeout extends RequiresLeaderSessionID
 
   def getSubmissionTimeout(): AnyRef = SubmissionTimeout
+  def getRegistrationTimeout(): AnyRef = RegistrationTimeout
   def getConnectionTimeout(): AnyRef = ConnectionTimeout
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 14f72b0..40c4dcf 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.messages
 
+import java.net.URL
 import java.util.UUID
 
 import akka.actor.ActorRef
@@ -69,6 +70,19 @@ object JobManagerMessages {
     extends RequiresLeaderSessionID
 
   /**
+    * Registers the sender of the message as the client for the provided job identifier.
+    * This message is acknowledged by the JobManager with [[RegisterJobClientSuccess]]
+    * or [[JobNotFound]] if the job was not running.
+    * @param jobID The job id of the job
+    * @param listeningBehaviour The types of updates which will be sent to the sender
+    * after registration
+    */
+  case class RegisterJobClient(
+      jobID: JobID,
+      listeningBehaviour: ListeningBehaviour)
+    extends RequiresLeaderSessionID
+
+  /**
    * Triggers the recovery of the job with the given ID.
    *
    * @param jobId ID of the job to recover
@@ -195,6 +209,23 @@ object JobManagerMessages {
   case object RequestTotalNumberOfSlots
 
   /**
+    * Requests all entities necessary for reconstructing a job class loader
+    * May respond with [[ClassloadingProps]] or [[JobNotFound]]
+    * @param jobId The job id of the registered job
+    */
+  case class RequestClassloadingProps(jobId: JobID)
+
+  /**
+    * Response to [[RequestClassloadingProps]]
+    * @param blobManagerPort The port of the blobManager
+    * @param requiredJarFiles The blob keys of the required jar files
+    * @param requiredClasspaths The urls of the required classpaths
+    */
+  case class ClassloadingProps(blobManagerPort: Integer,
+                               requiredJarFiles: java.util.List[BlobKey],
+                               requiredClasspaths: java.util.List[URL])
+
+  /**
    * Requests the port of the blob manager from the job manager. The result is sent back to the
    * sender as an [[Int]].
    */
@@ -218,16 +249,27 @@ object JobManagerMessages {
   case class JobSubmitSuccess(jobId: JobID)
 
   /**
+    * Denotes a successful registration of a JobClientActor for a running job
+    * @param jobId The job id of the registered job
+    */
+  case class RegisterJobClientSuccess(jobId: JobID)
+
+  /**
+    * Denotes messages which contain the result of a completed job execution
+    */
+  sealed trait JobResultMessage
+
+  /**
    * Denotes a successful job execution.
    * @param result The result of the job execution, in serialized form.
    */
-  case class JobResultSuccess(result: SerializedJobExecutionResult)
+  case class JobResultSuccess(result: SerializedJobExecutionResult) extends 
JobResultMessage
 
   /**
    * Denotes an unsuccessful job execution.
    * @param cause The exception that caused the job to fail, in serialized form.
    */
-  case class JobResultFailure(cause: SerializedThrowable)
+  case class JobResultFailure(cause: SerializedThrowable) extends 
JobResultMessage
 
 
   sealed trait CancellationResponse{
@@ -316,7 +358,7 @@ object JobManagerMessages {
 
   /**
    * Denotes that there is no job with [[jobID]] retrievable. This message can be the response of
-   * [[RequestJob]] or [[RequestJobStatus]].
+   * [[RequestJob]], [[RequestJobStatus]] or [[RegisterJobClient]].
    *
    * @param jobID
    */

Reply via email to