XComp commented on a change in pull request #18189:
URL: https://github.com/apache/flink/pull/18189#discussion_r780310849



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -868,6 +867,14 @@ private void cleanUpRemainingJobData(JobID jobId, boolean 
jobGraphRemoved) {
         blobServer.cleanupJob(jobId, jobGraphRemoved);
     }
 
+    private void markJobAsClean(JobID jobId) {
+        try {
+            jobResultStore.markResultAsClean(jobId);
+        } catch (IOException e) {
+            log.warn("Could not properly mark job {} result as clean.", jobId, 
e);

Review comment:
       Yes, this would be addressed when introducing the retry mechanism

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
##########
@@ -92,34 +99,61 @@ private void startServices() {
         }
     }
 
-    private void createDispatcherIfRunning(Collection<JobGraph> jobGraphs) {
-        runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
+    private void createDispatcherIfRunning(
+            Collection<JobGraph> jobGraphs, Collection<JobResult> 
globallyTerminatedJobs) {
+        runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs, 
globallyTerminatedJobs));
     }
 
-    private void createDispatcher(Collection<JobGraph> jobGraphs) {
+    private void createDispatcher(
+            Collection<JobGraph> jobGraphs, Collection<JobResult> 
globallyTerminatedJobs) {
 
         final DispatcherGatewayService dispatcherService =
                 dispatcherGatewayServiceFactory.create(
-                        DispatcherId.fromUuid(getLeaderSessionId()), 
jobGraphs, jobGraphStore);
+                        DispatcherId.fromUuid(getLeaderSessionId()),
+                        jobGraphs,
+                        globallyTerminatedJobs,
+                        jobGraphStore,
+                        jobResultStore);
 
         completeDispatcherSetup(dispatcherService);
     }
 
-    private CompletableFuture<Collection<JobGraph>> recoverJobsAsync() {
-        return CompletableFuture.supplyAsync(this::recoverJobsIfRunning, 
ioExecutor);
+    private CompletableFuture<Void>
+            
createDispatcherBasedOnRecoveredJobGraphsAndGloballyTerminatedJobs() {
+        return CompletableFuture.supplyAsync(
+                        this::getGloballyCompletedJobResultsIfRunning, 
ioExecutor)
+                .thenCompose(
+                        globallyTerminatedJobs ->
+                                CompletableFuture.supplyAsync(
+                                                () ->
+                                                        
this.recoverJobsIfRunning(
+                                                                
globallyTerminatedJobs.stream()
+                                                                        
.map(JobResult::getJobId)
+                                                                        
.collect(
+                                                                               
 Collectors
+                                                                               
         .toSet())),
+                                                ioExecutor)
+                                        .thenAccept(
+                                                jobGraphs ->
+                                                        
createDispatcherIfRunning(
+                                                                jobGraphs, 
globallyTerminatedJobs))
+                                        .handle(this::onErrorIfRunning));

Review comment:
       Interesting, I didn't know about the `.thenAcceptBoth` method. That 
makes the code way easier to read 👍 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
##########
@@ -92,34 +99,61 @@ private void startServices() {
         }
     }
 
-    private void createDispatcherIfRunning(Collection<JobGraph> jobGraphs) {
-        runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
+    private void createDispatcherIfRunning(
+            Collection<JobGraph> jobGraphs, Collection<JobResult> 
globallyTerminatedJobs) {
+        runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs, 
globallyTerminatedJobs));
     }
 
-    private void createDispatcher(Collection<JobGraph> jobGraphs) {
+    private void createDispatcher(
+            Collection<JobGraph> jobGraphs, Collection<JobResult> 
globallyTerminatedJobs) {
 
         final DispatcherGatewayService dispatcherService =
                 dispatcherGatewayServiceFactory.create(
-                        DispatcherId.fromUuid(getLeaderSessionId()), 
jobGraphs, jobGraphStore);
+                        DispatcherId.fromUuid(getLeaderSessionId()),
+                        jobGraphs,
+                        globallyTerminatedJobs,
+                        jobGraphStore,
+                        jobResultStore);
 
         completeDispatcherSetup(dispatcherService);
     }
 
-    private CompletableFuture<Collection<JobGraph>> recoverJobsAsync() {
-        return CompletableFuture.supplyAsync(this::recoverJobsIfRunning, 
ioExecutor);
+    private CompletableFuture<Void>
+            
createDispatcherBasedOnRecoveredJobGraphsAndGloballyTerminatedJobs() {
+        return CompletableFuture.supplyAsync(
+                        this::getGloballyCompletedJobResultsIfRunning, 
ioExecutor)
+                .thenCompose(
+                        globallyTerminatedJobs ->
+                                CompletableFuture.supplyAsync(
+                                                () ->
+                                                        
this.recoverJobsIfRunning(
+                                                                
globallyTerminatedJobs.stream()
+                                                                        
.map(JobResult::getJobId)
+                                                                        
.collect(
+                                                                               
 Collectors
+                                                                               
         .toSet())),
+                                                ioExecutor)
+                                        .thenAccept(
+                                                jobGraphs ->
+                                                        
createDispatcherIfRunning(
+                                                                jobGraphs, 
globallyTerminatedJobs))
+                                        .handle(this::onErrorIfRunning));

Review comment:
       I'm wondering whether the `dirtyJobsFuture.thenApplyAsync` is necessary 
here. Wouldn't it be executed in the same thread pool as the previous 
`supplyAsync` since both are chained together?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
##########
@@ -130,11 +130,11 @@ public JobGraphStore getJobGraphStore() throws Exception {
     }
 
     @Override
-    public RunningJobsRegistry getRunningJobsRegistry() {
-        if (runningJobsRegistry == null) {
-            this.runningJobsRegistry = createRunningJobsRegistry();
+    public JobResultStore getJobResultStore() throws Exception {
+        if (jobResultStore == null) {

Review comment:
       Good point, that's some leftover from previous `RunningJobsRegistry` 
where I didn't focus enough on cleaning it up

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
##########
@@ -244,11 +244,13 @@ public void cleanupJobData(JobID jobID) throws Exception {
     protected abstract JobGraphStore createJobGraphStore() throws Exception;
 
     /**
-     * Create the registry that holds information about whether jobs are 
currently running.
+     * Create the store that holds completed job results.
      *
-     * @return Running job registry to retrieve running jobs
+     * @return Job result store to retrieve completed jobs
      */
-    protected abstract RunningJobsRegistry createRunningJobsRegistry();
+    protected JobResultStore createJobResultStore() {
+        return new EmbeddedJobResultStore();

Review comment:
       I removed this method entirely. The `JobResultStore` is passed in as a 
parameter through the constructor now. The actual implementations of 
`AbstractHaServices` are forced to instantiate it and pass it through the 
parent class constructor.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobPersistenceComponentFactory.java
##########
@@ -18,13 +18,22 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-/** Factory for {@link JobGraphStore}. */
-public interface JobGraphStoreFactory {
+import org.apache.flink.runtime.highavailability.JobResultStore;
+
+/** Factory for components that are responsible for persisting a job for 
recovery. */
+public interface JobPersistenceComponentFactory {
 
     /**
      * Creates a {@link JobGraphStore}.
      *
      * @return a {@link JobGraphStore} instance
      */
-    JobGraphStore create();
+    JobGraphStore createJobGraphStore();
+
+    /**
+     * Creates {@link JobResultStore} instances.
+     *
+     * @return {@code JobResultStore} instances.

Review comment:
       I prefer having only one link per JavaDoc paragraph (like it's done in 
Wikipedia) to reduce the distraction caused by link formatings when reading it. 
I aligned the JavaDoc of both methods in `JobPersistenceComponentFactory` 
accordingly. 👍 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
##########
@@ -65,7 +68,14 @@ public void onStart() throws Exception {
     }
 
     void completeJobExecution(ExecutionGraphInfo executionGraphInfo) {
-        runAsync(() -> jobReachedTerminalState(executionGraphInfo));
+        runAsync(
+                () -> {
+                    try {
+                        jobReachedTerminalState(executionGraphInfo);
+                    } catch (Exception e) {
+                        e.printStackTrace();

Review comment:
       yikes, that one I overlooked 👍 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
##########
@@ -235,8 +235,7 @@ private void runCleanupTestWithJob(
             final LeaderElectionService jobManagerLeaderElectionService =
                     
zooKeeperHaServices.getJobManagerLeaderElectionService(jobId);
 
-            final RunningJobsRegistry runningJobsRegistry =
-                    zooKeeperHaServices.getRunningJobsRegistry();
+            final JobResultStore jobResultStore = 
zooKeeperHaServices.getJobResultStore();

Review comment:
       good catch. We should remove it. The test is about cleaning up the 
znodes. It's just about creating some paths. The integration of this component 
into the test becomes obsolete with the `JobResultStore` replacing the 
`RunningJobRegistry` since no ZK-related artifacts are created by the 
`JobResultStore`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to