XComp commented on a change in pull request #18189:
URL: https://github.com/apache/flink/pull/18189#discussion_r792153729



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java
##########
@@ -52,22 +69,70 @@ public DispatcherLeaderProcessFactory createFactory(
 
         try {
             jobGraph =
-                    jobGraphRetriever.retrieveJobGraph(
-                            partialDispatcherServices.getConfiguration());
+                    Preconditions.checkNotNull(
+                            jobGraphRetriever.retrieveJobGraph(
+                                    
partialDispatcherServices.getConfiguration()));
         } catch (FlinkException e) {
             throw new FlinkRuntimeException("Could not retrieve the 
JobGraph.", e);
         }
 
+        final JobResultStore jobResultStore = 
jobPersistenceComponentFactory.createJobResultStore();
+        final Collection<JobResult> recoveredDirtyJobResults = 
getDirtyJobResults(jobResultStore);
+
+        final Optional<JobResult> recoveredDirtyJobResultOptional =
+                extractDirtyJobResult(recoveredDirtyJobResults, jobGraph);
+        final Optional<JobGraph> jobGraphOptional =
+                getJobGraphBasedOnDirtyJobResults(jobGraph, 
recoveredDirtyJobResults);
+
         final DefaultDispatcherGatewayServiceFactory 
defaultDispatcherServiceFactory =
                 new DefaultDispatcherGatewayServiceFactory(
                         JobDispatcherFactory.INSTANCE, rpcService, 
partialDispatcherServices);
 
         return new JobDispatcherLeaderProcessFactory(
-                defaultDispatcherServiceFactory, jobGraph, fatalErrorHandler);
+                defaultDispatcherServiceFactory,
+                jobGraphOptional.orElse(null),
+                recoveredDirtyJobResultOptional.orElse(null),
+                jobResultStore,
+                fatalErrorHandler);
     }
 
     public static JobDispatcherLeaderProcessFactoryFactory create(
             JobGraphRetriever jobGraphRetriever) {
         return new JobDispatcherLeaderProcessFactoryFactory(jobGraphRetriever);
     }
+
+    private static Collection<JobResult> getDirtyJobResults(JobResultStore 
jobResultStore) {
+        try {
+            return jobResultStore.getDirtyResults();
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(
+                    "Could not retrieve the JobResults of dirty jobs from the 
underlying JobResultStore.",
+                    e);
+        }
+    }
+
+    private static Optional<JobResult> extractDirtyJobResult(
+            Collection<JobResult> dirtyJobResults, JobGraph jobGraph) {
+        Optional<JobResult> actualDirtyJobResult = Optional.empty();
+        for (JobResult dirtyJobResult : dirtyJobResults) {
+            if (dirtyJobResult.getJobId().equals(jobGraph.getJobID())) {
+                actualDirtyJobResult = Optional.of(dirtyJobResult);
+            } else {
+                LOG.warn(
+                        "Unexpected dirty JobResultStore entry: Job '{}' is 
listed as dirty, isn't part of this single-job cluster, though.",

Review comment:
       I would leave it like that to have it being more robust to modifications 
done by the user in a file-based setup...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to