This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd6c3eea8c8c9d371e2cdc5e98ce650be5b5fc37
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu Nov 15 17:27:22 2018 +0100

    [FLINK-10883] Failing batch jobs with NoResourceAvailableException when 
slot request times out
    
    Instead of failing the ExecutionGraph with a generic TimeoutException if a 
slot request times out,
    this commit changes the exception to a more meaningful 
NoResourceAvailableException.
---
 .../org/apache/flink/runtime/executiongraph/Execution.java | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf5103..e3b501e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        deploymentFuture.whenComplete(
                                (Void ignored, Throwable failure) -> {
                                        if (failure != null) {
-                                               
markFailed(ExceptionUtils.stripCompletionException(failure));
+                                               final Throwable 
stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+                                               final Throwable 
schedulingFailureCause;
+
+                                               if (stripCompletionException 
instanceof TimeoutException) {
+                                                       schedulingFailureCause 
= new NoResourceAvailableException(
+                                                               "Could not 
allocate enough slots within timeout of " + allocationTimeout + " to run the 
job. " +
+                                                                       "Please 
make sure that the cluster has enough resources.");
+                                               } else {
+                                                       schedulingFailureCause 
= stripCompletionException;
+                                               }
+
+                                               
markFailed(schedulingFailureCause);
                                        }
                                });
 

Reply via email to