Repository: flink
Updated Branches:
  refs/heads/release-1.1 dc768d3b7 -> 662434837
[FLINK-4619] Answer with JobResultFailure if savepoint restore fails during submission This closes #2498. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66243483 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66243483 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66243483 Branch: refs/heads/release-1.1 Commit: 662434837deba0a2b76fc89d01224e127f86245c Parents: dc768d3 Author: Maciek Próchniak <[email protected]> Authored: Wed Sep 14 14:27:27 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Tue Oct 25 14:33:03 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 28 ++++++------- .../flink/runtime/jobmanager/JobSubmitTest.java | 44 +++++++++++++++----- .../test/checkpointing/SavepointITCase.java | 6 +-- 3 files changed, 49 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/66243483/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index f14a37f..a4c2403 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -19,8 +19,8 @@ package org.apache.flink.runtime.jobmanager import java.io.{File, IOException} -import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress} import java.lang.management.ManagementFactory +import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException} import java.util.UUID import 
java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException} import javax.management.ObjectName @@ -28,27 +28,25 @@ import javax.management.ObjectName import akka.actor.Status.Failure import akka.actor._ import akka.pattern.ask - import grizzled.slf4j.Logger - import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.io.InputSplitAssigner -import org.apache.flink.metrics.{Gauge, MetricGroup} import org.apache.flink.metrics.groups.UnregisteredMetricsGroup +import org.apache.flink.metrics.{Gauge, MetricGroup} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint._ -import org.apache.flink.runtime.checkpoint.savepoint.{SavepointStoreFactory, SavepointStore} -import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker} +import org.apache.flink.runtime.checkpoint.savepoint.{SavepointStore, SavepointStoreFactory} +import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, DisabledCheckpointStatsTracker, SimpleCheckpointStatsTracker} import org.apache.flink.runtime.client._ -import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import 
org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex} @@ -58,21 +56,18 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService, StandaloneLeaderElectionService} - import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge} +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} import org.apache.flink.runtime.messages.RegistrationMessages._ import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace} import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState} import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified} -import org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, AbstractCheckpointMessage, AcknowledgeCheckpoint} - -import org.apache.flink.runtime.messages.webmonitor.InfoMessage -import org.apache.flink.runtime.messages.webmonitor._ -import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint} +import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _} import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} import 
org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner @@ -81,7 +76,6 @@ import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.util.{InstantiationUtil, NetUtils} - import org.jboss.netty.channel.ChannelException import scala.annotation.tailrec @@ -1302,6 +1296,7 @@ class JobManager( executionGraph.restoreSavepoint(savepointPath) } catch { case e: Exception => + jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(e))) throw new SuppressRestartsException(e) } } @@ -1313,7 +1308,8 @@ class JobManager( case t: Throwable => // Don't restart the execution if this fails. Otherwise, the // job graph will skip ZooKeeper in case of HA. - new SuppressRestartsException(t) + jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(t))) + throw new SuppressRestartsException(t) } } http://git-wip-us.apache.org/repos/asf/flink/blob/66243483/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index 959b9a7..93aed2d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -18,9 +18,12 @@ package org.apache.flink.runtime.jobmanager; -import akka.actor.ActorRef; import akka.actor.ActorSystem; -import org.apache.flink.api.common.ExecutionConfig; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; +import 
java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -28,29 +31,25 @@ import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.util.NetUtils; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; - import scala.Tuple2; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; /** @@ -198,4 +197,29 @@ public class JobSubmitTest { fail(e.getMessage()); } } + + @Test + public void testAnswerFailureWhenSavepointReadFails() throws Exception { + // create a simple job graph + JobGraph jg = createSimpleJobGraph(); + jg.setSavepointPath("pathThatReallyDoesNotExist..."); + + // submit the job + Future<Object> submitFuture = jmGateway.ask( + new JobManagerMessages.SubmitJob(jg, ListeningBehaviour.DETACHED), timeout); + 
Object result = Await.result(submitFuture, timeout); + assertEquals(JobManagerMessages.JobResultFailure.class, result.getClass()); + } + + private JobGraph createSimpleJobGraph() { + JobVertex jobVertex = new JobVertex("Vertex"); + + jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID()); + + JobGraph jg = new JobGraph("test job", jobVertex); + jg.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, + 5000, 5000, 0L, 10)); + return jg; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/66243483/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 0ed28ad..2d5bc3c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -35,8 +35,8 @@ import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory; import org.apache.flink.runtime.checkpoint.savepoint.SavepointV0; +import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -742,8 +742,8 @@ public class SavepointITCase extends TestLogger { flink.submitJobAndWait(jobGraph, false); } catch (Exception e) { - assertEquals(SuppressRestartsException.class, 
e.getCause().getClass()); - assertEquals(IllegalArgumentException.class, e.getCause().getCause().getClass()); + assertEquals(JobExecutionException.class, e.getClass()); + assertEquals(IllegalArgumentException.class, e.getCause().getClass()); } } finally {
