This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 82118dd  [FLINK-24340] Only print exception on job failure/suspension
82118dd is described below

commit 82118dd297d0dbd51a3527a754662c55123d3d7d
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Sep 21 15:34:48 2021 +0200

    [FLINK-24340] Only print exception on job failure/suspension
---
 .../apache/flink/runtime/dispatcher/Dispatcher.java  | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index b6eb1db..feafa7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -822,28 +822,36 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
         final ArchivedExecutionGraph archivedExecutionGraph =
                 executionGraphInfo.getArchivedExecutionGraph();
+        final JobStatus terminalJobStatus = archivedExecutionGraph.getState();
         Preconditions.checkArgument(
-                archivedExecutionGraph.getState().isTerminalState(),
+                terminalJobStatus.isTerminalState(),
                 "Job %s is in state %s which is not terminal.",
                 archivedExecutionGraph.getJobID(),
-                archivedExecutionGraph.getState());
+                terminalJobStatus);
 
-        if (archivedExecutionGraph.getFailureInfo() != null) {
+        // the failureInfo contains the reason for why job was failed/suspended, but for
+        // finished/canceled jobs it may contain the last cause of a restart (if there were any)
+        // for finished/canceled jobs we don't want to print it because it is misleading
+        final boolean isFailureInfoRelatedToJobTermination =
+                terminalJobStatus == JobStatus.SUSPENDED || terminalJobStatus == JobStatus.FAILED;
+
+        if (archivedExecutionGraph.getFailureInfo() != null
+                && isFailureInfoRelatedToJobTermination) {
             log.info(
                     "Job {} reached terminal state {}.\n{}",
                     archivedExecutionGraph.getJobID(),
-                    archivedExecutionGraph.getState(),
+                    terminalJobStatus,
                     archivedExecutionGraph.getFailureInfo().getExceptionAsString().trim());
         } else {
             log.info(
                     "Job {} reached terminal state {}.",
                     archivedExecutionGraph.getJobID(),
-                    archivedExecutionGraph.getState());
+                    terminalJobStatus);
         }
 
         archiveExecutionGraph(executionGraphInfo);
 
-        return archivedExecutionGraph.getState().isGloballyTerminalState()
+        return terminalJobStatus.isGloballyTerminalState()
                 ? CleanupJobState.GLOBAL
                 : CleanupJobState.LOCAL;
     }

Reply via email to