This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 72fc287e06a3bc124d850df9c53bdea3b3c13393
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Apr 23 16:00:56 2021 +0200

    [FLINK-22431] Add information about when and why the AdaptiveScheduler restarts or fails jobs
    
    This commit adds info log statements that tell the user when and why the AdaptiveScheduler restarts or fails a job.
    
    This closes #15736.
---
 .../runtime/scheduler/adaptive/AdaptiveScheduler.java    |  2 +-
 .../flink/runtime/scheduler/adaptive/Executing.java      | 16 ++++++++--------
 .../runtime/scheduler/adaptive/StopWithSavepoint.java    |  2 ++
 .../flink/runtime/scheduler/adaptive/ExecutingTest.java  |  5 +++--
 .../scheduler/adaptive/StopWithSavepointTest.java        |  6 +++---
 5 files changed, 17 insertions(+), 14 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index bad1cc9..c7da630 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1058,7 +1058,7 @@ public class AdaptiveScheduler
         restartBackoffTimeStrategy.notifyFailure(failure);
         if (restartBackoffTimeStrategy.canRestart()) {
             return Executing.FailureResult.canRestart(
-                    
Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime()));
+                    failure, 
Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime()));
         } else {
             return Executing.FailureResult.canNotRestart(
                     new JobException(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index dcb0366..83beda3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -85,12 +85,14 @@ class Executing extends StateWithExecutionGraph implements 
ResourceConsumer {
         final FailureResult failureResult = context.howToHandleFailure(cause);
 
         if (failureResult.canRestart()) {
+            getLogger().info("Restarting job.", 
failureResult.getFailureCause());
             context.goToRestarting(
                     getExecutionGraph(),
                     getExecutionGraphHandler(),
                     getOperatorCoordinatorHandler(),
                     failureResult.getBackoffTime());
         } else {
+            getLogger().info("Failing job.", failureResult.getFailureCause());
             context.goToFailing(
                     getExecutionGraph(),
                     getExecutionGraphHandler(),
@@ -281,9 +283,9 @@ class Executing extends StateWithExecutionGraph implements 
ResourceConsumer {
     static final class FailureResult {
         @Nullable private final Duration backoffTime;
 
-        @Nullable private final Throwable failureCause;
+        private final Throwable failureCause;
 
-        private FailureResult(@Nullable Duration backoffTime, @Nullable 
Throwable failureCause) {
+        private FailureResult(Throwable failureCause, @Nullable Duration 
backoffTime) {
             this.backoffTime = backoffTime;
             this.failureCause = failureCause;
         }
@@ -299,20 +301,18 @@ class Executing extends StateWithExecutionGraph 
implements ResourceConsumer {
         }
 
         Throwable getFailureCause() {
-            Preconditions.checkState(
-                    failureCause != null,
-                    "Failure result must not be restartable to return a 
failure cause.");
             return failureCause;
         }
 
         /**
          * Creates a FailureResult which allows to restart the job.
          *
+         * @param failureCause failureCause for restarting the job
          * @param backoffTime backoffTime to wait before restarting the job
          * @return FailureResult which allows to restart the job
          */
-        static FailureResult canRestart(Duration backoffTime) {
-            return new FailureResult(backoffTime, null);
+        static FailureResult canRestart(Throwable failureCause, Duration 
backoffTime) {
+            return new FailureResult(failureCause, backoffTime);
         }
 
         /**
@@ -322,7 +322,7 @@ class Executing extends StateWithExecutionGraph implements 
ResourceConsumer {
          * @return FailureResult which does not allow to restart the job
          */
         static FailureResult canNotRestart(Throwable failureCause) {
-            return new FailureResult(null, failureCause);
+            return new FailureResult(failureCause, null);
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
index dcde71c..ab4f937 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java
@@ -179,12 +179,14 @@ class StopWithSavepoint extends StateWithExecutionGraph {
         final Executing.FailureResult failureResult = 
context.howToHandleFailure(cause);
 
         if (failureResult.canRestart()) {
+            getLogger().info("Restarting job.", 
failureResult.getFailureCause());
             context.goToRestarting(
                     getExecutionGraph(),
                     getExecutionGraphHandler(),
                     getOperatorCoordinatorHandler(),
                     failureResult.getBackoffTime());
         } else {
+            getLogger().info("Failing job.", failureResult.getFailureCause());
             context.goToFailing(
                     getExecutionGraph(),
                     getExecutionGraphHandler(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index df5f233..2852bda 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -140,7 +140,7 @@ public class ExecutingTest extends TestLogger {
             ctx.setExpectRestarting(
                     (restartingArguments ->
                             assertThat(restartingArguments.getBackoffTime(), 
is(duration))));
-            ctx.setHowToHandleFailure((t) -> 
Executing.FailureResult.canRestart(duration));
+            ctx.setHowToHandleFailure((t) -> 
Executing.FailureResult.canRestart(t, duration));
             exec.handleGlobalFailure(new RuntimeException("Recoverable 
error"));
         }
     }
@@ -234,7 +234,8 @@ public class ExecutingTest extends TestLogger {
                     new ExecutingStateBuilder()
                             
.setExecutionGraph(returnsFailedStateExecutionGraph)
                             .build(ctx);
-            ctx.setHowToHandleFailure((ign) -> 
Executing.FailureResult.canRestart(Duration.ZERO));
+            ctx.setHowToHandleFailure(
+                    (throwable) -> 
Executing.FailureResult.canRestart(throwable, Duration.ZERO));
             ctx.setExpectRestarting(assertNonNull());
 
             exec.updateTaskExecutionState(createFailingStateTransition());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
index e0383fa..e9a1157 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java
@@ -174,7 +174,7 @@ public class StopWithSavepointTest extends TestLogger {
             StopWithSavepoint sws = createStopWithSavepoint(ctx);
             ctx.setStopWithSavepoint(sws);
             ctx.setHowToHandleFailure(
-                    (ignore) -> 
Executing.FailureResult.canRestart(Duration.ZERO));
+                    (throwable) -> 
Executing.FailureResult.canRestart(throwable, Duration.ZERO));
 
             ctx.setExpectRestarting(assertNonNull());
 
@@ -229,7 +229,7 @@ public class StopWithSavepointTest extends TestLogger {
                     createStopWithSavepoint(ctx, new 
StateTrackingMockExecutionGraph());
             ctx.setStopWithSavepoint(sws);
             ctx.setHowToHandleFailure(
-                    (ignore) -> 
Executing.FailureResult.canRestart(Duration.ZERO));
+                    (throwable) -> 
Executing.FailureResult.canRestart(throwable, Duration.ZERO));
 
             ctx.setExpectRestarting(assertNonNull());
 
@@ -277,7 +277,7 @@ public class StopWithSavepointTest extends TestLogger {
             ctx.setStopWithSavepoint(sws);
 
             ctx.setHowToHandleFailure(
-                    (ignore) -> 
Executing.FailureResult.canRestart(Duration.ZERO));
+                    (throwable) -> 
Executing.FailureResult.canRestart(throwable, Duration.ZERO));
 
             ctx.setExpectRestarting(assertNonNull());
 

Reply via email to