tillrohrmann closed pull request #7117: [BP-1.6][FLINK-10883] Failing batch
jobs with NoResourceAvailableException when slot request times out
URL: https://github.com/apache/flink/pull/7117
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display it):
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf51037b8b..e3b501e52e8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public void setInitialState(@Nullable
JobManagerTaskRestore taskRestore) {
deploymentFuture.whenComplete(
(Void ignored, Throwable failure) -> {
if (failure != null) {
-
markFailed(ExceptionUtils.stripCompletionException(failure));
+ final Throwable
stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+ final Throwable
schedulingFailureCause;
+
+ if (stripCompletionException
instanceof TimeoutException) {
+ schedulingFailureCause
= new NoResourceAvailableException(
+ "Could not
allocate enough slots within timeout of " + allocationTimeout + " to run the
job. " +
+ "Please
make sure that the cluster has enough resources.");
+ } else {
+ schedulingFailureCause
= stripCompletionException;
+ }
+
+
markFailed(schedulingFailureCause);
}
});
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3b55e009116..56315e07146 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,7 +911,7 @@ public void scheduleForExecution() throws JobException {
final CompletableFuture<Void>
schedulingJobVertexFuture = ejv.scheduleAll(
slotProvider,
allowQueuedScheduling,
- LocationPreferenceConstraint.ALL,//
since it is an input vertex, the input based location preferences should be
empty
+ LocationPreferenceConstraint.ALL, //
since it is an input vertex, the input based location preferences should be
empty
Collections.emptySet());
schedulingFutures.add(schedulingJobVertexFuture);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services