This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3795a71680be939912101361f2d3bc192b973991
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Apr 23 16:12:17 2021 +0200

    [hotfix] Add debug logging to the states of the AdaptiveScheduler
    
    This commit adds debug log statements to the states of the AdaptiveScheduler to log
    whenever we ignore a global failure.
---
 .../runtime/scheduler/adaptive/AdaptiveScheduler.java    | 16 +++++++++++++---
 .../flink/runtime/scheduler/adaptive/Canceling.java      |  6 +++++-
 .../apache/flink/runtime/scheduler/adaptive/Failing.java |  6 +++++-
 .../flink/runtime/scheduler/adaptive/Finished.java       |  7 ++++++-
 .../flink/runtime/scheduler/adaptive/Restarting.java     |  6 +++++-
 .../scheduler/adaptive/StateWithExecutionGraph.java      |  4 ++++
 6 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index c7da630..9a1a252 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1035,14 +1035,24 @@ public class AdaptiveScheduler
 
     @Override
     public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+
+        @Nullable
+        final Throwable optionalFailure =
+                archivedExecutionGraph.getFailureInfo() != null
+                        ? archivedExecutionGraph.getFailureInfo().getException()
+                        : null;
+        LOG.info(
+                "Job {} reached terminal state {}.",
+                archivedExecutionGraph.getJobID(),
+                archivedExecutionGraph.getState(),
+                optionalFailure);
+
         if (jobStatusListener != null) {
             jobStatusListener.jobStatusChanges(
                     jobInformation.getJobID(),
                     archivedExecutionGraph.getState(),
                     archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()),
-                    archivedExecutionGraph.getFailureInfo() != null
-                            ? archivedExecutionGraph.getFailureInfo().getException()
-                            : null);
+                    optionalFailure);
         }
 
         jobTerminationFuture.complete(archivedExecutionGraph.getState());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java
index 54ddabc..f5d82a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java
@@ -56,7 +56,11 @@ class Canceling extends StateWithExecutionGraph {
 
     @Override
     public void handleGlobalFailure(Throwable cause) {
-        // ignore global failures
+        getLogger()
+                .debug(
+                        "Ignored global failure because we are already canceling the job {}.",
+                        getJobId(),
+                        cause);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java
index dc44c3c..c836161 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java
@@ -58,7 +58,11 @@ class Failing extends StateWithExecutionGraph {
 
     @Override
     public void handleGlobalFailure(Throwable cause) {
-        // nothing to do since we are already failing
+        getLogger()
+                .debug(
+                        "Ignored global failure because we are already failing the job {}.",
+                        getJobId(),
+                        cause);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java
index 7a7c9f2..f36a98b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java
@@ -54,7 +54,12 @@ class Finished implements State {
     }
 
     @Override
-    public void handleGlobalFailure(Throwable cause) {}
+    public void handleGlobalFailure(Throwable cause) {
+        logger.debug(
+                "Ignore global failure because we already finished the job {}.",
+                archivedExecutionGraph.getJobID(),
+                cause);
+    }
 
     @Override
     public Logger getLogger() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
index ffd7eb6..ed5146a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java
@@ -77,7 +77,11 @@ class Restarting extends StateWithExecutionGraph {
 
     @Override
     public void handleGlobalFailure(Throwable cause) {
-        // don't do anything
+        getLogger()
+                .debug(
+                        "Ignored global failure because we are already restarting the job {}.",
+                        getJobId(),
+                        cause);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
index a85caac..9962c78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
@@ -112,6 +112,10 @@ abstract class StateWithExecutionGraph implements State {
         return executionGraph;
     }
 
+    JobID getJobId() {
+        return executionGraph.getJobID();
+    }
+
     protected OperatorCoordinatorHandler getOperatorCoordinatorHandler() {
         return operatorCoordinatorHandler;
     }

Reply via email to