[FLINK-1556] [runtime] Fails jobs properly in case of a job submission exception
This closes #422 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ff91cd7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ff91cd7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ff91cd7 Branch: refs/heads/master Commit: 4ff91cd7cd3e05637d70a56cfc6f5a4ba2f2501a Parents: dbf589a Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Feb 19 12:44:32 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Feb 19 19:07:02 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/client/program/Client.java | 20 +- .../apache/flink/client/program/ClientTest.java | 8 +- .../client/JobCancellationException.java | 10 +- .../runtime/client/JobExecutionException.java | 24 +- .../runtime/client/JobSubmissionException.java | 37 +++ .../runtime/client/JobTimeoutException.java | 10 +- .../runtime/executiongraph/ExecutionGraph.java | 2 +- .../apache/flink/runtime/client/JobClient.scala | 40 ++-- .../flink/runtime/jobmanager/JobManager.scala | 68 +++--- .../runtime/messages/JobmanagerMessages.scala | 38 +--- .../runtime/taskmanager/TaskManagerTest.java | 2 - .../jobmanager/CoLocationConstraintITCase.scala | 6 +- .../runtime/jobmanager/JobManagerITCase.scala | 92 ++++++-- .../runtime/jobmanager/RecoveryITCase.scala | 10 +- .../runtime/jobmanager/SlotSharingITCase.scala | 7 +- .../TaskManagerFailsWithSlotSharingITCase.scala | 25 +- .../testingUtils/TestingJobManager.scala | 1 + .../JobSubmissionFailsITCase.java | 227 +++++++++++++++++++ .../taskmanager/TaskManagerFailsITCase.scala | 32 ++- 19 files changed, 487 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index 1fc7696..bd364ac 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -45,13 +45,9 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.client.JobTimeoutException; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionFailure; -import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -342,24 +338,12 @@ public class Client { return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client, timeout); } else { - SubmissionResponse response = JobClient.submitJobDetached(jobGraph, client, timeout); - if (response instanceof SubmissionFailure) { - SubmissionFailure failure = (SubmissionFailure) response; - throw new ProgramInvocationException( - "Failed to submit the job to the JobManager.", failure.cause()); - } + JobClient.submitJobDetached(jobGraph, client, timeout); } } catch (JobExecutionException e) { throw new ProgramInvocationException("The program execution failed.", e); - } catch (JobTimeoutException e) { - throw new ProgramInvocationException("Lost connection to the JobManager.", e); - } catch (JobCancellationException e) { - throw new ProgramInvocationException("The program has been canceled.", e); - } catch (ProgramInvocationException e) { - // forward exception resulting from submission failure - throw e; } catch (Exception e) { - throw new ProgramInvocationException("Exception occurred during job execution.", e); + throw new ProgramInvocationException("Unexpected exception while program execution.", e); } finally { actorSystem.shutdown(); http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index b5b3ac6..f68a7ae 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -20,6 +20,7 @@ package org.apache.flink.client.program; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.actor.Status; import akka.actor.UntypedActor; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; @@ -36,7 +37,6 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.net.NetUtils; import org.junit.After; @@ -174,7 +174,7 @@ public class ClientTest { // bam! } catch (Exception e) { - fail("wrong exception"); + fail("wrong exception " + e); } verify(this.compilerMock, times(1)).compile(any(Plan.class)); @@ -225,7 +225,7 @@ public class ClientTest { @Override public void onReceive(Object message) throws Exception { - getSender().tell(new JobManagerMessages.SubmissionSuccess(new JobID()), getSelf()); + getSender().tell(new Status.Success(new JobID()), getSelf()); } } @@ -233,7 +233,7 @@ public class ClientTest { @Override public void onReceive(Object message) throws Exception { - getSender().tell(new JobManagerMessages.SubmissionFailure(new JobID(), new Exception("test")), getSelf()); + getSender().tell(new Status.Failure(new Exception("test")), getSelf()); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java index 1a9a1d0..9f72db0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java @@ -18,13 +18,17 @@ package org.apache.flink.runtime.client; +import org.apache.flink.runtime.jobgraph.JobID; + /** * An exception which is thrown by the JobClient if a job is aborted as a result of a user * cancellation. */ -public class JobCancellationException extends Exception { +public class JobCancellationException extends JobExecutionException { + + private static final long serialVersionUID = 2818087325120827526L; - public JobCancellationException(final String msg, final Throwable cause){ - super(msg, cause); + public JobCancellationException(final JobID jobID, final String msg, final Throwable cause){ + super(jobID, msg, cause); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java index d5eb492..1a56a93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.client; +import org.apache.flink.runtime.jobgraph.JobID; + /** * This exception is thrown by the {@link JobClient} if a job has been aborted as a result of an * error which occurred during the execution. @@ -26,13 +28,33 @@ public class JobExecutionException extends Exception { private static final long serialVersionUID = 2818087325120827525L; + private JobID jobID; + /** * Constructs a new job execution exception. * * @param msg The cause for the execution exception. * @param cause The cause of the exception */ - public JobExecutionException(String msg, Throwable cause) { + public JobExecutionException(final JobID jobID, final String msg, final Throwable cause) { super(msg, cause); + + this.jobID = jobID; + } + + public JobExecutionException(final JobID jobID, final String msg) { + super(msg); + + this.jobID = jobID; + } + + public JobExecutionException(final JobID jobID, final Throwable cause) { + super(cause); + + this.jobID = jobID; + } + + public JobID getJobID() { + return jobID; } } http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java new file mode 100644 index 0000000..3d672a5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.client; + +import org.apache.flink.runtime.jobgraph.JobID; + +/** + * This exception denotes an error while submitting a job to the JobManager + */ +public class JobSubmissionException extends JobExecutionException { + + private static final long serialVersionUID = 2818087325120827526L; + + public JobSubmissionException(final JobID jobID, final String msg, final Throwable cause) { + super(jobID, msg, cause); + } + + public JobSubmissionException(final JobID jobID, final String msg) { + super(jobID, msg); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java index 2bd6ec5..10ef601 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java @@ -18,12 +18,16 @@ package org.apache.flink.runtime.client; +import org.apache.flink.runtime.jobgraph.JobID; + /** * An exception which is thrown by the JobClient if the job manager is no longer reachable. */ -public class JobTimeoutException extends Exception { +public class JobTimeoutException extends JobExecutionException { + + private static final long serialVersionUID = 2818087325120827529L; - public JobTimeoutException(final String msg, final Throwable cause) { - super(msg, cause); + public JobTimeoutException(final JobID jobID, final String msg, final Throwable cause) { + super(jobID, msg, cause); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 7198076..f7b13fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -406,7 +406,7 @@ public class ExecutionGraph implements Serializable { } } else { // set the state of the job to failed - transitionState(current, JobStatus.FAILED, t); + transitionState(JobStatus.FAILING, JobStatus.FAILED, t); } return; http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index 4a64f87..d3ceeaf 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -21,14 +21,14 @@ package org.apache.flink.runtime.client import java.io.IOException import java.net.{InetAddress, InetSocketAddress} -import akka.actor.Status.Failure +import akka.actor.Status.{Success, Failure} import akka.actor._ import akka.pattern.{Patterns, ask} import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.jobgraph.JobGraph +import org.apache.flink.runtime.jobgraph.{JobID, JobGraph} import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, SubmitJobAndWait} import org.apache.flink.runtime.messages.JobManagerMessages._ @@ -82,24 +82,16 @@ Actor with ActorLogMessages with ActorLogging { class JobClientListener(jobSubmitter: ActorRef) extends Actor with ActorLogMessages with ActorLogging { override def receiveWithLogMessages: Receive = { - case SubmissionFailure(jobID, t) => - System.out.println(s"Submission of job with ID $jobID was unsuccessful, " + - s"because ${t.getMessage}.") + case failure: Failure => + jobSubmitter ! failure + self ! PoisonPill - case SubmissionSuccess(_) => + case Success(_) => case JobResultSuccess(_, duration, accumulatorResults) => jobSubmitter ! new JobExecutionResult(duration, accumulatorResults) self ! PoisonPill - case JobResultCanceled(_, t) => - jobSubmitter ! Failure(new JobCancellationException("The job has been cancelled.", t)) - self ! PoisonPill - - case JobResultFailed(_, t) => - jobSubmitter ! Failure(new JobExecutionException("The job execution failed.", t)) - self ! PoisonPill - case msg => // we have to use System.out.println here to avoid erroneous behavior for output redirection System.out.println(msg.toString) @@ -211,7 +203,7 @@ object JobClient { * execution fails. * @return The job execution result */ - @throws(classOf[Exception]) + @throws(classOf[JobExecutionException]) def submitJobAndWait(jobGraph: JobGraph, listenToStatusEvents: Boolean, jobClient: ActorRef) (implicit timeout: FiniteDuration): JobExecutionResult = { @@ -233,7 +225,8 @@ object JobClient { Await.result(jmStatus, timeout) } catch { case t: Throwable => - throw new JobTimeoutException("Lost connection to job manager.", t) + throw new JobTimeoutException(jobGraph.getJobID, "Lost connection to " + + "job manager.", t) } } } @@ -244,26 +237,25 @@ object JobClient { /** * Submits a job in detached mode. The method sends the corresponding [[JobGraph]] to the * JobClient specified by jobClient. The JobClient does not start a [[JobClientListener]] and - * simply returns the [[SubmissionResponse]] of the [[JobManager]]. The SubmissionResponse is - * then returned by this method. + * simply returns a possible failure on the [[JobManager]]. * * @param jobGraph Flink job * @param jobClient ActorRef to the JobClient * @param timeout Tiemout for futures * @return The submission response */ - @throws(classOf[Exception]) + @throws(classOf[JobExecutionException]) def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef)(implicit timeout: FiniteDuration): - SubmissionResponse = { + Unit = { + val response = (jobClient ? SubmitJobDetached(jobGraph))(timeout) try { - Await.result(response.mapTo[SubmissionResponse], timeout) + Await.result(response, timeout) } catch { case timeout: TimeoutException => - throw new JobTimeoutException("Timeout while waiting for the submission result.", timeout); - case t: Throwable => - throw new JobExecutionException("Exception while waiting for the submission result.", t) + throw new JobTimeoutException(jobGraph.getJobID, + "Timeout while waiting for the submission result.", timeout); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index cb8acae..1399bce 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -21,10 +21,12 @@ package org.apache.flink.runtime.jobmanager import java.io.{IOException, File} import java.net.InetSocketAddress -import akka.actor.Status.Failure +import akka.actor.Status.{Success, Failure} import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.blob.BlobServer +import org.apache.flink.runtime.client.{JobSubmissionException, JobExecutionException, +JobCancellationException} import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph} import org.apache.flink.runtime.jobmanager.web.WebInfoServer import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph @@ -254,31 +256,32 @@ class JobManager(val configuration: Configuration, sender ! NextInputSplit(serializedInputSplit) - case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) => + case JobStatusChanged(jobID, newJobStatus, timeStamp, error) => currentJobs.get(jobID) match { case Some((executionGraph, jobInfo)) => executionGraph.getJobName log.info("Status of job {} ({}) changed to {} {}.", jobID, executionGraph.getJobName, newJobStatus, - if(optionalMessage == null) "" else optionalMessage.getMessage) + if(error == null) "" else error.getMessage) if(newJobStatus.isTerminalState) { jobInfo.end = timeStamp - // is the client waiting for the job result? - if(!jobInfo.detached) { - newJobStatus match { - case JobStatus.FINISHED => - val accumulatorResults = accumulatorManager.getJobAccumulatorResults(jobID) - jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, accumulatorResults) - case JobStatus.CANCELED => - jobInfo.client ! JobResultCanceled(jobID, optionalMessage) - case JobStatus.FAILED => - jobInfo.client ! JobResultFailed(jobID, optionalMessage) - case x => - val exception = new IllegalStateException(s"$x is not a terminal state.") - jobInfo.client ! JobResultFailed(jobID, exception) - throw exception - } + // is the client waiting for the job result? + newJobStatus match { + case JobStatus.FINISHED if !jobInfo.detached => + val accumulatorResults = accumulatorManager.getJobAccumulatorResults(jobID) + jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, accumulatorResults) + case JobStatus.CANCELED => + jobInfo.client ! Failure(new JobCancellationException(jobID, + "Job was cancelled.", error)) + case JobStatus.FAILED => + jobInfo.client ! Failure(new JobExecutionException(jobID, + "Job execution failed.", error)) + case x => + val exception = new JobExecutionException(jobID, s"$x is not a " + + "terminal state.") + jobInfo.client ! Failure(exception) + throw exception } removeJob(jobID) @@ -381,14 +384,12 @@ class JobManager(val configuration: Configuration, private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean, detached: Boolean): Unit = { try { if (jobGraph == null) { - sender ! akka.actor.Status.Failure(new IllegalArgumentException("JobGraph must not be" + - " null.")) + sender ! Failure(new JobSubmissionException(null, "JobGraph must not be null.")) } else { log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).") if (jobGraph.getNumberOfVertices == 0) { - sender ! SubmissionFailure(jobGraph.getJobID, new IllegalArgumentException("Job is " + - "empty.")) + sender ! Failure(new JobSubmissionException(jobGraph.getJobID ,"Job is empty.")) } else { // Create the user code class loader libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys) @@ -415,10 +416,8 @@ class JobManager(val configuration: Configuration, } if (log.isDebugEnabled) { - log.debug(s"Running master initialization of job ${jobGraph.getJobID} (${ - jobGraph - .getName - }}).") + log.debug(s"Running master initialization of job ${jobGraph.getJobID} " + + s"(${jobGraph.getName}).") } for (vertex <- jobGraph.getVertices.asScala) { @@ -464,32 +463,31 @@ class JobManager(val configuration: Configuration, executionGraph.scheduleForExecution(scheduler) - sender ! SubmissionSuccess(jobGraph.getJobID) + sender ! Success(jobGraph.getJobID) } } } catch { case t: Throwable => - log.error(t, "Job submission failed.") + log.error(t, "Job submission of job {} failed.", jobGraph.getJobID) currentJobs.get(jobGraph.getJobID) match { case Some((executionGraph, jobInfo)) => /* - * Register self to be notified about job status changes in case that it did not happen - * before. That way the proper cleanup of the job is triggered in the JobStatusChanged - * handler. - */ + * Register self to be notified about job status changes in case that it did not happen + * before. That way the proper cleanup of the job is triggered in the JobStatusChanged + * handler. + */ if (!executionGraph.containsJobStatusListener(self)) { executionGraph.registerJobStatusListener(self) } + // let the execution graph fail, which will send a failure to the job client executionGraph.fail(t) - case None => libraryCacheManager.unregisterJob(jobGraph.getJobID) currentJobs.remove(jobGraph.getJobID) + sender ! Failure(t) } - - sender ! SubmissionFailure(jobGraph.getJobID, t) } } http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala index f25083f..8aef552 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala @@ -179,10 +179,6 @@ object JobManagerMessages { */ case class RequestFinalJobStatus(jobID: JobID) - sealed trait JobResult{ - def jobID: JobID - } - /** * Denotes a successful job execution. * @@ -191,39 +187,7 @@ object JobManagerMessages { * @param accumulatorResults */ case class JobResultSuccess(jobID: JobID, runtime: Long, accumulatorResults: java.util.Map[String, - AnyRef]) extends JobResult {} - - /** - * Denotes a cancellation of the job. - * @param jobID - * @param t - */ - case class JobResultCanceled(jobID: JobID, t: Throwable) extends JobResult - - /** - * Denotes a failed job execution. - * @param jobID - * @param t - */ - case class JobResultFailed(jobID: JobID, t: Throwable) extends JobResult - - sealed trait SubmissionResponse{ - def jobID: JobID - } - - /** - * Denotes a successful job submission. - * @param jobID - */ - case class SubmissionSuccess(jobID: JobID) extends SubmissionResponse - - /** - * Denotes a failed job submission. The cause of the failure is denoted by [[cause]]. - * - * @param jobID - * @param cause of the submission failure - */ - case class SubmissionFailure(jobID: JobID, cause: Throwable) extends SubmissionResponse + AnyRef]) {} sealed trait CancellationResponse{ def jobID: JobID http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index a83886c..e6a487d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -55,8 +55,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.None; -import scala.None$; import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala index 2104034..28be994 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala @@ -19,13 +19,13 @@ package org.apache.flink.runtime.jobmanager import akka.actor.ActorSystem +import akka.actor.Status.Success import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, AbstractJobVertex} import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender} import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup -import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmissionSuccess, -SubmitJob} +import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob} import org.apache.flink.runtime.testingUtils.TestingUtils import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -70,7 +70,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with WrapA try { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) expectMsgType[JobResultSuccess] } http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 89d5d43..7e2840a 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -20,9 +20,11 @@ package org.apache.flink.runtime.jobmanager import Tasks._ import akka.actor.ActorSystem +import akka.actor.Status.{Success, Failure} import akka.pattern.ask import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout +import org.apache.flink.runtime.client.JobExecutionException import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph, ScheduleMode} import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved @@ -70,9 +72,14 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(1 second) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionFailure(jobGraph.getJobID, new NoResourceAvailableException(1,1,0))) + val failure = expectMsgType[Failure] - expectMsg(JobResultFailed(jobGraph.getJobID, new NoResourceAvailableException(1,1,0))) + failure.cause match { + case e: JobExecutionException => + jobGraph.getJobID should equal(e.getJobID) + new NoResourceAvailableException(1,1,0) should equal(e.getCause) + case e => fail(s"Received wrong exception of type $e.") + } } jm ! NotifyWhenJobRemoved(jobGraph.getJobID) @@ -103,7 +110,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) val result = expectMsgType[JobResultSuccess] result.jobID should equal(jobGraph.getJobID) @@ -133,7 +140,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) val result = expectMsgType[JobResultSuccess] @@ -168,7 +175,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) val result = expectMsgType[JobResultSuccess] @@ -203,7 +210,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) expectMsgType[JobResultSuccess] } @@ -240,8 +247,15 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) - expectMsgType[JobResultFailed] + expectMsg(Success(jobGraph.getJobID)) + val failure = expectMsgType[Failure] + + failure.cause match { + case e: JobExecutionException => + jobGraph.getJobID should equal(e.getJobID) + + case e => fail(s"Received wrong exception $e.") + } } jm ! NotifyWhenJobRemoved(jobGraph.getJobID) @@ -276,7 +290,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { try { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) expectMsgType[JobResultSuccess] } @@ -321,7 +335,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) expectMsgType[JobResultSuccess] @@ -359,8 +373,16 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) - expectMsgType[JobResultFailed] + expectMsg(Success(jobGraph.getJobID)) + + val failure = expectMsgType[Failure] + + failure.cause match { + case e: JobExecutionException => + jobGraph.getJobID should equal(e.getJobID) + + case e => fail(s"Received wrong exception $e.") + } } jm ! NotifyWhenJobRemoved(jobGraph.getJobID) @@ -399,8 +421,15 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) - expectMsgType[JobResultFailed] + expectMsg(Success(jobGraph.getJobID)) + val failure = expectMsgType[Failure] + + failure.cause match { + case e: JobExecutionException => + jobGraph.getJobID should equal(e.getJobID) + + case e => fail(s"Received wrong exception $e.") + } } jm ! NotifyWhenJobRemoved(jobGraph.getJobID) @@ -431,8 +460,15 @@ WordSpecLike with Matchers with BeforeAndAfterAll { try { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) - expectMsgType[JobResultFailed] + expectMsg(Success(jobGraph.getJobID)) + val failure = expectMsgType[Failure] + + failure.cause match { + case e: JobExecutionException => + jobGraph.getJobID should equal(e.getJobID) + + case e => fail(s"Received wrong exception $e.") + } } jm ! NotifyWhenJobRemoved(jobGraph.getJobID) @@ -466,8 +502,15 @@ WordSpecLike with Matchers with BeforeAndAfterAll { expectMsg(num_tasks) jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) - expectMsgType[JobResultFailed] + expectMsg(Success(jobGraph.getJobID)) + val failure = expectMsgType[Failure] + + failure.cause match { + case e: JobExecutionException => + jobGraph.getJobID should equal(e.getJobID) + + case e => fail(s"Received wrong exception $e.") + } } jm ! NotifyWhenJobRemoved(jobGraph.getJobID) @@ -506,8 +549,15 @@ WordSpecLike with Matchers with BeforeAndAfterAll { expectMsg(num_tasks) jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) - expectMsgType[JobResultFailed] + expectMsg(Success(jobGraph.getJobID)) + val failure = expectMsgType[Failure] + + failure.cause match { + case e: JobExecutionException => + jobGraph.getJobID should equal(e.getJobID) + + case e => fail(s"Received wrong exception $e.") + } } jm ! NotifyWhenJobRemoved(jobGraph.getJobID) @@ -539,7 +589,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION){ jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) expectMsgType[JobResultSuccess] } http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index a7be14b..2c1f82f 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -18,14 +18,14 @@ package org.apache.flink.runtime.jobmanager +import akka.actor.Status.Success import akka.actor.{ActorRef, PoisonPill, ActorSystem} import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern, AbstractJobVertex} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver} import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup -import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmissionSuccess, -SubmitJob} +import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob} import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingUtils import org.junit.runner.RunWith @@ -68,7 +68,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION){ jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) val result = expectMsgType[JobResultSuccess] @@ -111,7 +111,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION){ jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) val result = expectMsgType[JobResultSuccess] @@ -155,7 +155,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { within(TestingUtils.TESTING_DURATION){ jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) jm ! WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID) http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala index 289f759..d719dc3 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala @@ -19,11 +19,12 @@ package org.apache.flink.runtime.jobmanager import akka.actor.ActorSystem +import akka.actor.Status.Success import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph} import org.apache.flink.runtime.jobmanager.Tasks.{Sender, AgnosticBinaryReceiver, Receiver} import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup -import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmissionSuccess, SubmitJob} +import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob} import org.apache.flink.runtime.testingUtils.TestingUtils import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -65,7 +66,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { try { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) expectMsgType[JobResultSuccess] } @@ -108,7 +109,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { try { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) expectMsgType[JobResultSuccess] } } finally { http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala index 626c518..dcfc899 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala @@ -18,12 +18,14 @@ package org.apache.flink.runtime.jobmanager +import akka.actor.Status.{Failure, Success} import akka.actor.{Kill, ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} +import org.apache.flink.runtime.client.JobExecutionException import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender} import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup -import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, SubmissionSuccess, SubmitJob} +import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingUtils import org.junit.runner.RunWith @@ -68,7 +70,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { try{ within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) jm ! WaitForAllVerticesToBeRunningOrFinished(jobID) @@ -77,7 +79,13 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { //kill task manager taskManagers(0) ! PoisonPill - expectMsgType[JobResultFailed] + val failure = expectMsgType[Failure] + + failure.cause match { + case e: JobExecutionException => + jobGraph.getJobID should equal(e.getJobID) + case e => fail(s"Received wrong exception $e.") + } } }finally{ cluster.stop() @@ -111,7 +119,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { try{ within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) jm ! WaitForAllVerticesToBeRunningOrFinished(jobID) expectMsg(AllVerticesRunning(jobID)) @@ -119,7 +127,14 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { //kill task manager taskManagers(0) ! Kill - expectMsgType[JobResultFailed] + val failure = expectMsgType[Failure] + + failure.cause match { + case e: JobExecutionException => + jobGraph.getJobID should equal(e.getJobID) + + case e => fail(s"Received wrong exception $e.") + } } }finally{ cluster.stop() http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index 15c55ea..566f661 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -114,6 +114,7 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { case NotifyWhenTaskManagerTerminated(taskManager) => val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set()) waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender) + case msg@Terminated(taskManager) => super.receiveWithLogMessages(msg) http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java new file mode 100644 index 0000000..7da4125 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.failingPrograms; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobClient; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.jobgraph.AbstractJobVertex; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.Tasks; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(Parameterized.class) +public class JobSubmissionFailsITCase { + + private static ActorSystem system; + + private static JobGraph workingJobGraph; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("TestingActorSystem", AkkaUtils.getDefaultAkkaConfig()); + + final AbstractJobVertex jobVertex = new AbstractJobVertex("Working job vertex."); + jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + + workingJobGraph = new JobGraph("Working testing job", jobVertex); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + private boolean detached; + + public JobSubmissionFailsITCase(boolean detached) { + this.detached = detached; + } + + @Parameterized.Parameters(name = "Detached mode = {0}") + public static Collection<Boolean[]> executionModes(){ + return Arrays.asList(new Boolean[]{false}, + new Boolean[]{true}); + } + + private JobExecutionResult submitJob(JobGraph jobGraph, ActorRef jobClient) throws Exception { + if(detached) { + JobClient.submitJobDetached(jobGraph, jobClient, TestingUtils.TESTING_DURATION()); + return null; + } else { + return JobClient.submitJobAndWait(jobGraph, false, jobClient, TestingUtils.TESTING_DURATION()); + } + } + + @Test + public void testExceptionInInitializeOnMaster() { + new JavaTestKit(system) {{ + final int numSlots = 20; + + final ForkableFlinkMiniCluster cluster = + ForkableFlinkMiniCluster.startCluster(numSlots/2, 2, + TestingUtils.TESTING_DURATION().toString()); + + final ActorRef jobClient = cluster.getJobClient(); + + final AbstractJobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); + failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + + final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); + + try { + new Within(TestingUtils.TESTING_DURATION()) { + + @Override + protected void run() { + try { + submitJob(failingJobGraph, jobClient); + fail("Expected JobExecutionException."); + } catch (JobExecutionException e) { + assertEquals("Test exception.", e.getCause().getMessage()); + } catch (Throwable t) { + fail("Caught wrong exception of type " + t.getClass() + "."); + t.printStackTrace(); + } + + try { + JobClient.submitJobAndWait(workingJobGraph, false, jobClient, + TestingUtils.TESTING_DURATION()); + } catch (Throwable t) { + fail("Caught unexpected exception " + t.getMessage() + "."); + } + } + }; + } finally { + cluster.stop(); + } + }}; + } + + @Test + public void testSubmitEmptyJobGraph() { + new JavaTestKit(system) {{ + final int numSlots = 20; + + final ForkableFlinkMiniCluster cluster = + ForkableFlinkMiniCluster.startCluster(numSlots/2, 2, + TestingUtils.TESTING_DURATION().toString()); + + final ActorRef jobClient = cluster.getJobClient(); + + final JobGraph jobGraph = new JobGraph("Testing job"); + + try { + new Within(TestingUtils.TESTING_DURATION()) { + + @Override + protected void run() { + try { + submitJob(jobGraph, jobClient); + fail("Expected JobSubmissionException."); + } catch (JobSubmissionException e) { + assertEquals("Job is empty.", e.getMessage()); + } catch (Throwable t) { + fail("Caught wrong exception of type " + t.getClass() + "."); + t.printStackTrace(); + } + + try { + JobClient.submitJobAndWait(workingJobGraph, false, jobClient, + TestingUtils.TESTING_DURATION()); + } catch (Throwable t) { + fail("Caught unexpected exception " + t.getMessage() + "."); + } + } + }; + } finally { + cluster.stop(); + } + }}; + } + + @Test + public void testSubmitNullJobGraph() { + new JavaTestKit(system) {{ + final int numSlots = 20; + + final ForkableFlinkMiniCluster cluster = + ForkableFlinkMiniCluster.startCluster(numSlots/2, 2, + TestingUtils.TESTING_DURATION().toString()); + + final ActorRef jobClient = cluster.getJobClient(); + + try { + new Within(TestingUtils.TESTING_DURATION()) { + + @Override + protected void run() { + try { + submitJob(null, jobClient); + fail("Expected JobSubmissionException."); + } catch (JobSubmissionException e) { + assertEquals("JobGraph must not be null.", e.getMessage()); + } catch (Throwable t) { + fail("Caught wrong exception of type " + t.getClass() + "."); + t.printStackTrace(); + } + + try { + JobClient.submitJobAndWait(workingJobGraph, false, jobClient, + TestingUtils.TESTING_DURATION()); + } catch (Throwable t) { + fail("Caught unexpected exception " + t.getMessage() + "."); + } + } + }; + } finally { + cluster.stop(); + } + }}; + } + + public static class FailingJobVertex extends AbstractJobVertex { + public FailingJobVertex(final String msg) { + super(msg); + } + + @Override + public void initializeOnMaster(ClassLoader loader) throws Exception { + throw new Exception("Test exception."); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4ff91cd7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala index a6ee119..60c67fb 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala @@ -18,15 +18,17 @@ package org.apache.flink.api.scala.runtime.taskmanager +import akka.actor.Status.{Failure, Success} import akka.actor.{ActorSystem, Kill, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.configuration.Configuration import org.apache.flink.configuration.ConfigConstants import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.client.JobExecutionException import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender} -import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, RequestNumberRegisteredTaskManager, SubmissionSuccess, SubmitJob} +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestNumberRegisteredTaskManager, SubmitJob} import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingUtils import org.apache.flink.test.util.ForkableFlinkMiniCluster @@ -103,7 +105,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { try { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) jm ! WaitForAllVerticesToBeRunningOrFinished(jobID) @@ -114,9 +116,17 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { val tm = expectMsgType[WorkingTaskManager].taskManager // kill one task manager tm ! PoisonPill - expectMsgType[JobResultFailed] + + val failure = expectMsgType[Failure] + + failure.cause match { + case e: JobExecutionException => + jobGraph.getJobID should equal(e.getJobID) + + case e => fail(s"Received wrong exception $e.") + } } - }finally{ + } finally { cluster.stop() } } @@ -142,16 +152,24 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { try { within(TestingUtils.TESTING_DURATION) { jm ! SubmitJob(jobGraph) - expectMsg(SubmissionSuccess(jobGraph.getJobID)) + expectMsg(Success(jobGraph.getJobID)) jm ! WaitForAllVerticesToBeRunningOrFinished(jobID) expectMsg(AllVerticesRunning(jobID)) // kill one task manager taskManagers(0) ! Kill - expectMsgType[JobResultFailed] + + val failure = expectMsgType[Failure] + + failure.cause match { + case e: JobExecutionException => + jobGraph.getJobID should equal(e.getJobID) + + case e => fail(s"Received wrong exception $e.") + } } - }finally{ + } finally { cluster.stop() } }