This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3795a71680be939912101361f2d3bc192b973991 Author: Till Rohrmann <[email protected]> AuthorDate: Fri Apr 23 16:12:17 2021 +0200 [hotfix] Add debug logging to the states of the AdaptiveScheduler This commit adds debug log statements to the states of the AdaptiveScheduler to log whenever we ignore a global failure. --- .../runtime/scheduler/adaptive/AdaptiveScheduler.java | 16 +++++++++++++--- .../flink/runtime/scheduler/adaptive/Canceling.java | 6 +++++- .../apache/flink/runtime/scheduler/adaptive/Failing.java | 6 +++++- .../flink/runtime/scheduler/adaptive/Finished.java | 7 ++++++- .../flink/runtime/scheduler/adaptive/Restarting.java | 6 +++++- .../scheduler/adaptive/StateWithExecutionGraph.java | 4 ++++ 6 files changed, 38 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index c7da630..9a1a252 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1035,14 +1035,24 @@ public class AdaptiveScheduler @Override public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) { + + @Nullable + final Throwable optionalFailure = + archivedExecutionGraph.getFailureInfo() != null + ? archivedExecutionGraph.getFailureInfo().getException() + : null; + LOG.info( + "Job {} reached terminal state {}.", + archivedExecutionGraph.getJobID(), + archivedExecutionGraph.getState(), + optionalFailure); + if (jobStatusListener != null) { jobStatusListener.jobStatusChanges( jobInformation.getJobID(), archivedExecutionGraph.getState(), archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()), - archivedExecutionGraph.getFailureInfo() != null - ? 
archivedExecutionGraph.getFailureInfo().getException() - : null); + optionalFailure); } jobTerminationFuture.complete(archivedExecutionGraph.getState()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java index 54ddabc..f5d82a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java @@ -56,7 +56,11 @@ class Canceling extends StateWithExecutionGraph { @Override public void handleGlobalFailure(Throwable cause) { - // ignore global failures + getLogger() + .debug( + "Ignored global failure because we are already canceling the job {}.", + getJobId(), + cause); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java index dc44c3c..c836161 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java @@ -58,7 +58,11 @@ class Failing extends StateWithExecutionGraph { @Override public void handleGlobalFailure(Throwable cause) { - // nothing to do since we are already failing + getLogger() + .debug( + "Ignored global failure because we are already failing the job {}.", + getJobId(), + cause); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java index 7a7c9f2..f36a98b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java @@ -54,7 +54,12 @@ class Finished implements 
State { } @Override - public void handleGlobalFailure(Throwable cause) {} + public void handleGlobalFailure(Throwable cause) { + logger.debug( + "Ignore global failure because we already finished the job {}.", + archivedExecutionGraph.getJobID(), + cause); + } @Override public Logger getLogger() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java index ffd7eb6..ed5146a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java @@ -77,7 +77,11 @@ class Restarting extends StateWithExecutionGraph { @Override public void handleGlobalFailure(Throwable cause) { - // don't do anything + getLogger() + .debug( + "Ignored global failure because we are already restarting the job {}.", + getJobId(), + cause); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java index a85caac..9962c78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java @@ -112,6 +112,10 @@ abstract class StateWithExecutionGraph implements State { return executionGraph; } + JobID getJobId() { + return executionGraph.getJobID(); + } + protected OperatorCoordinatorHandler getOperatorCoordinatorHandler() { return operatorCoordinatorHandler; }
