tillrohrmann closed pull request #7117: [BP-1.6][FLINK-10883] Failing batch 
jobs with NoResourceAvailableException when slot request times out
URL: https://github.com/apache/flink/pull/7117
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub will not display it otherwise):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf51037b8b..e3b501e52e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
                        deploymentFuture.whenComplete(
                                (Void ignored, Throwable failure) -> {
                                        if (failure != null) {
-                                               
markFailed(ExceptionUtils.stripCompletionException(failure));
+                                               final Throwable 
stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+                                               final Throwable 
schedulingFailureCause;
+
+                                               if (stripCompletionException 
instanceof TimeoutException) {
+                                                       schedulingFailureCause 
= new NoResourceAvailableException(
+                                                               "Could not 
allocate enough slots within timeout of " + allocationTimeout + " to run the 
job. " +
+                                                                       "Please 
make sure that the cluster has enough resources.");
+                                               } else {
+                                                       schedulingFailureCause 
= stripCompletionException;
+                                               }
+
+                                               
markFailed(schedulingFailureCause);
                                        }
                                });
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3b55e009116..56315e07146 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,7 +911,7 @@ public void scheduleForExecution() throws JobException {
                                final CompletableFuture<Void> 
schedulingJobVertexFuture = ejv.scheduleAll(
                                        slotProvider,
                                        allowQueuedScheduling,
-                                       LocationPreferenceConstraint.ALL,// 
since it is an input vertex, the input based location preferences should be 
empty
+                                       LocationPreferenceConstraint.ALL, // 
since it is an input vertex, the input based location preferences should be 
empty
                                        Collections.emptySet());
 
                                
schedulingFutures.add(schedulingJobVertexFuture);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to