Repository: flink Updated Branches: refs/heads/master e4343ba0d -> b05c3c1b0
[FLINK-4619] Answer with JobResultFailure if savepoint restore fails during submission This closes #2498. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b05c3c1b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b05c3c1b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b05c3c1b Branch: refs/heads/master Commit: b05c3c1b01fc48674d01e138e5e3e628c823974f Parents: e4343ba Author: Maciek Próchniak <[email protected]> Authored: Wed Sep 14 14:27:27 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Wed Oct 19 11:41:26 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 6 ++- .../flink/runtime/jobmanager/JobSubmitTest.java | 44 +++++++++++++++----- .../test/checkpointing/RescalingITCase.java | 15 ++----- .../test/checkpointing/SavepointITCase.java | 6 +-- 4 files changed, 46 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b05c3c1b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 5dc9e24..18ded6f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1302,6 +1302,8 @@ class JobManager( executionGraph.restoreLatestCheckpointedState() } catch { case e: Exception => + jobInfo.notifyClients( + decorateMessage(JobResultFailure(new SerializedThrowable(e)))) throw new SuppressRestartsException(e) } } @@ -1313,7 +1315,9 @@ class JobManager( case t: Throwable => // Don't restart the execution if this fails. Otherwise, the // job graph will skip ZooKeeper in case of HA. - new SuppressRestartsException(t) + jobInfo.notifyClients( + decorateMessage(JobResultFailure(new SerializedThrowable(t)))) + throw new SuppressRestartsException(t) } } http://git-wip-us.apache.org/repos/asf/flink/blob/b05c3c1b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index b4f1d3d..4ea3628 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -18,9 +18,12 @@ package org.apache.flink.runtime.jobmanager; -import akka.actor.ActorRef; import akka.actor.ActorSystem; -import org.apache.flink.api.common.ExecutionConfig; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -28,29 +31,25 @@ import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.util.NetUtils; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; - import scala.Tuple2; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; /** @@ -199,4 +198,29 @@ public class JobSubmitTest { fail(e.getMessage()); } } + + @Test + public void testAnswerFailureWhenSavepointReadFails() throws Exception { + // create a simple job graph + JobGraph jg = createSimpleJobGraph(); + jg.setSavepointPath("pathThatReallyDoesNotExist..."); + + // submit the job + Future<Object> submitFuture = jmGateway.ask( + new JobManagerMessages.SubmitJob(jg, ListeningBehaviour.DETACHED), timeout); + Object result = Await.result(submitFuture, timeout); + assertEquals(JobManagerMessages.JobResultFailure.class, result.getClass()); + } + + private JobGraph createSimpleJobGraph() { + JobVertex jobVertex = new JobVertex("Vertex"); + + jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID()); + + JobGraph jg = new JobGraph("test job", jobVertex); + jg.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, + 5000, 5000, 0L, 10)); + return jg; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b05c3c1b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 48d720a..0e513fa 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -29,7 +29,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -292,16 +291,10 @@ public class RescalingITCase extends TestLogger { jobID = null; } catch (JobExecutionException exception) { - if (exception.getCause() instanceof SuppressRestartsException) { - SuppressRestartsException suppressRestartsException = (SuppressRestartsException) exception.getCause(); - - if (suppressRestartsException.getCause() instanceof IllegalStateException) { - // we expect a IllegalStateException wrapped in a SuppressRestartsException wrapped - // in a JobExecutionException, because the job containing non-partitioned state - // is being rescaled - } else { - throw exception; - } + if (exception.getCause() instanceof IllegalStateException) { + // we expect a IllegalStateException wrapped + // in a JobExecutionException, because the job containing non-partitioned state + // is being rescaled } else { throw exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/b05c3c1b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 74de942..92e1f41 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -33,8 +33,8 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1; +import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -459,8 +459,8 @@ public class SavepointITCase extends TestLogger { flink.submitJobAndWait(jobGraph, false); } catch (Exception e) { - assertEquals(SuppressRestartsException.class, e.getCause().getClass()); - assertEquals(IllegalArgumentException.class, e.getCause().getCause().getClass()); + assertEquals(JobExecutionException.class, e.getClass()); + assertEquals(IllegalArgumentException.class, e.getCause().getClass()); } } finally {
