zentol commented on a change in pull request #17323:
URL: https://github.com/apache/flink/pull/17323#discussion_r712904614
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -843,28 +843,36 @@ protected void onFatalError(Throwable throwable) {
protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
final ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph();
+ final JobStatus terminalJobStatus = archivedExecutionGraph.getState();
Preconditions.checkArgument(
- archivedExecutionGraph.getState().isTerminalState(),
+ terminalJobStatus.isTerminalState(),
"Job %s is in state %s which is not terminal.",
archivedExecutionGraph.getJobID(),
- archivedExecutionGraph.getState());
+ terminalJobStatus);
- if (archivedExecutionGraph.getFailureInfo() != null) {
+ // the failureInfo contains the reason for why job was failed/suspended, but for
+ // finished/canceled jobs it may contain the last cause of a restart (if there were any)
+ // for finished/canceled jobs we don't want to print it because it is misleading
+ final boolean isFailureInfoRelatedToJobTermination =
+ terminalJobStatus == JobStatus.SUSPENDED || terminalJobStatus == JobStatus.FAILED;
Review comment:
Usually, no. In Flink, a job is suspended when the JobManager (JM) that is currently running it loses leadership (e.g., because ZooKeeper went down).
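To illustrate the check in the hunk above, here is a minimal, self-contained sketch of the decision it encodes: the failure info is only reported when the terminal status is FAILED or SUSPENDED, since for FINISHED or CANCELED jobs it may just hold the cause of an earlier restart. The JobStatus enum below is a hypothetical, trimmed-down stand-in for Flink's own JobStatus (the real one has more states), and failureMessageToLog is a hypothetical helper, not Dispatcher code.

```java
import java.util.Optional;

public class FailureInfoLogging {

    // Hypothetical, trimmed-down stand-in for Flink's JobStatus; NOT the real class.
    enum JobStatus {
        RUNNING(false), FINISHED(true), CANCELED(true), FAILED(true), SUSPENDED(true);

        private final boolean terminal;

        JobStatus(boolean terminal) {
            this.terminal = terminal;
        }

        boolean isTerminalState() {
            return terminal;
        }
    }

    // FAILED/SUSPENDED: the failure info describes why the job terminated.
    // FINISHED/CANCELED: it may only hold the cause of an earlier restart, so it is skipped.
    static boolean isFailureInfoRelatedToJobTermination(JobStatus status) {
        return status == JobStatus.SUSPENDED || status == JobStatus.FAILED;
    }

    // Hypothetical helper: returns the message that would be logged, if any.
    static Optional<String> failureMessageToLog(JobStatus status, String failureInfo) {
        // Mirrors the Preconditions.checkArgument call in the diff.
        if (!status.isTerminalState()) {
            throw new IllegalArgumentException(
                    "Job is in state " + status + " which is not terminal.");
        }
        if (failureInfo != null && isFailureInfoRelatedToJobTermination(status)) {
            return Optional.of("Job reached terminal state " + status + ": " + failureInfo);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A job that restarted once and then finished: the old restart cause is not reported.
        System.out.println(failureMessageToLog(JobStatus.FINISHED, "TaskManager lost"));
        // A job that failed: the failure cause is reported.
        System.out.println(failureMessageToLog(JobStatus.FAILED, "TaskManager lost"));
    }
}
```

Keeping the predicate in a named boolean, as the diff does, makes the "which statuses carry a meaningful failure cause" rule explicit in one place.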