[FLINK-1556] [runtime] Fails jobs properly in case of a job submission exception

This closes #422


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ff91cd7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ff91cd7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ff91cd7

Branch: refs/heads/master
Commit: 4ff91cd7cd3e05637d70a56cfc6f5a4ba2f2501a
Parents: dbf589a
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Feb 19 12:44:32 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 19 19:07:02 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java |  20 +-
 .../apache/flink/client/program/ClientTest.java |   8 +-
 .../client/JobCancellationException.java        |  10 +-
 .../runtime/client/JobExecutionException.java   |  24 +-
 .../runtime/client/JobSubmissionException.java  |  37 +++
 .../runtime/client/JobTimeoutException.java     |  10 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +-
 .../apache/flink/runtime/client/JobClient.scala |  40 ++--
 .../flink/runtime/jobmanager/JobManager.scala   |  68 +++---
 .../runtime/messages/JobmanagerMessages.scala   |  38 +---
 .../runtime/taskmanager/TaskManagerTest.java    |   2 -
 .../jobmanager/CoLocationConstraintITCase.scala |   6 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  92 ++++++--
 .../runtime/jobmanager/RecoveryITCase.scala     |  10 +-
 .../runtime/jobmanager/SlotSharingITCase.scala  |   7 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala |  25 +-
 .../testingUtils/TestingJobManager.scala        |   1 +
 .../JobSubmissionFailsITCase.java               | 227 +++++++++++++++++++
 .../taskmanager/TaskManagerFailsITCase.scala    |  32 ++-
 19 files changed, 487 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 1fc7696..bd364ac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -45,13 +45,9 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.JobTimeoutException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -342,24 +338,12 @@ public class Client {
                                return JobClient.submitJobAndWait(jobGraph, 
printStatusDuringExecution, client, timeout);
                        }
                        else {
-                               SubmissionResponse response = 
JobClient.submitJobDetached(jobGraph, client, timeout);
-                               if (response instanceof SubmissionFailure) {
-                                       SubmissionFailure failure = 
(SubmissionFailure) response;
-                                       throw new ProgramInvocationException(
-                                                       "Failed to submit the 
job to the JobManager.", failure.cause());
-                               }
+                               JobClient.submitJobDetached(jobGraph, client, 
timeout);
                        }
                } catch (JobExecutionException e) {
                        throw new ProgramInvocationException("The program 
execution failed.", e);
-               } catch (JobTimeoutException e) {
-                       throw new ProgramInvocationException("Lost connection 
to the JobManager.", e);
-               } catch (JobCancellationException e) {
-                       throw new ProgramInvocationException("The program has 
been canceled.", e);
-               } catch (ProgramInvocationException e) {
-                       // forward exception resulting from submission failure
-                       throw e;
                } catch (Exception e) {
-                       throw new ProgramInvocationException("Exception 
occurred during job execution.", e);
+                       throw new ProgramInvocationException("Unexpected 
exception while program execution.", e);
                }
                finally {
                        actorSystem.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index b5b3ac6..f68a7ae 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.program;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.actor.Status;
 import akka.actor.UntypedActor;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.net.NetUtils;
 import org.junit.After;
 
@@ -174,7 +174,7 @@ public class ClientTest {
                                // bam!
                        }
                        catch (Exception e) {
-                               fail("wrong exception");
+                               fail("wrong exception " + e);
                        }
 
                        verify(this.compilerMock, 
times(1)).compile(any(Plan.class));
@@ -225,7 +225,7 @@ public class ClientTest {
 
                @Override
                public void onReceive(Object message) throws Exception {
-                       getSender().tell(new 
JobManagerMessages.SubmissionSuccess(new JobID()), getSelf());
+                       getSender().tell(new Status.Success(new JobID()), 
getSelf());
                }
        }
 
@@ -233,7 +233,7 @@ public class ClientTest {
 
                @Override
                public void onReceive(Object message) throws Exception {
-                       getSender().tell(new 
JobManagerMessages.SubmissionFailure(new JobID(), new Exception("test")), 
getSelf());
+                       getSender().tell(new Status.Failure(new 
Exception("test")), getSelf());
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
index 1a9a1d0..9f72db0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.runtime.client;
 
+import org.apache.flink.runtime.jobgraph.JobID;
+
 /**
  * An exception which is thrown by the JobClient if a job is aborted as a 
result of a user
  * cancellation.
  */
-public class JobCancellationException extends Exception {
+public class JobCancellationException extends JobExecutionException {
+
+       private static final long serialVersionUID = 2818087325120827526L;
 
-       public JobCancellationException(final String msg, final Throwable 
cause){
-               super(msg, cause);
+       public JobCancellationException(final JobID jobID, final String msg, 
final Throwable cause){
+               super(jobID, msg, cause);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
index d5eb492..1a56a93 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.client;
 
+import org.apache.flink.runtime.jobgraph.JobID;
+
 /**
  * This exception is thrown by the {@link JobClient} if a job has been aborted 
as a result of an
  * error which occurred during the execution.
@@ -26,13 +28,33 @@ public class JobExecutionException extends Exception {
 
        private static final long serialVersionUID = 2818087325120827525L;
 
+       private JobID jobID;
+
        /**
         * Constructs a new job execution exception.
         * 
         * @param msg The cause for the execution exception.
         * @param cause The cause of the exception
         */
-       public JobExecutionException(String msg, Throwable cause) {
+       public JobExecutionException(final JobID jobID, final String msg, final 
Throwable cause) {
                super(msg, cause);
+
+               this.jobID = jobID;
+       }
+
+       public JobExecutionException(final JobID jobID, final String msg) {
+               super(msg);
+
+               this.jobID = jobID;
+       }
+
+       public JobExecutionException(final JobID jobID, final Throwable cause) {
+               super(cause);
+
+               this.jobID = jobID;
+       }
+
+       public JobID getJobID() {
+               return jobID;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
new file mode 100644
index 0000000..3d672a5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.runtime.jobgraph.JobID;
+
+/**
+ * This exception denotes an error while submitting a job to the JobManager
+ */
+public class JobSubmissionException extends JobExecutionException {
+
+       private static final long serialVersionUID = 2818087325120827526L;
+
+       public JobSubmissionException(final JobID jobID, final String msg, 
final Throwable cause) {
+               super(jobID, msg, cause);
+       }
+
+       public JobSubmissionException(final JobID jobID, final String msg) {
+               super(jobID, msg);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
index 2bd6ec5..10ef601 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.runtime.client;
 
+import org.apache.flink.runtime.jobgraph.JobID;
+
 /**
  * An exception which is thrown by the JobClient if the job manager is no 
longer reachable.
  */
-public class JobTimeoutException extends Exception {
+public class JobTimeoutException extends JobExecutionException {
+
+       private static final long serialVersionUID = 2818087325120827529L;
 
-       public JobTimeoutException(final String msg, final Throwable cause) {
-               super(msg, cause);
+       public JobTimeoutException(final JobID jobID, final String msg, final 
Throwable cause) {
+               super(jobID, msg, cause);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7198076..f7b13fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -406,7 +406,7 @@ public class ExecutionGraph implements Serializable {
                                        }
                                } else {
                                        // set the state of the job to failed
-                                       transitionState(current, 
JobStatus.FAILED, t);
+                                       transitionState(JobStatus.FAILING, 
JobStatus.FAILED, t);
                                }
                                
                                return;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 4a64f87..d3ceeaf 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -21,14 +21,14 @@ package org.apache.flink.runtime.client
 import java.io.IOException
 import java.net.{InetAddress, InetSocketAddress}
 
-import akka.actor.Status.Failure
+import akka.actor.Status.{Success, Failure}
 import akka.actor._
 import akka.pattern.{Patterns, ask}
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobgraph.JobGraph
+import org.apache.flink.runtime.jobgraph.{JobID, JobGraph}
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, 
SubmitJobAndWait}
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -82,24 +82,16 @@ Actor with ActorLogMessages with ActorLogging {
 class JobClientListener(jobSubmitter: ActorRef) extends Actor with 
ActorLogMessages with
 ActorLogging {
   override def receiveWithLogMessages: Receive = {
-    case SubmissionFailure(jobID, t) =>
-      System.out.println(s"Submission of job with ID $jobID was unsuccessful, 
" +
-        s"because ${t.getMessage}.")
+    case failure: Failure =>
+      jobSubmitter ! failure
+      self ! PoisonPill
 
-    case SubmissionSuccess(_) =>
+    case Success(_) =>
 
     case JobResultSuccess(_, duration, accumulatorResults) =>
       jobSubmitter ! new JobExecutionResult(duration, accumulatorResults)
       self ! PoisonPill
 
-    case JobResultCanceled(_, t) =>
-      jobSubmitter ! Failure(new JobCancellationException("The job has been 
cancelled.", t))
-      self ! PoisonPill
-
-    case JobResultFailed(_, t) =>
-      jobSubmitter ! Failure(new JobExecutionException("The job execution 
failed.", t))
-      self ! PoisonPill
-
     case msg =>
       // we have to use System.out.println here to avoid erroneous behavior 
for output redirection
       System.out.println(msg.toString)
@@ -211,7 +203,7 @@ object JobClient {
    *                                                               execution 
fails.
    * @return The job execution result
    */
-  @throws(classOf[Exception])
+  @throws(classOf[JobExecutionException])
   def submitJobAndWait(jobGraph: JobGraph, listenToStatusEvents: Boolean, 
jobClient: ActorRef)
                       (implicit timeout: FiniteDuration): JobExecutionResult = 
{
 
@@ -233,7 +225,8 @@ object JobClient {
             Await.result(jmStatus, timeout)
           } catch {
             case t: Throwable =>
-              throw new JobTimeoutException("Lost connection to job manager.", 
t)
+              throw new JobTimeoutException(jobGraph.getJobID, "Lost 
connection to " +
+                "job manager.", t)
           }
       }
     }
@@ -244,26 +237,25 @@ object JobClient {
   /**
    * Submits a job in detached mode. The method sends the corresponding 
[[JobGraph]] to the
    * JobClient specified by jobClient. The JobClient does not start a 
[[JobClientListener]] and
-   * simply returns the [[SubmissionResponse]] of the [[JobManager]]. The 
SubmissionResponse is
-   * then returned by this method.
+   * simply returns a possible failure on the [[JobManager]].
    *
    * @param jobGraph Flink job
    * @param jobClient ActorRef to the JobClient
    * @param timeout Tiemout for futures
    * @return The submission response
    */
-  @throws(classOf[Exception])
+  @throws(classOf[JobExecutionException])
   def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef)(implicit 
timeout: FiniteDuration):
-  SubmissionResponse = {
+  Unit = {
+
     val response = (jobClient ? SubmitJobDetached(jobGraph))(timeout)
 
     try {
-      Await.result(response.mapTo[SubmissionResponse], timeout)
+      Await.result(response, timeout)
     } catch {
       case timeout: TimeoutException =>
-        throw new JobTimeoutException("Timeout while waiting for the 
submission result.", timeout);
-      case t: Throwable =>
-        throw new JobExecutionException("Exception while waiting for the 
submission result.", t)
+        throw new JobTimeoutException(jobGraph.getJobID,
+          "Timeout while waiting for the submission result.", timeout);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cb8acae..1399bce 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,10 +21,12 @@ package org.apache.flink.runtime.jobmanager
 import java.io.{IOException, File}
 import java.net.InetSocketAddress
 
-import akka.actor.Status.Failure
+import akka.actor.Status.{Success, Failure}
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, 
Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
+import org.apache.flink.runtime.client.{JobSubmissionException, 
JobExecutionException,
+JobCancellationException}
 import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, 
ExecutionGraph}
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
@@ -254,31 +256,32 @@ class JobManager(val configuration: Configuration,
 
       sender ! NextInputSplit(serializedInputSplit)
 
-    case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) =>
+    case JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
           log.info("Status of job {} ({}) changed to {} {}.",
             jobID, executionGraph.getJobName, newJobStatus,
-            if(optionalMessage == null) "" else optionalMessage.getMessage)
+            if(error == null) "" else error.getMessage)
 
           if(newJobStatus.isTerminalState) {
             jobInfo.end = timeStamp
 
-            // is the client waiting for the job result?
-            if(!jobInfo.detached) {
-              newJobStatus match {
-                case JobStatus.FINISHED =>
-                  val accumulatorResults = 
accumulatorManager.getJobAccumulatorResults(jobID)
-                  jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, 
accumulatorResults)
-                case JobStatus.CANCELED =>
-                  jobInfo.client ! JobResultCanceled(jobID, optionalMessage)
-                case JobStatus.FAILED =>
-                  jobInfo.client ! JobResultFailed(jobID, optionalMessage)
-                case x =>
-                  val exception = new IllegalStateException(s"$x is not a 
terminal state.")
-                  jobInfo.client ! JobResultFailed(jobID, exception)
-                  throw exception
-              }
+          // is the client waiting for the job result?
+            newJobStatus match {
+              case JobStatus.FINISHED if !jobInfo.detached =>
+                val accumulatorResults = 
accumulatorManager.getJobAccumulatorResults(jobID)
+                jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, 
accumulatorResults)
+              case JobStatus.CANCELED =>
+                jobInfo.client ! Failure(new JobCancellationException(jobID,
+                  "Job was cancelled.", error))
+              case JobStatus.FAILED =>
+                jobInfo.client ! Failure(new JobExecutionException(jobID,
+                  "Job execution failed.", error))
+              case x =>
+                val exception = new JobExecutionException(jobID, s"$x is not a 
" +
+                  "terminal state.")
+                jobInfo.client ! Failure(exception)
+                throw exception
             }
 
             removeJob(jobID)
@@ -381,14 +384,12 @@ class JobManager(val configuration: Configuration,
   private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean, detached: 
Boolean): Unit = {
     try {
       if (jobGraph == null) {
-        sender ! akka.actor.Status.Failure(new 
IllegalArgumentException("JobGraph must not be" +
-          " null."))
+        sender ! Failure(new JobSubmissionException(null, "JobGraph must not 
be null."))
       } else {
         log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).")
 
         if (jobGraph.getNumberOfVertices == 0) {
-          sender ! SubmissionFailure(jobGraph.getJobID, new 
IllegalArgumentException("Job is " +
-            "empty."))
+          sender ! Failure(new JobSubmissionException(jobGraph.getJobID ,"Job 
is empty."))
         } else {
           // Create the user code class loader
           libraryCacheManager.registerJob(jobGraph.getJobID, 
jobGraph.getUserJarBlobKeys)
@@ -415,10 +416,8 @@ class JobManager(val configuration: Configuration,
           }
 
           if (log.isDebugEnabled) {
-            log.debug(s"Running master initialization of job 
${jobGraph.getJobID} (${
-              jobGraph
-                .getName
-            }}).")
+            log.debug(s"Running master initialization of job 
${jobGraph.getJobID} " +
+              s"(${jobGraph.getName}).")
           }
 
           for (vertex <- jobGraph.getVertices.asScala) {
@@ -464,32 +463,31 @@ class JobManager(val configuration: Configuration,
 
           executionGraph.scheduleForExecution(scheduler)
 
-          sender ! SubmissionSuccess(jobGraph.getJobID)
+          sender ! Success(jobGraph.getJobID)
         }
       }
     } catch {
       case t: Throwable =>
-        log.error(t, "Job submission failed.")
+        log.error(t, "Job submission of job {} failed.", jobGraph.getJobID)
 
         currentJobs.get(jobGraph.getJobID) match {
           case Some((executionGraph, jobInfo)) =>
             /*
-             * Register self to be notified about job status changes in case 
that it did not happen
-             * before. That way the proper cleanup of the job is triggered in 
the JobStatusChanged
-             * handler.
-             */
+           * Register self to be notified about job status changes in case 
that it did not happen
+           * before. That way the proper cleanup of the job is triggered in 
the JobStatusChanged
+           * handler.
+           */
             if (!executionGraph.containsJobStatusListener(self)) {
               executionGraph.registerJobStatusListener(self)
             }
 
+            // let the execution graph fail, which will send a failure to the 
job client
             executionGraph.fail(t)
-
           case None =>
             libraryCacheManager.unregisterJob(jobGraph.getJobID)
             currentJobs.remove(jobGraph.getJobID)
+            sender ! Failure(t)
         }
-
-        sender ! SubmissionFailure(jobGraph.getJobID, t)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
index f25083f..8aef552 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
@@ -179,10 +179,6 @@ object JobManagerMessages {
    */
   case class RequestFinalJobStatus(jobID: JobID)
 
-  sealed trait JobResult{
-    def jobID: JobID
-  }
-
   /**
    * Denotes a successful job execution.
    *
@@ -191,39 +187,7 @@ object JobManagerMessages {
    * @param accumulatorResults
    */
   case class JobResultSuccess(jobID: JobID, runtime: Long, accumulatorResults: 
java.util.Map[String,
-    AnyRef]) extends JobResult {}
-
-  /**
-   * Denotes a cancellation of the job.
-   * @param jobID
-   * @param t
-   */
-  case class JobResultCanceled(jobID: JobID, t: Throwable) extends JobResult
-
-  /**
-   * Denotes a failed job execution.
-   * @param jobID
-   * @param t
-   */
-  case class JobResultFailed(jobID: JobID, t: Throwable) extends JobResult
-
-  sealed trait SubmissionResponse{
-    def jobID: JobID
-  }
-
-  /**
-   * Denotes a successful job submission.
-   * @param jobID
-   */
-  case class SubmissionSuccess(jobID: JobID) extends SubmissionResponse
-
-  /**
-   * Denotes a failed job submission. The cause of the failure is denoted by 
[[cause]].
-   *
-   * @param jobID
-   * @param cause of the submission failure
-   */
-  case class SubmissionFailure(jobID: JobID, cause: Throwable) extends 
SubmissionResponse
+    AnyRef]) {}
 
   sealed trait CancellationResponse{
     def jobID: JobID

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index a83886c..e6a487d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -55,8 +55,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import scala.None;
-import scala.None$;
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index 2104034..28be994 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.jobmanager
 
 import akka.actor.ActorSystem
+import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern,
 AbstractJobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, 
SubmissionSuccess,
-SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, 
SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -70,7 +70,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll with WrapA
       try {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           expectMsgType[JobResultSuccess]
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 89d5d43..7e2840a 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.jobmanager
 
 import Tasks._
 import akka.actor.ActorSystem
+import akka.actor.Status.{Success, Failure}
 import akka.pattern.ask
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
+import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, 
DistributionPattern, JobGraph, ScheduleMode}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
@@ -70,9 +72,14 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(1 second) {
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionFailure(jobGraph.getJobID, new 
NoResourceAvailableException(1,1,0)))
+          val failure = expectMsgType[Failure]
 
-          expectMsg(JobResultFailed(jobGraph.getJobID, new 
NoResourceAvailableException(1,1,0)))
+          failure.cause match {
+            case e: JobExecutionException =>
+              jobGraph.getJobID should equal(e.getJobID)
+              new NoResourceAvailableException(1,1,0) should equal(e.getCause)
+            case e => fail(s"Received wrong exception of type $e.")
+          }
         }
 
         jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
@@ -103,7 +110,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
           val result = expectMsgType[JobResultSuccess]
 
           result.jobID should equal(jobGraph.getJobID)
@@ -133,7 +140,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           val result = expectMsgType[JobResultSuccess]
 
@@ -168,7 +175,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           val result = expectMsgType[JobResultSuccess]
 
@@ -203,7 +210,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           expectMsgType[JobResultSuccess]
         }
@@ -240,8 +247,15 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
-          expectMsgType[JobResultFailed]
+          expectMsg(Success(jobGraph.getJobID))
+          val failure = expectMsgType[Failure]
+
+          failure.cause match {
+            case e: JobExecutionException =>
+              jobGraph.getJobID should equal(e.getJobID)
+
+            case e => fail(s"Received wrong exception $e.")
+          }
         }
 
         jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
@@ -276,7 +290,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       try {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           expectMsgType[JobResultSuccess]
         }
@@ -321,7 +335,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           expectMsgType[JobResultSuccess]
 
@@ -359,8 +373,16 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
-          expectMsgType[JobResultFailed]
+          expectMsg(Success(jobGraph.getJobID))
+
+          val failure = expectMsgType[Failure]
+
+          failure.cause match {
+            case e: JobExecutionException =>
+              jobGraph.getJobID should equal(e.getJobID)
+
+            case e => fail(s"Received wrong exception $e.")
+          }
         }
 
         jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
@@ -399,8 +421,15 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
-          expectMsgType[JobResultFailed]
+          expectMsg(Success(jobGraph.getJobID))
+          val failure = expectMsgType[Failure]
+
+          failure.cause match {
+            case e: JobExecutionException =>
+              jobGraph.getJobID should equal(e.getJobID)
+
+            case e => fail(s"Received wrong exception $e.")
+          }
         }
 
         jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
@@ -431,8 +460,15 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       try {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
-          expectMsgType[JobResultFailed]
+          expectMsg(Success(jobGraph.getJobID))
+          val failure = expectMsgType[Failure]
+
+          failure.cause match {
+            case e: JobExecutionException =>
+              jobGraph.getJobID should equal(e.getJobID)
+
+            case e => fail(s"Received wrong exception $e.")
+          }
         }
 
         jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
@@ -466,8 +502,15 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
           expectMsg(num_tasks)
 
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
-          expectMsgType[JobResultFailed]
+          expectMsg(Success(jobGraph.getJobID))
+          val failure = expectMsgType[Failure]
+
+          failure.cause match {
+            case e: JobExecutionException =>
+              jobGraph.getJobID should equal(e.getJobID)
+
+            case e => fail(s"Received wrong exception $e.")
+          }
         }
 
         jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
@@ -506,8 +549,15 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
           expectMsg(num_tasks)
 
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
-          expectMsgType[JobResultFailed]
+          expectMsg(Success(jobGraph.getJobID))
+          val failure = expectMsgType[Failure]
+
+          failure.cause match {
+            case e: JobExecutionException =>
+              jobGraph.getJobID should equal(e.getJobID)
+
+            case e => fail(s"Received wrong exception $e.")
+          }
         }
 
         jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
@@ -539,7 +589,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(TestingUtils.TESTING_DURATION){
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
           expectMsgType[JobResultSuccess]
         }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index a7be14b..2c1f82f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -18,14 +18,14 @@
 
 package org.apache.flink.runtime.jobmanager
 
+import akka.actor.Status.Success
 import akka.actor.{ActorRef, PoisonPill, ActorSystem}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, 
DistributionPattern,
 AbstractJobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, 
FailingOnceReceiver}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, 
SubmissionSuccess,
-SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, 
SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
@@ -68,7 +68,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(TestingUtils.TESTING_DURATION){
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           val result = expectMsgType[JobResultSuccess]
 
@@ -111,7 +111,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(TestingUtils.TESTING_DURATION){
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           val result = expectMsgType[JobResultSuccess]
 
@@ -155,7 +155,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         within(TestingUtils.TESTING_DURATION){
           jm ! SubmitJob(jobGraph)
 
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           jm ! WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index 289f759..d719dc3 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -19,11 +19,12 @@
 package org.apache.flink.runtime.jobmanager
 
 import akka.actor.ActorSystem
+import akka.actor.Status.Success
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, 
DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{Sender, 
AgnosticBinaryReceiver, Receiver}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, 
SubmissionSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, 
SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -65,7 +66,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       try {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
           expectMsgType[JobResultSuccess]
 
         }
@@ -108,7 +109,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       try {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
           expectMsgType[JobResultSuccess]
         }
       } finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 626c518..dcfc899 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.jobmanager
 
+import akka.actor.Status.{Failure, Success}
 import akka.actor.{Kill, ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
+import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, 
DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, 
SubmissionSuccess, SubmitJob}
+import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
@@ -68,7 +70,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
       try{
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
 
@@ -77,7 +79,13 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
           //kill task manager
           taskManagers(0) ! PoisonPill
 
-          expectMsgType[JobResultFailed]
+          val failure = expectMsgType[Failure]
+
+          failure.cause match {
+            case e: JobExecutionException =>
+              jobGraph.getJobID should equal(e.getJobID)
+            case e => fail(s"Received wrong exception $e.")
+          }
         }
       }finally{
         cluster.stop()
@@ -111,7 +119,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
       try{
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
           expectMsg(AllVerticesRunning(jobID))
@@ -119,7 +127,14 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
           //kill task manager
           taskManagers(0) ! Kill
 
-          expectMsgType[JobResultFailed]
+          val failure = expectMsgType[Failure]
+
+          failure.cause match {
+            case e: JobExecutionException =>
+              jobGraph.getJobID should equal(e.getJobID)
+
+            case e => fail(s"Received wrong exception $e.")
+          }
         }
       }finally{
         cluster.stop()

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 15c55ea..566f661 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -114,6 +114,7 @@ trait TestingJobManager extends ActorLogMessages with 
WrapAsScala {
     case NotifyWhenTaskManagerTerminated(taskManager) =>
       val waiting = 
waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set())
       waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + 
sender)
+
     case msg@Terminated(taskManager) =>
       super.receiveWithLogMessages(msg)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
new file mode 100644
index 0000000..7da4125
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.failingPrograms;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that failures during job submission are reported to the client as exceptions and
+ * leave the cluster usable. Each test submits a job that must be rejected (a vertex whose
+ * {@code initializeOnMaster} throws, an empty job graph, or a {@code null} job graph), checks
+ * the reported exception, and then runs a working job on the same cluster. All tests run
+ * with both attached and detached submission.
+ */
+@RunWith(Parameterized.class)
+public class JobSubmissionFailsITCase {
+
+       /** Number of slots used to size the test cluster, see {@link #startTestCluster()}. */
+       private static final int NUM_SLOTS = 20;
+
+       private static ActorSystem system;
+
+       /** Single-vertex no-op job, used to check that the cluster still works after a failure. */
+       private static JobGraph workingJobGraph;
+
+       @BeforeClass
+       public static void setup() {
+               system = ActorSystem.create("TestingActorSystem", AkkaUtils.getDefaultAkkaConfig());
+
+               final AbstractJobVertex jobVertex = new AbstractJobVertex("Working job vertex.");
+               jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+
+               workingJobGraph = new JobGraph("Working testing job", jobVertex);
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(system);
+               system = null;
+       }
+
+       /**
+        * If {@code true}, jobs are submitted with {@link JobClient#submitJobDetached}; otherwise
+        * with {@link JobClient#submitJobAndWait}.
+        */
+       private final boolean detached;
+
+       public JobSubmissionFailsITCase(boolean detached) {
+               this.detached = detached;
+       }
+
+       @Parameterized.Parameters(name = "Detached mode = {0}")
+       public static Collection<Boolean[]> executionModes() {
+               return Arrays.asList(new Boolean[]{false}, new Boolean[]{true});
+       }
+
+       /**
+        * Submits the given job in the mode selected by {@link #detached}.
+        *
+        * @return the job result when submitting attached, {@code null} when submitting detached
+        * @throws Exception any exception thrown by the {@link JobClient} submission call
+        */
+       private JobExecutionResult submitJob(JobGraph jobGraph, ActorRef jobClient) throws Exception {
+               if (detached) {
+                       JobClient.submitJobDetached(jobGraph, jobClient, TestingUtils.TESTING_DURATION());
+                       return null;
+               } else {
+                       return JobClient.submitJobAndWait(jobGraph, false, jobClient, TestingUtils.TESTING_DURATION());
+               }
+       }
+
+       /**
+        * A vertex whose {@code initializeOnMaster} throws must fail the submission with a
+        * {@link JobExecutionException} that carries the original exception as its cause.
+        */
+       @Test
+       public void testExceptionInInitializeOnMaster() {
+               new JavaTestKit(system) {{
+                       final ForkableFlinkMiniCluster cluster = startTestCluster();
+                       final ActorRef jobClient = cluster.getJobClient();
+
+                       final AbstractJobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
+                       failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+
+                       final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
+
+                       try {
+                               new Within(TestingUtils.TESTING_DURATION()) {
+
+                                       @Override
+                                       protected void run() {
+                                               try {
+                                                       submitJob(failingJobGraph, jobClient);
+                                                       fail("Expected JobExecutionException.");
+                                               } catch (JobExecutionException e) {
+                                                       if (e.getCause() == null) {
+                                                               throw assertionErrorWithCause("JobExecutionException has no cause.", e);
+                                                       }
+                                                       assertEquals("Test exception.", e.getCause().getMessage());
+                                               } catch (AssertionError e) {
+                                                       // rethrow the fail(...) above instead of reporting it as a wrong exception
+                                                       throw e;
+                                               } catch (Throwable t) {
+                                                       throw assertionErrorWithCause("Caught wrong exception of type " + t.getClass() + ".", t);
+                                               }
+
+                                               assertWorkingJobSucceeds(jobClient);
+                                       }
+                               };
+                       } finally {
+                               cluster.stop();
+                       }
+               }};
+       }
+
+       /**
+        * Submitting a job graph that was created without any vertex must be rejected with a
+        * {@link JobSubmissionException}.
+        */
+       @Test
+       public void testSubmitEmptyJobGraph() {
+               new JavaTestKit(system) {{
+                       final ForkableFlinkMiniCluster cluster = startTestCluster();
+                       final ActorRef jobClient = cluster.getJobClient();
+
+                       final JobGraph jobGraph = new JobGraph("Testing job");
+
+                       try {
+                               new Within(TestingUtils.TESTING_DURATION()) {
+
+                                       @Override
+                                       protected void run() {
+                                               try {
+                                                       submitJob(jobGraph, jobClient);
+                                                       fail("Expected JobSubmissionException.");
+                                               } catch (JobSubmissionException e) {
+                                                       assertEquals("Job is empty.", e.getMessage());
+                                               } catch (AssertionError e) {
+                                                       // rethrow the fail(...) above instead of reporting it as a wrong exception
+                                                       throw e;
+                                               } catch (Throwable t) {
+                                                       throw assertionErrorWithCause("Caught wrong exception of type " + t.getClass() + ".", t);
+                                               }
+
+                                               assertWorkingJobSucceeds(jobClient);
+                                       }
+                               };
+                       } finally {
+                               cluster.stop();
+                       }
+               }};
+       }
+
+       /**
+        * Submitting a {@code null} job graph must be rejected with a {@link JobSubmissionException}.
+        */
+       @Test
+       public void testSubmitNullJobGraph() {
+               new JavaTestKit(system) {{
+                       final ForkableFlinkMiniCluster cluster = startTestCluster();
+                       final ActorRef jobClient = cluster.getJobClient();
+
+                       try {
+                               new Within(TestingUtils.TESTING_DURATION()) {
+
+                                       @Override
+                                       protected void run() {
+                                               try {
+                                                       submitJob(null, jobClient);
+                                                       fail("Expected JobSubmissionException.");
+                                               } catch (JobSubmissionException e) {
+                                                       assertEquals("JobGraph must not be null.", e.getMessage());
+                                               } catch (AssertionError e) {
+                                                       // rethrow the fail(...) above instead of reporting it as a wrong exception
+                                                       throw e;
+                                               } catch (Throwable t) {
+                                                       throw assertionErrorWithCause("Caught wrong exception of type " + t.getClass() + ".", t);
+                                               }
+
+                                               assertWorkingJobSucceeds(jobClient);
+                                       }
+                               };
+                       } finally {
+                               cluster.stop();
+                       }
+               }};
+       }
+
+       /**
+        * Starts a fresh mini cluster for a single test; the caller is responsible for stopping it.
+        */
+       private static ForkableFlinkMiniCluster startTestCluster() {
+               return ForkableFlinkMiniCluster.startCluster(NUM_SLOTS / 2, 2,
+                               TestingUtils.TESTING_DURATION().toString());
+       }
+
+       /**
+        * Runs {@link #workingJobGraph} attached and fails the test if that throws, which checks that
+        * a preceding submission failure left the cluster able to run jobs.
+        */
+       private static void assertWorkingJobSucceeds(ActorRef jobClient) {
+               try {
+                       JobClient.submitJobAndWait(workingJobGraph, false, jobClient,
+                                       TestingUtils.TESTING_DURATION());
+               } catch (Throwable t) {
+                       throw assertionErrorWithCause("Caught unexpected exception " + t.getMessage() + ".", t);
+               }
+       }
+
+       /**
+        * Creates an {@link AssertionError} with the given message that keeps {@code cause} attached,
+        * so the original exception and its stack trace are not lost in the test report.
+        */
+       private static AssertionError assertionErrorWithCause(String message, Throwable cause) {
+               final AssertionError error = new AssertionError(message);
+               // initCause rather than AssertionError(String, Throwable), which only exists since Java 7
+               error.initCause(cause);
+               return error;
+       }
+
+       /** Job vertex whose {@code initializeOnMaster} always throws an exception "Test exception.". */
+       public static class FailingJobVertex extends AbstractJobVertex {
+               public FailingJobVertex(final String msg) {
+                       super(msg);
+               }
+
+               @Override
+               public void initializeOnMaster(ClassLoader loader) throws Exception {
+                       throw new Exception("Test exception.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index a6ee119..60c67fb 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -18,15 +18,17 @@
 
 package org.apache.flink.api.scala.runtime.taskmanager
 
+import akka.actor.Status.{Failure, Success}
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, 
DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, 
RequestNumberRegisteredTaskManager, SubmissionSuccess, SubmitJob}
+import 
org.apache.flink.runtime.messages.JobManagerMessages.{RequestNumberRegisteredTaskManager,
 SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.apache.flink.test.util.ForkableFlinkMiniCluster
@@ -103,7 +105,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
       try {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
 
@@ -114,9 +116,17 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
           val tm = expectMsgType[WorkingTaskManager].taskManager
           // kill one task manager
           tm ! PoisonPill
-          expectMsgType[JobResultFailed]
+
+          val failure = expectMsgType[Failure]
+
+          failure.cause match {
+            case e: JobExecutionException =>
+              jobGraph.getJobID should equal(e.getJobID)
+
+            case e => fail(s"Received wrong exception $e.")
+          }
         }
-      }finally{
+      } finally {
         cluster.stop()
       }
     }
@@ -142,16 +152,24 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
       try {
         within(TestingUtils.TESTING_DURATION) {
           jm ! SubmitJob(jobGraph)
-          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+          expectMsg(Success(jobGraph.getJobID))
 
           jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
           expectMsg(AllVerticesRunning(jobID))
 
           // kill one task manager
           taskManagers(0) ! Kill
-          expectMsgType[JobResultFailed]
+
+          val failure = expectMsgType[Failure]
+
+          failure.cause match {
+            case e: JobExecutionException =>
+              jobGraph.getJobID should equal(e.getJobID)
+
+            case e => fail(s"Received wrong exception $e.")
+          }
         }
-      }finally{
+      } finally {
         cluster.stop()
       }
     }

Reply via email to