This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 72fc287e06a3bc124d850df9c53bdea3b3c13393 Author: Till Rohrmann <[email protected]> AuthorDate: Fri Apr 23 16:00:56 2021 +0200 [FLINK-22431] Add information when and why the AdaptiveScheduler restarts or fails jobs This commit adds info log statements to tell the user when and why it restarts or fails a job. This closes #15736. --- .../runtime/scheduler/adaptive/AdaptiveScheduler.java | 2 +- .../flink/runtime/scheduler/adaptive/Executing.java | 16 ++++++++-------- .../runtime/scheduler/adaptive/StopWithSavepoint.java | 2 ++ .../flink/runtime/scheduler/adaptive/ExecutingTest.java | 5 +++-- .../scheduler/adaptive/StopWithSavepointTest.java | 6 +++--- 5 files changed, 17 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index bad1cc9..c7da630 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1058,7 +1058,7 @@ public class AdaptiveScheduler restartBackoffTimeStrategy.notifyFailure(failure); if (restartBackoffTimeStrategy.canRestart()) { return Executing.FailureResult.canRestart( - Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime())); + failure, Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime())); } else { return Executing.FailureResult.canNotRestart( new JobException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index dcb0366..83beda3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -85,12 +85,14 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer { final FailureResult failureResult = context.howToHandleFailure(cause); if (failureResult.canRestart()) { + getLogger().info("Restarting job.", failureResult.getFailureCause()); context.goToRestarting( getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), failureResult.getBackoffTime()); } else { + getLogger().info("Failing job.", failureResult.getFailureCause()); context.goToFailing( getExecutionGraph(), getExecutionGraphHandler(), @@ -281,9 +283,9 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer { static final class FailureResult { @Nullable private final Duration backoffTime; - @Nullable private final Throwable failureCause; + private final Throwable failureCause; - private FailureResult(@Nullable Duration backoffTime, @Nullable Throwable failureCause) { + private FailureResult(Throwable failureCause, @Nullable Duration backoffTime) { this.backoffTime = backoffTime; this.failureCause = failureCause; } @@ -299,20 +301,18 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer { } Throwable getFailureCause() { - Preconditions.checkState( - failureCause != null, - "Failure result must not be restartable to return a failure cause."); return failureCause; } /** * Creates a FailureResult which allows to restart the job. * + * @param failureCause failureCause for restarting the job * @param backoffTime backoffTime to wait before restarting the job * @return FailureResult which allows to restart the job */ - static FailureResult canRestart(Duration backoffTime) { - return new FailureResult(backoffTime, null); + static FailureResult canRestart(Throwable failureCause, Duration backoffTime) { + return new FailureResult(failureCause, backoffTime); } /** @@ -322,7 +322,7 @@ class Executing extends StateWithExecutionGraph implements ResourceConsumer { * @return FailureResult which does not allow to restart the job */ static FailureResult canNotRestart(Throwable failureCause) { - return new FailureResult(null, failureCause); + return new FailureResult(failureCause, null); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java index dcde71c..ab4f937 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java @@ -179,12 +179,14 @@ class StopWithSavepoint extends StateWithExecutionGraph { final Executing.FailureResult failureResult = context.howToHandleFailure(cause); if (failureResult.canRestart()) { + getLogger().info("Restarting job.", failureResult.getFailureCause()); context.goToRestarting( getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), failureResult.getBackoffTime()); } else { + getLogger().info("Failing job.", failureResult.getFailureCause()); context.goToFailing( getExecutionGraph(), getExecutionGraphHandler(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index df5f233..2852bda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -140,7 +140,7 @@ public class ExecutingTest extends TestLogger { ctx.setExpectRestarting( (restartingArguments -> assertThat(restartingArguments.getBackoffTime(), is(duration)))); - ctx.setHowToHandleFailure((t) -> Executing.FailureResult.canRestart(duration)); + ctx.setHowToHandleFailure((t) -> Executing.FailureResult.canRestart(t, duration)); exec.handleGlobalFailure(new RuntimeException("Recoverable error")); } } @@ -234,7 +234,8 @@ public class ExecutingTest extends TestLogger { new ExecutingStateBuilder() .setExecutionGraph(returnsFailedStateExecutionGraph) .build(ctx); - ctx.setHowToHandleFailure((ign) -> Executing.FailureResult.canRestart(Duration.ZERO)); + ctx.setHowToHandleFailure( + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull()); exec.updateTaskExecutionState(createFailingStateTransition()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java index e0383fa..e9a1157 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java @@ -174,7 +174,7 @@ public class StopWithSavepointTest extends TestLogger { StopWithSavepoint sws = createStopWithSavepoint(ctx); ctx.setStopWithSavepoint(sws); ctx.setHowToHandleFailure( - (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull()); @@ -229,7 +229,7 @@ public class StopWithSavepointTest extends TestLogger { createStopWithSavepoint(ctx, new StateTrackingMockExecutionGraph()); ctx.setStopWithSavepoint(sws); ctx.setHowToHandleFailure( - (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull()); @@ -277,7 +277,7 @@ public class StopWithSavepointTest extends TestLogger { ctx.setStopWithSavepoint(sws); ctx.setHowToHandleFailure( - (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull());
