nicoweidner commented on a change in pull request #17323:
URL: https://github.com/apache/flink/pull/17323#discussion_r712890135
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -843,28 +843,36 @@ protected void onFatalError(Throwable throwable) {
protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
final ArchivedExecutionGraph archivedExecutionGraph =
executionGraphInfo.getArchivedExecutionGraph();
+ final JobStatus terminalJobStatus = archivedExecutionGraph.getState();
Preconditions.checkArgument(
- archivedExecutionGraph.getState().isTerminalState(),
+ terminalJobStatus.isTerminalState(),
"Job %s is in state %s which is not terminal.",
archivedExecutionGraph.getJobID(),
- archivedExecutionGraph.getState());
+ terminalJobStatus);
- if (archivedExecutionGraph.getFailureInfo() != null) {
+ // the failureInfo contains the reason for why job was failed/suspended, but for
+ // finished/canceled jobs it may contain the last cause of a restart (if there were any)
+ // for finished/canceled jobs we don't want to print it because it is misleading
+ final boolean isFailureInfoRelatedToJobTermination =
+ terminalJobStatus == JobStatus.SUSPENDED || terminalJobStatus == JobStatus.FAILED;
Review comment:
Can a user set the job status to `SUSPENDED` manually? If so, this check
would need more detailed handling.
I'm asking because the status (probably) has a different meaning in
Ververica Platform.
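
To make the question concrete, here is a minimal standalone sketch of the condition in the diff. The `FailureInfoCheck` class and its `Status` enum are hypothetical stand-ins that list only the four statuses named in the code comment; this is not Flink's `JobStatus`, and the sketch makes no claim about how `SUSPENDED` is actually reached.

```java
public class FailureInfoCheck {

    // Hypothetical stand-in for the statuses mentioned in the diff comment.
    enum Status { FINISHED, CANCELED, FAILED, SUSPENDED }

    // Mirrors the boolean introduced in the diff: the failure info is treated
    // as the cause of termination only for SUSPENDED and FAILED.
    static boolean isFailureInfoRelatedToJobTermination(Status terminalJobStatus) {
        return terminalJobStatus == Status.SUSPENDED || terminalJobStatus == Status.FAILED;
    }

    public static void main(String[] args) {
        // Print the decision for each status in the sketch.
        for (Status status : Status.values()) {
            System.out.println(status + " -> " + isFailureInfoRelatedToJobTermination(status));
        }
    }
}
```

If `SUSPENDED` could also result from an explicit user action, the `SUSPENDED` branch is the one that would need revisiting, since the failure info might then be unrelated to the termination.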