Repository: flink
Updated Branches:
  refs/heads/release-1.1 dc768d3b7 -> 662434837


[FLINK-4619] Answer with JobResultFailure if savepoint restore fails during submission

This closes #2498.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66243483
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66243483
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66243483

Branch: refs/heads/release-1.1
Commit: 662434837deba0a2b76fc89d01224e127f86245c
Parents: dc768d3
Author: Maciek Próchniak <[email protected]>
Authored: Wed Sep 14 14:27:27 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Tue Oct 25 14:33:03 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 28 ++++++-------
 .../flink/runtime/jobmanager/JobSubmitTest.java | 44 +++++++++++++++-----
 .../test/checkpointing/SavepointITCase.java     |  6 +--
 3 files changed, 49 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/66243483/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f14a37f..a4c2403 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.net.{BindException, ServerSocket, UnknownHostException, 
InetAddress, InetSocketAddress}
 import java.lang.management.ManagementFactory
+import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, 
UnknownHostException}
 import java.util.UUID
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
 import javax.management.ObjectName
@@ -28,27 +28,25 @@ import javax.management.ObjectName
 import akka.actor.Status.Failure
 import akka.actor._
 import akka.pattern.ask
-
 import grizzled.slf4j.Logger
-
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
-import org.apache.flink.metrics.{Gauge, MetricGroup}
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
+import org.apache.flink.metrics.{Gauge, MetricGroup}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
-import org.apache.flink.runtime.checkpoint.savepoint.{SavepointStoreFactory, 
SavepointStore}
-import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, 
SimpleCheckpointStatsTracker, DisabledCheckpointStatsTracker}
+import org.apache.flink.runtime.checkpoint.savepoint.{SavepointStore, 
SavepointStoreFactory}
+import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, 
DisabledCheckpointStatsTracker, SimpleCheckpointStatsTracker}
 import org.apache.flink.runtime.client._
-import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionJobVertex}
@@ -58,21 +56,18 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, 
JobStatus, JobVertexID}
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.{LeaderContender, 
LeaderElectionService, StandaloneLeaderElectionService}
-
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, 
SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, 
UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, 
AccumulatorResultStringsFound, AccumulatorResultsErroneous, 
AccumulatorResultsFound, RequestAccumulatorResults, 
RequestAccumulatorResultsStringified}
-import org.apache.flink.runtime.messages.checkpoint.{DeclineCheckpoint, 
AbstractCheckpointMessage, AcknowledgeCheckpoint}
-
-import org.apache.flink.runtime.messages.webmonitor.InfoMessage
-import org.apache.flink.runtime.messages.webmonitor._
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
AcknowledgeCheckpoint, DeclineCheckpoint}
+import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
@@ -81,7 +76,6 @@ import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
 import org.apache.flink.util.{InstantiationUtil, NetUtils}
-
 import org.jboss.netty.channel.ChannelException
 
 import scala.annotation.tailrec
@@ -1302,6 +1296,7 @@ class JobManager(
                   executionGraph.restoreSavepoint(savepointPath)
                 } catch {
                   case e: Exception =>
+                    jobInfo.client ! decorateMessage(JobResultFailure(new 
SerializedThrowable(e)))
                     throw new SuppressRestartsException(e)
                 }
               }
@@ -1313,7 +1308,8 @@ class JobManager(
               case t: Throwable =>
                 // Don't restart the execution if this fails. Otherwise, the
                 // job graph will skip ZooKeeper in case of HA.
-                new SuppressRestartsException(t)
+                jobInfo.client ! decorateMessage(JobResultFailure(new 
SerializedThrowable(t)))
+                throw new SuppressRestartsException(t)
             }
           }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66243483/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 959b9a7..93aed2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import org.apache.flink.api.common.ExecutionConfig;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -28,29 +31,25 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
-import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.NetUtils;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
@@ -198,4 +197,29 @@ public class JobSubmitTest {
                        fail(e.getMessage());
                }
        }
+
+       @Test
+       public void testAnswerFailureWhenSavepointReadFails() throws Exception {
+               // create a simple job graph
+               JobGraph jg = createSimpleJobGraph();
+               jg.setSavepointPath("pathThatReallyDoesNotExist...");
+
+               // submit the job
+               Future<Object> submitFuture = jmGateway.ask(
+                       new JobManagerMessages.SubmitJob(jg, 
ListeningBehaviour.DETACHED), timeout);
+               Object result = Await.result(submitFuture, timeout);
+               assertEquals(JobManagerMessages.JobResultFailure.class, 
result.getClass());
+       }
+
+       private JobGraph createSimpleJobGraph() {
+               JobVertex jobVertex = new JobVertex("Vertex");
+
+               jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+               List<JobVertexID> vertexIdList = 
Collections.singletonList(jobVertex.getID());
+
+               JobGraph jg = new JobGraph("test job", jobVertex);
+               jg.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
+                       5000, 5000, 0L, 10));
+               return jg;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/66243483/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 0ed28ad..2d5bc3c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -35,8 +35,8 @@ import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV0;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -742,8 +742,8 @@ public class SavepointITCase extends TestLogger {
                                flink.submitJobAndWait(jobGraph, false);
                        }
                        catch (Exception e) {
-                               assertEquals(SuppressRestartsException.class, 
e.getCause().getClass());
-                               assertEquals(IllegalArgumentException.class, 
e.getCause().getCause().getClass());
+                               assertEquals(JobExecutionException.class, 
e.getClass());
+                               assertEquals(IllegalArgumentException.class, 
e.getCause().getClass());
                        }
                }
                finally {

Reply via email to