This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 82118dd [FLINK-24340] Only print exception on job failure/suspension
82118dd is described below
commit 82118dd297d0dbd51a3527a754662c55123d3d7d
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Sep 21 15:34:48 2021 +0200
[FLINK-24340] Only print exception on job failure/suspension
---
.../apache/flink/runtime/dispatcher/Dispatcher.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index b6eb1db..feafa7d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -822,28 +822,36 @@ public abstract class Dispatcher extends
PermanentlyFencedRpcEndpoint<Dispatcher
protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo
executionGraphInfo) {
final ArchivedExecutionGraph archivedExecutionGraph =
executionGraphInfo.getArchivedExecutionGraph();
+ final JobStatus terminalJobStatus = archivedExecutionGraph.getState();
Preconditions.checkArgument(
- archivedExecutionGraph.getState().isTerminalState(),
+ terminalJobStatus.isTerminalState(),
"Job %s is in state %s which is not terminal.",
archivedExecutionGraph.getJobID(),
- archivedExecutionGraph.getState());
+ terminalJobStatus);
- if (archivedExecutionGraph.getFailureInfo() != null) {
+ // the failureInfo contains the reason for why job was
failed/suspended, but for
+ // finished/canceled jobs it may contain the last cause of a restart
(if there were any)
+ // for finished/canceled jobs we don't want to print it because it is
misleading
+ final boolean isFailureInfoRelatedToJobTermination =
+ terminalJobStatus == JobStatus.SUSPENDED || terminalJobStatus
== JobStatus.FAILED;
+
+ if (archivedExecutionGraph.getFailureInfo() != null
+ && isFailureInfoRelatedToJobTermination) {
log.info(
"Job {} reached terminal state {}.\n{}",
archivedExecutionGraph.getJobID(),
- archivedExecutionGraph.getState(),
+ terminalJobStatus,
archivedExecutionGraph.getFailureInfo().getExceptionAsString().trim());
} else {
log.info(
"Job {} reached terminal state {}.",
archivedExecutionGraph.getJobID(),
- archivedExecutionGraph.getState());
+ terminalJobStatus);
}
archiveExecutionGraph(executionGraphInfo);
- return archivedExecutionGraph.getState().isGloballyTerminalState()
+ return terminalJobStatus.isGloballyTerminalState()
? CleanupJobState.GLOBAL
: CleanupJobState.LOCAL;
}