tillrohrmann closed pull request #7115: [FLINK-10883] Failing batch jobs with 
NoResourceAvailableException when slot request times out
URL: https://github.com/apache/flink/pull/7115
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub would not otherwise display it):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf51037b8b..e3b501e52e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
                        deploymentFuture.whenComplete(
                                (Void ignored, Throwable failure) -> {
                                        if (failure != null) {
-                                               
markFailed(ExceptionUtils.stripCompletionException(failure));
+                                               final Throwable 
stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+                                               final Throwable 
schedulingFailureCause;
+
+                                               if (stripCompletionException 
instanceof TimeoutException) {
+                                                       schedulingFailureCause 
= new NoResourceAvailableException(
+                                                               "Could not 
allocate enough slots within timeout of " + allocationTimeout + " to run the 
job. " +
+                                                                       "Please 
make sure that the cluster has enough resources.");
+                                               } else {
+                                                       schedulingFailureCause 
= stripCompletionException;
+                                               }
+
+                                               
markFailed(schedulingFailureCause);
                                        }
                                });
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3b55e009116..56315e07146 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,7 +911,7 @@ public void scheduleForExecution() throws JobException {
                                final CompletableFuture<Void> 
schedulingJobVertexFuture = ejv.scheduleAll(
                                        slotProvider,
                                        allowQueuedScheduling,
-                                       LocationPreferenceConstraint.ALL,// 
since it is an input vertex, the input based location preferences should be 
empty
+                                       LocationPreferenceConstraint.ALL, // 
since it is an input vertex, the input based location preferences should be 
empty
                                        Collections.emptySet());
 
                                
schedulingFutures.add(schedulingJobVertexFuture);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index c108eaee36a..585e1badf47 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -101,7 +101,42 @@ public void runJobWithMultipleRpcServices() throws 
Exception {
        }
 
        @Test
-       public void testHandleJobsWhenNotEnoughSlot() throws Exception {
+       public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception 
{
+               try {
+                       
setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.EAGER);
+                       fail("Job should fail.");
+               } catch (JobExecutionException e) {
+                       assertTrue(findThrowableWithMessage(e, "Job execution 
failed.").isPresent());
+                       assertTrue(findThrowable(e, 
NoResourceAvailableException.class).isPresent());
+                       assertTrue(findThrowableWithMessage(e, "Slots required: 
2, slots allocated: 1").isPresent());
+               }
+       }
+
+       @Test
+       public void testHandleBatchJobsWhenNotEnoughSlot() throws Exception {
+               try {
+                       
setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.LAZY_FROM_SOURCES);
+                       fail("Job should fail.");
+               } catch (JobExecutionException e) {
+                       assertTrue(findThrowableWithMessage(e, "Job execution 
failed.").isPresent());
+                       assertTrue(findThrowable(e, 
NoResourceAvailableException.class).isPresent());
+                       assertTrue(findThrowableWithMessage(e, "Could not 
allocate enough slots").isPresent());
+               }
+       }
+
+       private void setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode 
scheduleMode) throws Exception {
+               final JobVertex vertex = new JobVertex("Test Vertex");
+               vertex.setParallelism(2);
+               vertex.setMaxParallelism(2);
+               vertex.setInvokableClass(BlockingNoOpInvokable.class);
+
+               final JobGraph jobGraph = new JobGraph("Test Job", vertex);
+               jobGraph.setScheduleMode(scheduleMode);
+
+               runHandleJobsWhenNotEnoughSlots(jobGraph);
+       }
+
+       private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph) 
throws Exception {
                final Configuration configuration = getDefaultConfiguration();
                configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 
100L);
 
@@ -114,24 +149,7 @@ public void testHandleJobsWhenNotEnoughSlot() throws 
Exception {
                try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
                        miniCluster.start();
 
-                       final JobVertex vertex = new JobVertex("Test Vertex");
-                       vertex.setParallelism(2);
-                       vertex.setMaxParallelism(2);
-                       vertex.setInvokableClass(BlockingNoOpInvokable.class);
-
-                       final JobGraph jobGraph = new JobGraph("Test Job", 
vertex);
-                       jobGraph.setScheduleMode(ScheduleMode.EAGER);
-
-                       try {
-                               miniCluster.executeJobBlocking(jobGraph);
-
-                               fail("Job should fail.");
-                       } catch (JobExecutionException e) {
-                               assertTrue(findThrowableWithMessage(e, "Job 
execution failed.").isPresent());
-
-                               assertTrue(findThrowable(e, 
NoResourceAvailableException.class).isPresent());
-                               assertTrue(findThrowableWithMessage(e, "Slots 
required: 2, slots allocated: 1").isPresent());
-                       }
+                       miniCluster.executeJobBlocking(jobGraph);
                }
        }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to