This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b1965eeb854f16f385329a546d47d684e2891d70 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Thu Nov 15 17:27:22 2018 +0100 [FLINK-10883] Failing batch jobs with NoResourceAvailableException when slot request times out Instead of failing the ExecutionGraph with a generic TimeoutException if a slot request times out, this commit changes the exception to a more meaningful NoResourceAvailableException. --- .../flink/runtime/executiongraph/Execution.java | 14 +++++- .../runtime/minicluster/MiniClusterITCase.java | 56 ++++++++++++++-------- 2 files changed, 50 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index cbf5103..e3b501e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; @@ -427,7 +428,18 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution deploymentFuture.whenComplete( (Void ignored, Throwable failure) -> { if (failure != null) { - markFailed(ExceptionUtils.stripCompletionException(failure)); + final Throwable stripCompletionException = ExceptionUtils.stripCompletionException(failure); + final Throwable schedulingFailureCause; + + if 
(stripCompletionException instanceof TimeoutException) { + schedulingFailureCause = new NoResourceAvailableException( + "Could not allocate enough slots within timeout of " + allocationTimeout + " to run the job. " + + "Please make sure that the cluster has enough resources."); + } else { + schedulingFailureCause = stripCompletionException; + } + + markFailed(schedulingFailureCause); } }); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index c108eae..585e1ba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -101,7 +101,42 @@ public class MiniClusterITCase extends TestLogger { } @Test - public void testHandleJobsWhenNotEnoughSlot() throws Exception { + public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception { + try { + setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.EAGER); + fail("Job should fail."); + } catch (JobExecutionException e) { + assertTrue(findThrowableWithMessage(e, "Job execution failed.").isPresent()); + assertTrue(findThrowable(e, NoResourceAvailableException.class).isPresent()); + assertTrue(findThrowableWithMessage(e, "Slots required: 2, slots allocated: 1").isPresent()); + } + } + + @Test + public void testHandleBatchJobsWhenNotEnoughSlot() throws Exception { + try { + setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode.LAZY_FROM_SOURCES); + fail("Job should fail."); + } catch (JobExecutionException e) { + assertTrue(findThrowableWithMessage(e, "Job execution failed.").isPresent()); + assertTrue(findThrowable(e, NoResourceAvailableException.class).isPresent()); + assertTrue(findThrowableWithMessage(e, "Could not allocate enough slots").isPresent()); + } + } + + private void setupAndRunHandleJobsWhenNotEnoughSlots(ScheduleMode scheduleMode) 
throws Exception { + final JobVertex vertex = new JobVertex("Test Vertex"); + vertex.setParallelism(2); + vertex.setMaxParallelism(2); + vertex.setInvokableClass(BlockingNoOpInvokable.class); + + final JobGraph jobGraph = new JobGraph("Test Job", vertex); + jobGraph.setScheduleMode(scheduleMode); + + runHandleJobsWhenNotEnoughSlots(jobGraph); + } + + private void runHandleJobsWhenNotEnoughSlots(final JobGraph jobGraph) throws Exception { final Configuration configuration = getDefaultConfiguration(); configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 100L); @@ -114,24 +149,7 @@ public class MiniClusterITCase extends TestLogger { try (final MiniCluster miniCluster = new MiniCluster(cfg)) { miniCluster.start(); - final JobVertex vertex = new JobVertex("Test Vertex"); - vertex.setParallelism(2); - vertex.setMaxParallelism(2); - vertex.setInvokableClass(BlockingNoOpInvokable.class); - - final JobGraph jobGraph = new JobGraph("Test Job", vertex); - jobGraph.setScheduleMode(ScheduleMode.EAGER); - - try { - miniCluster.executeJobBlocking(jobGraph); - - fail("Job should fail."); - } catch (JobExecutionException e) { - assertTrue(findThrowableWithMessage(e, "Job execution failed.").isPresent()); - - assertTrue(findThrowable(e, NoResourceAvailableException.class).isPresent()); - assertTrue(findThrowableWithMessage(e, "Slots required: 2, slots allocated: 1").isPresent()); - } + miniCluster.executeJobBlocking(jobGraph); } }