This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new b552505c513 [FLINK-29223][coordination] Add missing output info for 
jobs already reached terminal state
b552505c513 is described below

commit b552505c513795e14d6318e2b47167b75449c0af
Author: snuyanzin <[email protected]>
AuthorDate: Thu Sep 8 09:24:39 2022 +0200

    [FLINK-29223][coordination] Add missing output info for jobs already 
reached terminal state
---
 .../runner/JobDispatcherLeaderProcessFactoryFactory.java       | 10 +++++++---
 .../dispatcher/runner/SessionDispatcherLeaderProcess.java      |  4 ++++
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
index 2ebc3bce8fc..8d38825bc58 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
@@ -131,8 +131,12 @@ public class JobDispatcherLeaderProcessFactoryFactory
             JobGraph jobGraph, Collection<JobResult> dirtyJobResults) {
         final Set<JobID> jobIdsOfFinishedJobs =
                 
dirtyJobResults.stream().map(JobResult::getJobId).collect(Collectors.toSet());
-        return jobIdsOfFinishedJobs.contains(jobGraph.getJobID())
-                ? Optional.empty()
-                : Optional.of(jobGraph);
+        if (jobIdsOfFinishedJobs.contains(jobGraph.getJobID())) {
+            LOG.info(
+                    "Skipping recovery of a job with job id {}, because it 
already reached a globally terminal state",
+                    jobGraph.getJobID());
+            return Optional.empty();
+        }
+        return Optional.of(jobGraph);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
index 63187002acc..5b74536781d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
@@ -148,6 +148,10 @@ public class SessionDispatcherLeaderProcess extends 
AbstractDispatcherLeaderProc
         for (JobID jobId : jobIds) {
             if (!recoveredDirtyJobResults.contains(jobId)) {
                 tryRecoverJob(jobId).ifPresent(recoveredJobGraphs::add);
+            } else {
+                log.info(
+                        "Skipping recovery of a job with job id {}, because it 
already reached a globally terminal state",
+                        jobId);
             }
         }
 

Reply via email to