This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f5db114940fad1b96b549801c80b312a9367a55a
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu Nov 15 17:27:22 2018 +0100

    [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out
    
    Instead of failing the ExecutionGraph with a generic TimeoutException when a slot request times out,
    this commit changes the failure cause to a more meaningful NoResourceAvailableException.
---
 .../flink/runtime/executiongraph/Execution.java    | 14 +++++-
 .../runtime/minicluster/MiniClusterITCase.java     | 56 ++++++++++++++--------
 2 files changed, 50 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf5103..e3b501e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        deploymentFuture.whenComplete(
                                (Void ignored, Throwable failure) -> {
                                        if (failure != null) {
-                                               
markFailed(ExceptionUtils.stripCompletionException(failure));
+                                               final Throwable 
stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+                                               final Throwable 
schedulingFailureCause;
+
+                                               if (stripCompletionException 
instanceof TimeoutException) {
+                                                       schedulingFailureCause 
= new NoResourceAvailableException(
+                                                               "Could not 
allocate enough slots within timeout of " + allocationTimeout + " to run the 
job. " +
+                                                                       "Please 
make sure that the cluster has enough resources.");
+                                               } else {
+                                                       schedulingFailureCause 
= stripCompletionException;
+                                               }
+
+                                               
markFailed(schedulingFailureCause);
                                        }
                                });
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index c108eae..585e1ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -101,7 +101,42 @@ public class MiniClusterITCase extends TestLogger {
        }
 
        @Test
-       public void testHandleJobsWhenNotEnoughSlot() throws Exception {
+       public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception 
{
+               try {
+                       
setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.EAGER);
+                       fail("Job should fail.");
+               } catch (JobExecutionException e) {
+                       assertTrue(findThrowableWithMessage(e, "Job execution 
failed.").isPresent());
+                       assertTrue(findThrowable(e, 
NoResourceAvailableException.class).isPresent());
+                       assertTrue(findThrowableWithMessage(e, "Slots required: 
2, slots allocated: 1").isPresent());
+               }
+       }
+
+       @Test
+       public void testHandleBatchJobsWhenNotEnoughSlot() throws Exception {
+               try {
+                       
setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.LAZY_FROM_SOURCES);
+                       fail("Job should fail.");
+               } catch (JobExecutionException e) {
+                       assertTrue(findThrowableWithMessage(e, "Job execution 
failed.").isPresent());
+                       assertTrue(findThrowable(e, 
NoResourceAvailableException.class).isPresent());
+                       assertTrue(findThrowableWithMessage(e, "Could not 
allocate enough slots").isPresent());
+               }
+       }
+
+       private void setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode 
scheduleMode) throws Exception {
+               final JobVertex vertex = new JobVertex("Test Vertex");
+               vertex.setParallelism(2);
+               vertex.setMaxParallelism(2);
+               vertex.setInvokableClass(BlockingNoOpInvokable.class);
+
+               final JobGraph jobGraph = new JobGraph("Test Job", vertex);
+               jobGraph.setScheduleMode(scheduleMode);
+
+               runHandleJobsWhenNotEnoughSlots(jobGraph);
+       }
+
+       private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph) 
throws Exception {
                final Configuration configuration = getDefaultConfiguration();
                configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 
100L);
 
@@ -114,24 +149,7 @@ public class MiniClusterITCase extends TestLogger {
                try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
                        miniCluster.start();
 
-                       final JobVertex vertex = new JobVertex("Test Vertex");
-                       vertex.setParallelism(2);
-                       vertex.setMaxParallelism(2);
-                       vertex.setInvokableClass(BlockingNoOpInvokable.class);
-
-                       final JobGraph jobGraph = new JobGraph("Test Job", 
vertex);
-                       jobGraph.setScheduleMode(ScheduleMode.EAGER);
-
-                       try {
-                               miniCluster.executeJobBlocking(jobGraph);
-
-                               fail("Job should fail.");
-                       } catch (JobExecutionException e) {
-                               assertTrue(findThrowableWithMessage(e, "Job 
execution failed.").isPresent());
-
-                               assertTrue(findThrowable(e, 
NoResourceAvailableException.class).isPresent());
-                               assertTrue(findThrowableWithMessage(e, "Slots 
required: 2, slots allocated: 1").isPresent());
-                       }
+                       miniCluster.executeJobBlocking(jobGraph);
                }
        }
 

Reply via email to