tillrohrmann closed pull request #7116: [BP-1.7][FLINK-10883] Failing batch
jobs with NoResourceAvailableException when slot request times out
URL: https://github.com/apache/flink/pull/7116
This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the diff
is reproduced below for the sake of provenance:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cbf51037b8b..e3b501e52e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -41,6 +41,7 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -427,7 +428,18 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
deploymentFuture.whenComplete(
(Void ignored, Throwable failure) -> {
if (failure != null) {
-
markFailed(ExceptionUtils.stripCompletionException(failure));
+ final Throwable
stripCompletionException = ExceptionUtils.stripCompletionException(failure);
+ final Throwable
schedulingFailureCause;
+
+ if (stripCompletionException
instanceof TimeoutException) {
+ schedulingFailureCause
= new NoResourceAvailableException(
+ "Could not
allocate enough slots within timeout of " + allocationTimeout + " to run the
job. " +
+ "Please
make sure that the cluster has enough resources.");
+ } else {
+ schedulingFailureCause
= stripCompletionException;
+ }
+
+
markFailed(schedulingFailureCause);
}
});
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3b55e009116..56315e07146 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -911,7 +911,7 @@ public void scheduleForExecution() throws JobException {
final CompletableFuture<Void>
schedulingJobVertexFuture = ejv.scheduleAll(
slotProvider,
allowQueuedScheduling,
- LocationPreferenceConstraint.ALL,//
since it is an input vertex, the input based location preferences should be
empty
+ LocationPreferenceConstraint.ALL, //
since it is an input vertex, the input based location preferences should be
empty
Collections.emptySet());
schedulingFutures.add(schedulingJobVertexFuture);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index c108eaee36a..585e1badf47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -101,7 +101,42 @@ public void runJobWithMultipleRpcServices() throws Exception {
}
@Test
- public void testHandleJobsWhenNotEnoughSlot() throws Exception {
+ public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception
{
+ try {
+
setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.EAGER);
+ fail("Job should fail.");
+ } catch (JobExecutionException e) {
+ assertTrue(findThrowableWithMessage(e, "Job execution
failed.").isPresent());
+ assertTrue(findThrowable(e,
NoResourceAvailableException.class).isPresent());
+ assertTrue(findThrowableWithMessage(e, "Slots required:
2, slots allocated: 1").isPresent());
+ }
+ }
+
+ @Test
+ public void testHandleBatchJobsWhenNotEnoughSlot() throws Exception {
+ try {
+
setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.LAZY_FROM_SOURCES);
+ fail("Job should fail.");
+ } catch (JobExecutionException e) {
+ assertTrue(findThrowableWithMessage(e, "Job execution
failed.").isPresent());
+ assertTrue(findThrowable(e,
NoResourceAvailableException.class).isPresent());
+ assertTrue(findThrowableWithMessage(e, "Could not
allocate enough slots").isPresent());
+ }
+ }
+
+ private void setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode
scheduleMode) throws Exception {
+ final JobVertex vertex = new JobVertex("Test Vertex");
+ vertex.setParallelism(2);
+ vertex.setMaxParallelism(2);
+ vertex.setInvokableClass(BlockingNoOpInvokable.class);
+
+ final JobGraph jobGraph = new JobGraph("Test Job", vertex);
+ jobGraph.setScheduleMode(scheduleMode);
+
+ runHandleJobsWhenNotEnoughSlots(jobGraph);
+ }
+
+ private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph)
throws Exception {
final Configuration configuration = getDefaultConfiguration();
configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT,
100L);
@@ -114,24 +149,7 @@ public void testHandleJobsWhenNotEnoughSlot() throws Exception {
try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
miniCluster.start();
- final JobVertex vertex = new JobVertex("Test Vertex");
- vertex.setParallelism(2);
- vertex.setMaxParallelism(2);
- vertex.setInvokableClass(BlockingNoOpInvokable.class);
-
- final JobGraph jobGraph = new JobGraph("Test Job",
vertex);
- jobGraph.setScheduleMode(ScheduleMode.EAGER);
-
- try {
- miniCluster.executeJobBlocking(jobGraph);
-
- fail("Job should fail.");
- } catch (JobExecutionException e) {
- assertTrue(findThrowableWithMessage(e, "Job
execution failed.").isPresent());
-
- assertTrue(findThrowable(e,
NoResourceAvailableException.class).isPresent());
- assertTrue(findThrowableWithMessage(e, "Slots
required: 2, slots allocated: 1").isPresent());
- }
+ miniCluster.executeJobBlocking(jobGraph);
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services