This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 40b0a31 [FLINK-24340] Only print exception on job failure/suspension
40b0a31 is described below
commit 40b0a317a17da3f1cd0b06a421edb6a050c63911
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Sep 21 15:34:48 2021 +0200
[FLINK-24340] Only print exception on job failure/suspension
---
.../apache/flink/runtime/dispatcher/Dispatcher.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e1bb919..e08fe58 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -843,28 +843,36 @@ public abstract class Dispatcher extends
PermanentlyFencedRpcEndpoint<Dispatcher
protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo
executionGraphInfo) {
final ArchivedExecutionGraph archivedExecutionGraph =
executionGraphInfo.getArchivedExecutionGraph();
+ final JobStatus terminalJobStatus = archivedExecutionGraph.getState();
Preconditions.checkArgument(
- archivedExecutionGraph.getState().isTerminalState(),
+ terminalJobStatus.isTerminalState(),
"Job %s is in state %s which is not terminal.",
archivedExecutionGraph.getJobID(),
- archivedExecutionGraph.getState());
+ terminalJobStatus);
- if (archivedExecutionGraph.getFailureInfo() != null) {
+ // the failureInfo contains the reason for why job was
failed/suspended, but for
+ // finished/canceled jobs it may contain the last cause of a restart
(if there were any)
+ // for finished/canceled jobs we don't want to print it because it is
misleading
+ final boolean isFailureInfoRelatedToJobTermination =
+ terminalJobStatus == JobStatus.SUSPENDED || terminalJobStatus
== JobStatus.FAILED;
+
+ if (archivedExecutionGraph.getFailureInfo() != null
+ && isFailureInfoRelatedToJobTermination) {
log.info(
"Job {} reached terminal state {}.\n{}",
archivedExecutionGraph.getJobID(),
- archivedExecutionGraph.getState(),
+ terminalJobStatus,
archivedExecutionGraph.getFailureInfo().getExceptionAsString().trim());
} else {
log.info(
"Job {} reached terminal state {}.",
archivedExecutionGraph.getJobID(),
- archivedExecutionGraph.getState());
+ terminalJobStatus);
}
archiveExecutionGraph(executionGraphInfo);
- return archivedExecutionGraph.getState().isGloballyTerminalState()
+ return terminalJobStatus.isGloballyTerminalState()
? CleanupJobState.GLOBAL
: CleanupJobState.LOCAL;
}