Repository: flink
Updated Branches:
  refs/heads/master e4343ba0d -> b05c3c1b0


[FLINK-4619] Answer with JobResultFailure if savepoint restore fails during submission

This closes #2498.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b05c3c1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b05c3c1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b05c3c1b

Branch: refs/heads/master
Commit: b05c3c1b01fc48674d01e138e5e3e628c823974f
Parents: e4343ba
Author: Maciek Próchniak <[email protected]>
Authored: Wed Sep 14 14:27:27 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Oct 19 11:41:26 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  6 ++-
 .../flink/runtime/jobmanager/JobSubmitTest.java | 44 +++++++++++++++-----
 .../test/checkpointing/RescalingITCase.java     | 15 ++-----
 .../test/checkpointing/SavepointITCase.java     |  6 +--
 4 files changed, 46 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b05c3c1b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5dc9e24..18ded6f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1302,6 +1302,8 @@ class JobManager(
                   executionGraph.restoreLatestCheckpointedState()
                 } catch {
                   case e: Exception =>
+                    jobInfo.notifyClients(
+                      decorateMessage(JobResultFailure(new SerializedThrowable(e))))
                     throw new SuppressRestartsException(e)
                 }
               }
@@ -1313,7 +1315,9 @@ class JobManager(
               case t: Throwable =>
                 // Don't restart the execution if this fails. Otherwise, the
                 // job graph will skip ZooKeeper in case of HA.
-                new SuppressRestartsException(t)
+                jobInfo.notifyClients(
+                  decorateMessage(JobResultFailure(new SerializedThrowable(t))))
+                throw new SuppressRestartsException(t)
             }
           }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b05c3c1b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index b4f1d3d..4ea3628 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import org.apache.flink.api.common.ExecutionConfig;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -28,29 +31,25 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.NetUtils;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /**
@@ -199,4 +198,29 @@ public class JobSubmitTest {
                        fail(e.getMessage());
                }
        }
+
+       @Test
+       public void testAnswerFailureWhenSavepointReadFails() throws Exception {
+               // create a simple job graph
+               JobGraph jg = createSimpleJobGraph();
+               jg.setSavepointPath("pathThatReallyDoesNotExist...");
+
+               // submit the job
+               Future<Object> submitFuture = jmGateway.ask(
+                       new JobManagerMessages.SubmitJob(jg, ListeningBehaviour.DETACHED), timeout);
+               Object result = Await.result(submitFuture, timeout);
+               assertEquals(JobManagerMessages.JobResultFailure.class, result.getClass());
+       }
+
+       private JobGraph createSimpleJobGraph() {
+               JobVertex jobVertex = new JobVertex("Vertex");
+
+               jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+               List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID());
+
+               JobGraph jg = new JobGraph("test job", jobVertex);
+               jg.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
+                       5000, 5000, 0L, 10));
+               return jg;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b05c3c1b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 48d720a..0e513fa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -292,16 +291,10 @@ public class RescalingITCase extends TestLogger {
                        jobID = null;
 
                } catch (JobExecutionException exception) {
-                       if (exception.getCause() instanceof SuppressRestartsException) {
-                               SuppressRestartsException suppressRestartsException = (SuppressRestartsException) exception.getCause();
-
-                               if (suppressRestartsException.getCause() instanceof IllegalStateException) {
-                                       // we expect a IllegalStateException wrapped in a SuppressRestartsException wrapped
-                                       // in a JobExecutionException, because the job containing non-partitioned state
-                                       // is being rescaled
-                               } else {
-                                       throw exception;
-                               }
+                       if (exception.getCause() instanceof IllegalStateException) {
+                               // we expect a IllegalStateException wrapped
+                               // in a JobExecutionException, because the job containing non-partitioned state
+                               // is being rescaled
                        } else {
                                throw exception;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b05c3c1b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 74de942..92e1f41 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -33,8 +33,8 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -459,8 +459,8 @@ public class SavepointITCase extends TestLogger {
                                flink.submitJobAndWait(jobGraph, false);
                        }
                        catch (Exception e) {
-                               assertEquals(SuppressRestartsException.class, e.getCause().getClass());
-                               assertEquals(IllegalArgumentException.class, e.getCause().getCause().getClass());
+                               assertEquals(JobExecutionException.class, e.getClass());
+                               assertEquals(IllegalArgumentException.class, e.getCause().getClass());
                        }
                }
                finally {

Reply via email to