XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1281945188


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -516,7 +517,7 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
 
         try {
             if (isDuplicateJob(jobGraph.getJobID())) {
-                if (isInGloballyTerminalState(jobGraph.getJobID())) {
+                if (isInGloballyTerminalState(jobGraph.getJobID()).get()) {

Review Comment:
   Reminder: That's going to create conflicts when rebasing this branch to master the next time.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java:
##########
@@ -77,7 +77,14 @@ public JobDispatcherLeaderProcessFactory createFactory(
         }
 
         final JobResultStore jobResultStore = jobPersistenceComponentFactory.createJobResultStore();
-        final Collection<JobResult> recoveredDirtyJobResults = getDirtyJobResults(jobResultStore);
+        final Collection<JobResult> recoveredDirtyJobResults;

Review Comment:
   This class/file doesn't need to be changed anymore after moving back to `JobResultStore#getDirtyResults`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -516,7 +517,7 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout)
 
         try {
             if (isDuplicateJob(jobGraph.getJobID())) {
-                if (isInGloballyTerminalState(jobGraph.getJobID())) {
+                if (isInGloballyTerminalState(jobGraph.getJobID()).get()) {

Review Comment:
   Additionally, we shouldn't call `.get()` here, where the error is then caught in a `catch` clause that returns a separate `CompletableFuture` again. Can't we use the Future methods here? Or is that harder to read? :thinking: 
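   A rough sketch of the chained alternative, using hypothetical stand-ins (`DuplicateJobSubmissionSketch`, `String` job IDs, and an `IllegalStateException` in place of whatever the `Dispatcher` actually reports for duplicates). Only the shape matters: the terminal-state check feeds `thenCompose`, so a failed lookup or a duplicate job completes the returned future exceptionally without a blocking `.get()` or a `try`/`catch`.

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical stand-in for the shape of Dispatcher#submitJob; not Flink code.
   final class DuplicateJobSubmissionSketch {

       // Assumed async lookup; in the PR this would be the future returned by
       // isInGloballyTerminalState(jobId).
       static CompletableFuture<Boolean> isInGloballyTerminalState(String jobId, boolean finished) {
           return CompletableFuture.completedFuture(finished);
       }

       // Placeholder for the actual submission path.
       static CompletableFuture<String> internalSubmitJob(String jobId) {
           return CompletableFuture.completedFuture("submitted " + jobId);
       }

       // Chains the check instead of blocking on get(): a failed lookup or a
       // duplicate job completes the returned future exceptionally.
       static CompletableFuture<String> submitJob(String jobId, boolean finished) {
           return isInGloballyTerminalState(jobId, finished)
                   .thenCompose(
                           terminal -> {
                               if (terminal) {
                                   CompletableFuture<String> duplicate = new CompletableFuture<>();
                                   duplicate.completeExceptionally(
                                           new IllegalStateException(
                                                   "Job " + jobId + " has already finished."));
                                   return duplicate;
                               }
                               return internalSubmitJob(jobId);
                           });
       }
   }
   ```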



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -580,17 +581,13 @@ private boolean isDuplicateJob(JobID jobId) throws FlinkException {
      * Checks whether the given job has already been executed.
      *
      * @param jobId identifying the submitted job
-     * @return true if the job has already finished, either successfully or as a failure
+     * @return a successfully completed future with {@code true} if the job has already finished,
+     *     either successfully or as a failure
      * @throws FlinkException if the job scheduling status cannot be retrieved

Review Comment:
   ```suggestion
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1390,29 +1392,71 @@ private CompletableFuture<CleanupJobState> registerGloballyTerminatedJobInJobRes
                 "Job %s is in state %s which is not globally terminal.",
                 jobId,
                 terminalJobStatus);
-
-        ioExecutor.execute(
-                () -> {
-                    try {
-                        if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                            log.warn(
-                                    "Job {} is already marked as clean but clean up was triggered again.",
-                                    jobId);
-                        } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                            jobResultStore.createDirtyResult(
-                                    new JobResultEntry(
-                                            JobResult.createFrom(archivedExecutionGraph)));
-                            log.info(
-                                    "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                                    jobId);
-                        }
-                    } catch (IOException e) {
-                        writeFuture.completeExceptionally(e);
+        CompletableFuture<Boolean> shouldCheckDirtyJobResult =
+                jobResultStore
+                        .hasCleanJobResultEntryAsync(jobId)
+                        .handleAsync(
+                                (hasCleanJobResultEntry, throwable) -> {
+                                    if (throwable != null) {
+                                        writeFuture.completeExceptionally(throwable);
+                                        return false;
+                                    } else {
+                                        if (hasCleanJobResultEntry) {
+                                            log.warn(
+                                                    "Job {} is already marked as clean but "
+                                                            + "clean up was triggered again.",
+                                                    jobId);
+                                            writeFuture.complete(null);
+                                            return false;
+                                        } else {
+                                            return true;
+                                        }
+                                    }
+                                });
+        shouldCheckDirtyJobResult.whenCompleteAsync(
+                (shouldCheck, throwable1) -> {
+                    if (throwable1 != null) {
+                        writeFuture.completeExceptionally(throwable1);
                         return;
                     }
-                    writeFuture.complete(null);
+                    if (shouldCheck) {
+                        jobResultStore
+                                .hasDirtyJobResultEntryAsync(jobId)
+                                .whenCompleteAsync(
+                                        (hasDirtyJobResultEntry, throwable2) -> {
+                                            if (throwable2 != null) {
+                                                writeFuture.completeExceptionally(throwable2);
+                                                return;
+                                            }
+                                            if (!hasDirtyJobResultEntry) {
+                                                jobResultStore
+                                                        .createDirtyResultAsync(
+                                                                new JobResultEntry(
+                                                                        JobResult.createFrom(
+                                                                                archivedExecutionGraph)))
+                                                        .whenCompleteAsync(
+                                                                (unused, throwable3) -> {
+                                                                    if (throwable3 != null) {
+                                                                        writeFuture
+                                                                                .completeExceptionally(
+                                                                                        throwable3);
+                                                                        return;
+                                                                    }
+                                                                    log.info(
+                                                                            "Job {} has been registered "
+                                                                                    + "for cleanup in "
+                                                                                    + "the JobResultStore "
+                                                                                    + "after reaching a "
+                                                                                    + "terminal state.",
+                                                                            jobId);
+                                                                    writeFuture.complete(null);
+                                                                });
+                                            } else {
+                                                writeFuture.complete(null);
+                                            }
+                                        });
+                    }

Review Comment:
   This code block can be simplified (we don't need an extra `writeFuture` anymore) in the following way:
   ```
   return jobResultStore
                   .hasCleanJobResultEntryAsync(jobId)
                   .thenCompose(
                           hasCleanJobResultEntry -> {
                               if (hasCleanJobResultEntry) {
                                   log.warn(
                                           "Job {} is already marked as clean but clean up was triggered again.",
                                           jobId);
                                   return FutureUtils.completedVoidFuture();
                               } else {
                                   return jobResultStore
                                           .hasDirtyJobResultEntryAsync(jobId)
                                           .thenCompose(
                                                   hasDirtyJobResultEntry -> {
                                                       if (hasDirtyJobResultEntry) {
                                                           return FutureUtils.completedVoidFuture();
                                                       }
                                                       return jobResultStore.createDirtyResultAsync(
                                                               new JobResultEntry(
                                                                       JobResult.createFrom(
                                                                               archivedExecutionGraph)));
                                                   });
                               }
                           })
                   .handleAsync(
                           (ignored, error) -> {
                               if (error != null) {
                                   fatalErrorHandler.onFatalError(
                                           new FlinkException(
                                                   String.format(
                                                           "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
                                                           executionGraphInfo.getJobId()),
                                                   error));
                               }
                               return CleanupJobState.globalCleanup(terminalJobStatus);
                           },
                           getMainThreadExecutor());
   ```
   WDYT?
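   To illustrate why the extra `writeFuture` becomes unnecessary, here is a self-contained sketch with a hypothetical in-memory store (`InMemoryStoreSketch`, `String` job IDs, and a `failWrites` flag simulating an IO error; none of these are Flink classes). A failure in the innermost `thenCompose` stage propagates through the outer stages to the single `handle` callback, which is where the proposal reports it to the fatal error handler.

   ```java
   import java.io.IOException;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical in-memory stand-in for the async JobResultStore API.
   final class InMemoryStoreSketch {
       final Set<String> dirty = ConcurrentHashMap.newKeySet();
       final Set<String> clean = ConcurrentHashMap.newKeySet();
       boolean failWrites; // simulates an IO failure when writing the dirty entry

       CompletableFuture<Boolean> hasCleanJobResultEntryAsync(String jobId) {
           return CompletableFuture.completedFuture(clean.contains(jobId));
       }

       CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(String jobId) {
           return CompletableFuture.completedFuture(dirty.contains(jobId));
       }

       CompletableFuture<Void> createDirtyResultAsync(String jobId) {
           if (failWrites) {
               CompletableFuture<Void> failed = new CompletableFuture<>();
               failed.completeExceptionally(new IOException("write failed"));
               return failed;
           }
           dirty.add(jobId);
           return CompletableFuture.completedFuture(null);
       }

       // Same shape as the proposal: nested thenCompose stages and one final
       // handler that sees any failure. Returns the observed error, or null.
       CompletableFuture<Throwable> register(String jobId) {
           return hasCleanJobResultEntryAsync(jobId)
                   .thenCompose(
                           isClean -> {
                               if (isClean) {
                                   return CompletableFuture.<Void>completedFuture(null);
                               }
                               return hasDirtyJobResultEntryAsync(jobId)
                                       .thenCompose(
                                               isDirty -> {
                                                   if (isDirty) {
                                                       return CompletableFuture.<Void>completedFuture(null);
                                                   }
                                                   return createDirtyResultAsync(jobId);
                                               });
                           })
                   .handle((ignored, error) -> error);
       }
   }
   ```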



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,70 @@ public interface JobResultStore {
      * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that
      * clean-up operations still need to be performed. Once the job resource cleanup has been
      * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link
-     * #markResultAsClean(JobID)}.
+     * #markResultAsCleanAsync(JobID)}.
      *
      * @param jobResultEntry The job result we wish to persist.
-     * @throws IOException if the creation of the dirty result failed for IO reasons.
-     * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID}
-     *     attached that is already registered in this {@code JobResultStore}.
+     * @return a successfully completed future with {@code true} if the dirty result is created
+     *     successfully. The method will throw {@link IllegalStateException} if the passed {@code

Review Comment:
   It's not the method that will throw the exception. The future will complete exceptionally.
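   A minimal illustration of the difference, using a hypothetical `ExceptionalCompletionSketch` (the `boolean` parameter just simulates "entry already registered"): the call itself returns normally, and the `IllegalStateException` only becomes visible when the caller inspects the future, here via `get()`, which reports it as the cause of an `ExecutionException`. So the Javadoc could say that the returned future completes exceptionally with an `IllegalStateException` in that case.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;

   // Hypothetical example; not Flink code.
   final class ExceptionalCompletionSketch {

       // Never throws to the caller: the precondition violation becomes the
       // failure of the returned future.
       static CompletableFuture<Void> createDirtyResultAsync(boolean alreadyRegistered) {
           return CompletableFuture.completedFuture(alreadyRegistered)
                   .thenAccept(
                           exists -> {
                               if (exists) {
                                   throw new IllegalStateException("Entry already exists.");
                               }
                           });
       }

       // Returns the failure a caller would observe through get(), or null.
       static Throwable failureOf(CompletableFuture<?> future) {
           try {
               future.get();
               return null;
           } catch (ExecutionException e) {
               return e.getCause();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return e;
           }
       }
   }
   ```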



##########
flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java:
##########
@@ -381,9 +381,12 @@ public static void rethrowException(Throwable t) throws Exception {
      * @param e exception to throw if not null.
      * @throws Exception
      */
-    public static void tryRethrowException(@Nullable Exception e) throws Exception {
+    public static void tryRethrowException(@Nullable Throwable e) throws Exception {

Review Comment:
   ```suggestion
       public static void tryRethrow(@Nullable Throwable e) throws Throwable {
   ```
   I'm wondering whether we should refactor this properly in a separate hotfix commit. WDYT?
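   For reference, a minimal sketch of what the suggested signature implies (hypothetical `TryRethrowSketch`, with `@Nullable` omitted to stay JDK-only). Once the parameter is widened to `Throwable`, a body that simply does `throw e`, as a rethrow helper typically would, has to declare `throws Throwable`; with `throws Exception` it would not compile.

   ```java
   // Hypothetical sketch of the suggested helper; not Flink's ExceptionUtils.
   final class TryRethrowSketch {

       // Rethrows the given throwable unchanged if it is non-null; does nothing otherwise.
       static void tryRethrow(Throwable t) throws Throwable {
           if (t != null) {
               throw t;
           }
       }
   }
   ```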



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,70 @@ public interface JobResultStore {
      * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that
      * clean-up operations still need to be performed. Once the job resource cleanup has been
      * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link
-     * #markResultAsClean(JobID)}.
+     * #markResultAsCleanAsync(JobID)}.
      *
      * @param jobResultEntry The job result we wish to persist.
-     * @throws IOException if the creation of the dirty result failed for IO reasons.
-     * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID}
-     *     attached that is already registered in this {@code JobResultStore}.
+     * @return a successfully completed future with {@code true} if the dirty result is created
+     *     successfully. The method will throw {@link IllegalStateException} if the passed {@code
+     *     jobResultEntry} has a {@code JobID} attached that is already registered in this {@code
+     *     JobResultStore}.
      */
-    void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException;
+    CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry);
 
     /**
      * Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more
      * resource cleanup steps need to be performed. No actions should be triggered if the passed
      * {@code JobID} belongs to a job that was already marked as clean.
      *
      * @param jobId Ident of the job we wish to mark as clean.
-     * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean}
-     *     failed for IO reasons.
-     * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the
-     *     store for the given {@code JobID}.
+     * @return a successfully completed future if the result is marked successfully, The future will
+     *     completed with {@link NoSuchElementException} if there is no corresponding {@code dirty}
+     *     job present in the store for the given {@code JobID}.
      */
-    void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException;
+    CompletableFuture<Void> markResultAsCleanAsync(JobID jobId);
 
     /**
-     * Returns whether the store already contains an entry for a job.
+     * Returns the future of whether the store already contains an entry for a job.
      *
      * @param jobId Ident of the job we wish to check the store for.
-     * @return {@code true} if a {@code dirty} or {@code clean} {@link JobResultEntry} exists for
-     *     the given {@code JobID}; otherwise {@code false}.
-     * @throws IOException if determining whether a job entry is present in the store failed for IO
-     *     reasons.
+     * @return a successfully completed future with {@code true} if a {@code dirty} or {@code clean}
+     *     {@link JobResultEntry} exists for the given {@code JobID}; otherwise a successfully
+     *     completed future with {@code false}.
      */
-    default boolean hasJobResultEntry(JobID jobId) throws IOException {
-        return hasDirtyJobResultEntry(jobId) || hasCleanJobResultEntry(jobId);
+    default CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
+        return hasDirtyJobResultEntryAsync(jobId)
+                .thenCombine(
+                        hasCleanJobResultEntryAsync(jobId),
+                        (result1, result2) -> result1 || result2);
     }
 
     /**
-     * Returns whether the store already contains a {@code dirty} entry for the given {@code JobID}.
+     * Returns the future of whether the store contains a {@code dirty} entry for the given {@code
+     * JobID}.
      *
      * @param jobId Ident of the job we wish to check the store for.
-     * @return {@code true}, if a {@code dirty} entry exists for the given {@code JobID}; otherwise
-     *     {@code false}.
-     * @throws IOException if determining whether a job entry is present in the store failed for IO
-     *     reasons.
+     * @return a successfully completed future with {@code true}, if a {@code dirty} entry exists
+     *     for the given {@code JobID}; otherwise a successfully completed future with {@code

Review Comment:
   ```suggestion
        *     for the given {@code JobID}; otherwise {@code
   ```
   nit: no need to add this - it should be understood



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##########
@@ -57,17 +61,17 @@ public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchEle
     }
 
     @Override
-    public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException {
+    public Boolean hasDirtyJobResultEntryInternal(JobID jobId) {
         return dirtyJobResults.containsKey(jobId);
     }
 
     @Override
-    public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
+    public Boolean hasCleanJobResultEntryInternal(JobID jobId) {

Review Comment:
   ```suggestion
       public boolean hasCleanJobResultEntryInternal(JobID jobId) {
   ```
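   A small sketch of why the primitive return type is enough (hypothetical `BooleanHookSketch` and `FixedCleanJobSketch`, with `String` job IDs): the value is boxed to `Boolean` only where the lambda feeds the `CompletableFuture`. One caveat: Java only accepts a `boolean` override if the abstract method is declared with `boolean` too, since a `boolean` return type cannot override a `Boolean` one, so the abstract `*Internal` hooks should keep the primitive type as well.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;

   // Hypothetical base class: the abstract hook returns a primitive boolean.
   abstract class BooleanHookSketch {
       private final Executor ioExecutor;

       protected BooleanHookSketch(Executor ioExecutor) {
           this.ioExecutor = ioExecutor;
       }

       protected abstract boolean hasCleanJobResultEntryInternal(String jobId);

       CompletableFuture<Boolean> hasCleanJobResultEntryAsync(String jobId) {
           // The lambda's boolean result is autoboxed to Boolean here.
           return CompletableFuture.supplyAsync(
                   () -> hasCleanJobResultEntryInternal(jobId), ioExecutor);
       }
   }

   // Overrides with the primitive return type, matching the abstract hook.
   final class FixedCleanJobSketch extends BooleanHookSketch {
       FixedCleanJobSketch(Executor ioExecutor) {
           super(ioExecutor);
       }

       @Override
       protected boolean hasCleanJobResultEntryInternal(String jobId) {
           return "clean-job".equals(jobId);
       }
   }
   ```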



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -277,18 +277,11 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
 
     @GuardedBy("lock")
     private void verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
-            throws FlinkException {
-        try {
-            if (jobResultStore.hasJobResultEntry(getJobID())) {
-                jobAlreadyDone(leaderSessionId);
-            } else {
-                createNewJobMasterServiceProcess(leaderSessionId);
-            }
-        } catch (IOException e) {
-            throw new FlinkException(
-                    String.format(
-                            "Could not retrieve the job scheduling status for job %s.", getJobID()),
-                    e);
+            throws FlinkException, ExecutionException, InterruptedException {
+        if (jobResultStore.hasJobResultEntryAsync(getJobID()).get()) {

Review Comment:
   I'd prefer to address this one in a separate review because it requires some deeper changes. But we could leverage the async nature here as well.
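   If that follow-up happens, the blocking `.get()` could become a continuation along these lines. This is a hypothetical sketch (`SchedulingStatusSketch`, with plain `Runnable`s standing in for `jobAlreadyDone` and `createNewJobMasterServiceProcess`), not the runner's actual code. Note that the callback runs whenever the lookup completes, possibly on another thread and no longer under the `@GuardedBy("lock")` guarantee of the current method, which is presumably part of the deeper changes mentioned above.

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical sketch; the Runnables stand in for jobAlreadyDone(...) and
   // createNewJobMasterServiceProcess(...).
   final class SchedulingStatusSketch {

       static CompletableFuture<Void> verifyJobSchedulingStatusAsync(
               CompletableFuture<Boolean> hasJobResultEntry,
               Runnable jobAlreadyDone,
               Runnable createNewJobMasterServiceProcess) {
           return hasJobResultEntry.thenAccept(
                   hasEntry -> {
                       // Runs once the lookup completes, possibly on another thread
                       // and outside of any lock held by the caller.
                       if (hasEntry) {
                           jobAlreadyDone.run();
                       } else {
                           createNewJobMasterServiceProcess.run();
                       }
                   });
       }
   }
   ```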



##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java:
##########
@@ -116,46 +123,74 @@ public void testBaseDirectoryCreationOnResultStoreInitialization() throws Except
         assertThat(emptyBaseDirectory).doesNotExist();
 
         fileSystemJobResultStore =
-                new FileSystemJobResultStore(basePath.getFileSystem(), basePath, false);
+                new FileSystemJobResultStore(
+                        basePath.getFileSystem(), basePath, false, manuallyTriggeredExecutor);
         // Result store operations are creating the base directory on-the-fly
         assertThat(emptyBaseDirectory).doesNotExist();
-        fileSystemJobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        CompletableFuture<Void> dirtyResultAsync =
+                fileSystemJobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY);
+        manuallyTriggeredExecutor.trigger();

Review Comment:
   hint: With my proposal for the `AbstractThreadsafeJobResultStore`, these kinds of tests will start to time out because we're performing two separate async operations (checking that no entry exists and the actual operation of creating the file). Therefore, if you go with my proposal (or something similar), you would have to change from `trigger()` to `triggerAll()` here and in other test methods.
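   To make the `trigger()`/`triggerAll()` difference concrete, here is a simplified, hypothetical stand-in for a manually triggered executor (`ManualExecutorSketch`) plus a two-step operation shaped like the proposed `createDirtyResultAsync` (`TwoStepWriteSketch`: one task for the existence check, a second one for the write). A single `trigger()` only runs the check, which enqueues the write; `triggerAll()` drains the queue, including tasks enqueued while draining.

   ```java
   import java.util.ArrayDeque;
   import java.util.List;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;

   // Simplified, hypothetical stand-in for a manually triggered test executor.
   final class ManualExecutorSketch implements Executor {
       private final Queue<Runnable> tasks = new ArrayDeque<>();

       @Override
       public void execute(Runnable command) {
           tasks.add(command); // queued until a test triggers it
       }

       // Runs exactly one queued task, if any.
       void trigger() {
           Runnable next = tasks.poll();
           if (next != null) {
               next.run();
           }
       }

       // Runs tasks until the queue is empty, including tasks enqueued meanwhile.
       void triggerAll() {
           while (!tasks.isEmpty()) {
               trigger();
           }
       }
   }

   // Two dependent async steps: "check that no entry exists", then "write".
   final class TwoStepWriteSketch {
       static CompletableFuture<Void> createDirtyResultAsync(Executor ioExecutor, List<String> steps) {
           return CompletableFuture.supplyAsync(
                           () -> {
                               steps.add("check");
                               return false; // no existing entry
                           },
                           ioExecutor)
                   .thenCompose(
                           exists -> CompletableFuture.runAsync(() -> steps.add("write"), ioExecutor));
       }
   }
   ```

   In the sketch, the future is still incomplete after a single `trigger()`, which is what would make a test waiting on it time out.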



##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java:
##########
@@ -116,58 +111,53 @@ public static TestingJobResultStore.Builder builder() {
     /** {@code Builder} for instantiating {@code TestingJobResultStore} instances. */
     public static class Builder {
 
-        private ThrowingConsumer<JobResultEntry, ? extends IOException> createDirtyResultConsumer =
-                ignored -> {};
-        private ThrowingConsumer<JobID, ? extends IOException> markResultAsCleanConsumer =
-                ignored -> {};
+        private Function<JobResultEntry, CompletableFuture<Void>> createDirtyResultConsumer =
+                jobResultEntry -> CompletableFuture.completedFuture(null);

Review Comment:
   ```suggestion
                   jobResultEntry -> FutureUtils.completedVoidFuture();
   ```
   That's a really nitty thing. Feel free to ignore it. I just wanted to point out that `FutureUtils` has a utility method for that (that would also apply to some of the other fields if you want to change that).



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -332,11 +332,16 @@ public void testJobBeingMarkedAsDirtyBeforeCleanup() throws Exception {
                                 TestingJobResultStore.builder()
                                         .withCreateDirtyResultConsumer(
                                                 ignoredJobResultEntry -> {
+                                                    CompletableFuture<Boolean> result =
+                                                            new CompletableFuture<>();
                                                     try {
                                                         markAsDirtyLatch.await();
                                                     } catch (InterruptedException e) {
-                                                        throw new RuntimeException(e);
+                                                        result.completeExceptionally(
+                                                                new RuntimeException(e));

Review Comment:
   That is correct. But we don't need the `RuntimeException` anymore. We can just use `FutureUtils.completedExceptionally(e)` with the `InterruptedException` here.
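   A sketch of what the consumer could then look like (hypothetical `LatchConsumerSketch`, with a `String` entry type and a local `completedExceptionally` helper mirroring what `FutureUtils.completedExceptionally` does, so the sketch only needs the JDK): the `InterruptedException` itself becomes the failure cause of the returned future.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.function.Function;

   // Hypothetical example; not the actual test code.
   final class LatchConsumerSketch {

       // Local stand-in for a "completed exceptionally" helper.
       static <T> CompletableFuture<T> completedExceptionally(Throwable cause) {
           CompletableFuture<T> future = new CompletableFuture<>();
           future.completeExceptionally(cause);
           return future;
       }

       // Consumer in the style of withCreateDirtyResultConsumer: waits for the latch
       // and reports an interruption through the returned future, unwrapped.
       static Function<String, CompletableFuture<Void>> awaitingConsumer(CountDownLatch latch) {
           return ignoredJobResultEntry -> {
               try {
                   latch.await();
               } catch (InterruptedException e) {
                   return completedExceptionally(e);
               }
               return CompletableFuture.completedFuture(null);
           };
       }
   }
   ```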



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java:
##########
@@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchEle
     }
 
     @Override
-    public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException {
+    public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException {
         return fileSystem.exists(constructDirtyPath(jobId));
     }
 
     @Override
-    public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
+    public Boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
         return fileSystem.exists(constructCleanPath(jobId));
     }
 
     @Override
     public Set<JobResult> getDirtyResultsInternal() throws IOException {
         createBasePathIfNeeded();
-
         final FileStatus[] statuses = fileSystem.listStatus(this.basePath);
-

Review Comment:
   Is there a reason why you removed these lines?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java:
##########
@@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchEle
     }
 
     @Override
-    public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException {
+    public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException {
         return fileSystem.exists(constructDirtyPath(jobId));
     }
 
     @Override
-    public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
+    public Boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {

Review Comment:
   ```suggestion
       public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
   ```
   This change is not necessary.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -43,61 +47,126 @@ public abstract class AbstractThreadsafeJobResultStore implements JobResultStore
 
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
-    @Override
-    public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
-        Preconditions.checkState(
-                !hasJobResultEntry(jobResultEntry.getJobId()),
-                "Job result store already contains an entry for job %s",
-                jobResultEntry.getJobId());
+    private final Executor ioExecutor;
+
+    public AbstractThreadsafeJobResultStore(Executor ioExecutor) {

Review Comment:
   ```suggestion
       protected AbstractThreadsafeJobResultStore(Executor ioExecutor) {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########


Review Comment:
   Maybe it helps to compare your approach with the proposal above to get a better understanding of the `CompletableFuture` usage. Feel free to ask questions about it. And don't hesitate to question my approach! See it as a discussion item. :-)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########


Review Comment:
   I'm gonna provide a general proposal for this class. It already looks good. I just think that we could leverage the CompletableFuture utility methods in a more efficient way. PTAL:
   ```
   package org.apache.flink.runtime.highavailability;
   
   import org.apache.flink.api.common.JobID;
   import org.apache.flink.runtime.jobmaster.JobResult;
   import org.apache.flink.util.Preconditions;
   import org.apache.flink.util.concurrent.FutureUtils;
   import org.apache.flink.util.function.SupplierWithException;
   import org.apache.flink.util.function.ThrowingRunnable;
   
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import javax.annotation.concurrent.GuardedBy;
   
   import java.io.IOException;
   import java.util.NoSuchElementException;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;
   import java.util.concurrent.locks.ReadWriteLock;
   import java.util.concurrent.locks.ReentrantReadWriteLock;
   
   /** An abstract class for threadsafe implementations of the {@link JobResultStore}. */
   public abstract class AbstractThreadsafeJobResultStore implements JobResultStore {
   
       private static final Logger LOG =
               LoggerFactory.getLogger(AbstractThreadsafeJobResultStore.class);
   
       private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
   
       private final Executor ioExecutor;
   
       protected AbstractThreadsafeJobResultStore(Executor ioExecutor) {
           this.ioExecutor = ioExecutor;
       }
   
       @Override
       public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
           return hasJobResultEntryAsync(jobResultEntry.getJobId())
                   .thenAccept(
                           hasJobResultEntry ->
                                   Preconditions.checkState(
                                           !hasJobResultEntry,
                                           "Job result store already contains an entry for job %s",
                                           jobResultEntry.getJobId()))
                   .thenCompose(
                           ignoredVoid ->
                                   withWriteLockAsync(
                                           () -> createDirtyResultInternal(jobResultEntry)));
       }
   
       @GuardedBy("readWriteLock")
       protected abstract void createDirtyResultInternal(JobResultEntry jobResultEntry)
               throws IOException;
   
       @Override
       public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
           return hasCleanJobResultEntryAsync(jobId)
                   .thenCompose(
                           hasCleanJobResultEntry -> {
                               if (hasCleanJobResultEntry) {
                                   LOG.debug(
                                           "The job {} is already marked as clean. No action required.",
                                           jobId);
                                   return FutureUtils.completedVoidFuture();
                               }
   
                               return withWriteLockAsync(() -> markResultAsCleanInternal(jobId));
                           });
       }
   
       @GuardedBy("readWriteLock")
       protected abstract void markResultAsCleanInternal(JobID jobId)
               throws IOException, NoSuchElementException;
   
       @Override
       public CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
           return withReadLockAsync(
                   () ->
                           hasDirtyJobResultEntryInternal(jobId)
                                   || hasCleanJobResultEntryInternal(jobId));
       }
   
       @Override
       public CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID jobId) {
           return withReadLockAsync(() -> hasDirtyJobResultEntryInternal(jobId));
       }
   
       @GuardedBy("readWriteLock")
       protected abstract boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException;
   
       @Override
       public CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID jobId) {
           return withReadLockAsync(() -> hasCleanJobResultEntryInternal(jobId));
       }
   
       @GuardedBy("readWriteLock")
       protected abstract boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException;
   
       @Override
       public Set<JobResult> getDirtyResults() throws IOException {
           return withReadLock(this::getDirtyResultsInternal);
       }
   
       @GuardedBy("readWriteLock")
       protected abstract Set<JobResult> getDirtyResultsInternal() throws IOException;
   
       private CompletableFuture<Void> withWriteLockAsync(ThrowingRunnable<IOException> runnable) {
           return FutureUtils.runAsync(() -> withWriteLock(runnable), ioExecutor);
       }
   
       private void withWriteLock(ThrowingRunnable<IOException> runnable) throws IOException {
           readWriteLock.writeLock().lock();
           try {
               runnable.run();
           } finally {
               readWriteLock.writeLock().unlock();
           }
       }
   
       private <T> CompletableFuture<T> withReadLockAsync(
               SupplierWithException<T, IOException> supplier) {
           return FutureUtils.supplyAsync(() -> withReadLock(supplier), ioExecutor);
       }
   
       private <T> T withReadLock(SupplierWithException<T, IOException> supplier) throws IOException {
           readWriteLock.readLock().lock();
           try {
               return supplier.get();
           } finally {
               readWriteLock.readLock().unlock();
           }
       }
   }
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,70 @@ public interface JobResultStore {
      * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that
      * clean-up operations still need to be performed. Once the job resource cleanup has been
      * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link
-     * #markResultAsClean(JobID)}.
+     * #markResultAsCleanAsync(JobID)}.
      *
      * @param jobResultEntry The job result we wish to persist.
-     * @throws IOException if the creation of the dirty result failed for IO reasons.
-     * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID}
-     *     attached that is already registered in this {@code JobResultStore}.
+     * @return a successfully completed future with {@code true} if the dirty result is created
+     *     successfully. The method will throw {@link IllegalStateException} if the passed {@code
+     *     jobResultEntry} has a {@code JobID} attached that is already registered in this {@code
+     *     JobResultStore}.
      */
-    void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException;
+    CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry);
 
     /**
      * Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more
      * resource cleanup steps need to be performed. No actions should be triggered if the passed
      * {@code JobID} belongs to a job that was already marked as clean.
      *
      * @param jobId Ident of the job we wish to mark as clean.
-     * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean}
-     *     failed for IO reasons.
-     * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the
-     *     store for the given {@code JobID}.
+     * @return a successfully completed future if the result is marked successfully, The future will
+     *     completed with {@link NoSuchElementException} if there is no corresponding {@code dirty}
+     *     job present in the store for the given {@code JobID}.
      */
-    void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException;
+    CompletableFuture<Void> markResultAsCleanAsync(JobID jobId);
 
     /**
-     * Returns whether the store already contains an entry for a job.
+     * Returns the future of whether the store already contains an entry for a job.
      *
      * @param jobId Ident of the job we wish to check the store for.
-     * @return {@code true} if a {@code dirty} or {@code clean} {@link JobResultEntry} exists for
-     *     the given {@code JobID}; otherwise {@code false}.
-     * @throws IOException if determining whether a job entry is present in the store failed for IO
-     *     reasons.
+     * @return a successfully completed future with {@code true} if a {@code dirty} or {@code clean}
+     *     {@link JobResultEntry} exists for the given {@code JobID}; otherwise a successfully
+     *     completed future with {@code false}.
      */
-    default boolean hasJobResultEntry(JobID jobId) throws IOException {
-        return hasDirtyJobResultEntry(jobId) || hasCleanJobResultEntry(jobId);
+    default CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
+        return hasDirtyJobResultEntryAsync(jobId)
+                .thenCombine(
+                        hasCleanJobResultEntryAsync(jobId),
+                        (result1, result2) -> result1 || result2);
     }
 
     /**
-     * Returns whether the store already contains a {@code dirty} entry for the given {@code JobID}.
+     * Returns the future of whether the store contains a {@code dirty} entry for the given {@code
+     * JobID}.
      *
      * @param jobId Ident of the job we wish to check the store for.
-     * @return {@code true}, if a {@code dirty} entry exists for the given {@code JobID}; otherwise
-     *     {@code false}.
-     * @throws IOException if determining whether a job entry is present in the store failed for IO
-     *     reasons.
+     * @return a successfully completed future with {@code true}, if a {@code dirty} entry exists
+     *     for the given {@code JobID}; otherwise a successfully completed future with {@code
+     *     false}.
      */
-    boolean hasDirtyJobResultEntry(JobID jobId) throws IOException;
+    CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID jobId);
 
     /**
-     * Returns whether the store already contains a {@code clean} entry for the given {@code JobID}.
+     * Returns the future of whether the store contains a {@code clean} entry for the given {@code
+     * JobID}.
      *
      * @param jobId Ident of the job we wish to check the store for.
-     * @return {@code true}, if a {@code clean} entry exists for the given {@code JobID}; otherwise
-     *     {@code false}.
-     * @throws IOException if determining whether a job entry is present in the store failed for IO
-     *     reasons.
+     * @return a successfully completed future with {@code true}, if a {@code clean} entry exists
+     *     for the given {@code JobID}; otherwise a successfully completed future with {@code
+     *     false}.
      */
-    boolean hasCleanJobResultEntry(JobID jobId) throws IOException;
+    CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID jobId);
 
     /**
-     * Get the persisted {@link JobResult} instances that are marked as {@code dirty}. This is
+     * Returns persisted {@link JobResult} instances that are marked as {@code dirty}. This is
      * useful for recovery of finalization steps.
      *
-     * @return A set of dirty {@code JobResults} from the store.
-     * @throws IOException if collecting the set of dirty results failed for IO reasons.
+     * @return a set of dirty {@code JobResults} from the store.

Review Comment:
   The JavaDoc should be reverted as well.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1273,7 +1270,22 @@ private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJo
         if (cleanupJobState.isGlobalCleanup()) {
             return globalResourceCleaner
                     .cleanupAsync(jobId)
-                    .thenRunAsync(() -> markJobAsClean(jobId), ioExecutor)
+                    .thenCompose(unused -> jobResultStore.markResultAsCleanAsync(jobId))
+                    .handle(
+                            (BiFunction<Void, Throwable, Void>)
+                                    (unused, e) -> {

Review Comment:
   ```suggestion
                       .thenCompose(unusedVoid -> jobResultStore.markResultAsCleanAsync(jobId))
                       .handle(
                                       (unusedVoid, error) -> {
   ```
   The cast is not necessary. Instead, you could use meaningful parameter names that explain the purpose of each parameter.
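A minimal, dependency-free sketch of the suggested shape: the lambda passed to `handle` needs no `BiFunction` cast because the compiler infers its type. `markResultAsCleanAsync(boolean)` is a hypothetical stand-in for `JobResultStore#markResultAsCleanAsync`, and the returned strings exist only for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Sketch: composing an async step and handling its outcome without casting the lambda. */
public class HandleWithoutCast {

    // Hypothetical stand-in for the store call; fails when 'fail' is true.
    static CompletableFuture<Void> markResultAsCleanAsync(boolean fail) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("Expected failure."));
        } else {
            future.complete(null);
        }
        return future;
    }

    static String removeJob(boolean fail) {
        CompletableFuture<Void> cleanupFuture = CompletableFuture.completedFuture(null);
        return cleanupFuture
                // thenCompose flattens the future returned by the store call.
                .thenCompose(unusedVoid -> markResultAsCleanAsync(fail))
                // No cast: the BiFunction's type is inferred from the lambda.
                .handle(
                        (unusedVoid, error) -> {
                            if (error == null) {
                                return "marked as clean";
                            }
                            // Dependent stages may wrap the failure in a CompletionException.
                            Throwable cause =
                                    error instanceof CompletionException ? error.getCause() : error;
                            return "cleanup failed: " + cause.getMessage();
                        })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(removeJob(false));
        System.out.println(removeJob(true));
    }
}
```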



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1390,29 +1392,71 @@ private CompletableFuture<CleanupJobState> registerGloballyTerminatedJobInJobRes
                 "Job %s is in state %s which is not globally terminal.",
                 jobId,
                 terminalJobStatus);
-
-        ioExecutor.execute(
-                () -> {
-                    try {
-                        if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                            log.warn(
-                                    "Job {} is already marked as clean but clean up was triggered again.",
-                                    jobId);
-                        } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                            jobResultStore.createDirtyResult(
-                                    new JobResultEntry(
-                                            JobResult.createFrom(archivedExecutionGraph)));
-                            log.info(
-                                    "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                                    jobId);
-                        }
-                    } catch (IOException e) {
-                        writeFuture.completeExceptionally(e);
+        CompletableFuture<Boolean> shouldCheckDirtyJobResult =
+                jobResultStore
+                        .hasCleanJobResultEntryAsync(jobId)
+                        .handleAsync(
+                                (hasCleanJobResultEntry, throwable) -> {
+                                    if (throwable != null) {
+                                        writeFuture.completeExceptionally(throwable);
+                                        return false;
+                                    } else {
+                                        if (hasCleanJobResultEntry) {
+                                            log.warn(
+                                                    "Job {} is already marked as clean but "
+                                                            + "clean up was triggered again.",
+                                                    jobId);
+                                            writeFuture.complete(null);
+                                            return false;
+                                        } else {
+                                            return true;
+                                        }
+                                    }
+                                });
+        shouldCheckDirtyJobResult.whenCompleteAsync(
+                (shouldCheck, throwable1) -> {
+                    if (throwable1 != null) {
+                        writeFuture.completeExceptionally(throwable1);
                         return;
                     }
-                    writeFuture.complete(null);
+                    if (shouldCheck) {
+                        jobResultStore
+                                .hasDirtyJobResultEntryAsync(jobId)
+                                .whenCompleteAsync(
+                                        (hasDirtyJobResultEntry, throwable2) -> {
+                                            if (throwable2 != null) {
+                                                writeFuture.completeExceptionally(throwable2);
+                                                return;
+                                            }
+                                            if (!hasDirtyJobResultEntry) {
+                                                jobResultStore
+                                                        .createDirtyResultAsync(
+                                                                new JobResultEntry(
+                                                                        JobResult.createFrom(
+                                                                                archivedExecutionGraph)))
+                                                        .whenCompleteAsync(
+                                                                (unused, throwable3) -> {
+                                                                    if (throwable3 != null) {
+                                                                        writeFuture
+                                                                                .completeExceptionally(
+                                                                                        throwable3);
+                                                                        return;
+                                                                    }
+                                                                    log.info(
+                                                                            "Job {} has been registered "
+                                                                                    + "for cleanup in "
+                                                                                    + "the JobResultStore "
+                                                                                    + "after reaching a "
+                                                                                    + "terminal state.",
+                                                                            jobId);
+                                                                    writeFuture.complete(null);
+                                                                });
+                                            } else {
+                                                writeFuture.complete(null);
+                                            }
+                                        });
+                    }

Review Comment:
   This could be split up into meaningful private methods that handle the individual `if` branches:
   ```
       private CompletableFuture<CleanupJobState> registerGloballyTerminatedJobInJobResultStore(
               ExecutionGraphInfo executionGraphInfo) {
           final JobID jobId = executionGraphInfo.getJobId();
   
           final AccessExecutionGraph archivedExecutionGraph =
                   executionGraphInfo.getArchivedExecutionGraph();
   
           final JobStatus terminalJobStatus = archivedExecutionGraph.getState();
           Preconditions.checkArgument(
                   terminalJobStatus.isGloballyTerminalState(),
                   "Job %s is in state %s which is not globally terminal.",
                   jobId,
                   terminalJobStatus);
   
           return jobResultStore
                   .hasCleanJobResultEntryAsync(jobId)
                   .thenCompose(
                           hasCleanJobResultEntry ->
                                   createDirtyJobResultEntryIfMissingAsync(
                                           archivedExecutionGraph, hasCleanJobResultEntry))
                   .handleAsync(
                           (ignored, error) -> {
                               if (error != null) {
                                   fatalErrorHandler.onFatalError(
                                           new FlinkException(
                                                   String.format(
                                                           "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
                                                           executionGraphInfo.getJobId()),
                                                   error));
                               }
                               return CleanupJobState.globalCleanup(terminalJobStatus);
                           },
                           getMainThreadExecutor());
       }
   
       /**
        * Creates a dirty entry in the {@link #jobResultStore} if there's no entry at all for the given
        * {@code executionGraph} in the {@code JobResultStore}.
        *
        * @param executionGraph The {@link AccessExecutionGraph} for which the {@link JobResult} shall
        *     be persisted.
        * @param hasCleanJobResultEntry The decision the dirty entry check is based on.
        * @return {@code CompletableFuture} that completes as soon as the entry exists.
        */
       private CompletableFuture<Void> createDirtyJobResultEntryIfMissingAsync(
               AccessExecutionGraph executionGraph, boolean hasCleanJobResultEntry) {
           final JobID jobId = executionGraph.getJobID();
           if (hasCleanJobResultEntry) {
               log.warn("Job {} is already marked as clean but clean up was triggered again.", jobId);
               return FutureUtils.completedVoidFuture();
           } else {
               return jobResultStore
                       .hasDirtyJobResultEntryAsync(jobId)
                       .thenCompose(
                               hasDirtyJobResultEntry ->
                                       createDirtyJobResultEntryAsync(
                                               executionGraph, hasDirtyJobResultEntry));
           }
       }
   
       /**
        * Creates a dirty entry in the {@link #jobResultStore} based on the passed {@code
        * hasDirtyJobResultEntry} flag.
        *
        * @param executionGraph The {@link AccessExecutionGraph} that is used to generate the entry.
        * @param hasDirtyJobResultEntry The decision the entry creation is based on.
        * @return {@code CompletableFuture} that completes as soon as the entry exists.
        */
       private CompletableFuture<Void> createDirtyJobResultEntryAsync(
               AccessExecutionGraph executionGraph, boolean hasDirtyJobResultEntry) {
           if (hasDirtyJobResultEntry) {
               return FutureUtils.completedVoidFuture();
           }
   
           return jobResultStore.createDirtyResultAsync(
                   new JobResultEntry(JobResult.createFrom(executionGraph)));
       }
   ```
   
   Having individual methods should make the actual subtasks easier to understand when reading the code. WDYT?
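A runnable, dependency-free sketch of the same structure, assuming a hypothetical in-memory `InMemoryStore` in place of `JobResultStore` and plain `String` job IDs. It shows how `thenCompose` replaces the nested `whenCompleteAsync` callbacks, with each private method deciding whether the next step is needed:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Sketch: flattening "check clean, then check dirty, then create" with thenCompose. */
public class DirtyEntryRegistration {

    /** Hypothetical stand-in for JobResultStore; all futures complete immediately. */
    public static final class InMemoryStore {
        public final Set<String> clean = new HashSet<>();
        public final Set<String> dirty = new HashSet<>();

        CompletableFuture<Boolean> hasCleanJobResultEntryAsync(String jobId) {
            return CompletableFuture.completedFuture(clean.contains(jobId));
        }

        CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(String jobId) {
            return CompletableFuture.completedFuture(dirty.contains(jobId));
        }

        CompletableFuture<Void> createDirtyResultAsync(String jobId) {
            dirty.add(jobId);
            return CompletableFuture.completedFuture(null);
        }
    }

    private final InMemoryStore store;

    public DirtyEntryRegistration(InMemoryStore store) {
        this.store = store;
    }

    /** Top-level flow: reads like the list of subtasks. */
    public CompletableFuture<Void> register(String jobId) {
        return store.hasCleanJobResultEntryAsync(jobId)
                .thenCompose(hasClean -> createDirtyEntryIfMissingAsync(jobId, hasClean));
    }

    private CompletableFuture<Void> createDirtyEntryIfMissingAsync(String jobId, boolean hasClean) {
        if (hasClean) {
            // Already cleaned up; the real code logs a warning at this point.
            return CompletableFuture.completedFuture(null);
        }
        return store.hasDirtyJobResultEntryAsync(jobId)
                .thenCompose(hasDirty -> createDirtyEntryAsync(jobId, hasDirty));
    }

    private CompletableFuture<Void> createDirtyEntryAsync(String jobId, boolean hasDirty) {
        if (hasDirty) {
            return CompletableFuture.completedFuture(null);
        }
        return store.createDirtyResultAsync(jobId);
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        store.clean.add("cleaned-job");
        DirtyEntryRegistration registration = new DirtyEntryRegistration(store);
        registration.register("new-job").join();
        registration.register("new-job").join(); // second call finds the dirty entry
        registration.register("cleaned-job").join(); // skipped: already clean
        System.out.println(store.dirty);
    }
}
```

Failures from any step propagate through the composed future, so a single `handleAsync` at the end (as in the suggestion) can report them.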



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,70 @@ public interface JobResultStore {
      * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that
      * clean-up operations still need to be performed. Once the job resource cleanup has been
      * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link
-     * #markResultAsClean(JobID)}.
+     * #markResultAsCleanAsync(JobID)}.
      *
      * @param jobResultEntry The job result we wish to persist.
-     * @throws IOException if the creation of the dirty result failed for IO reasons.
-     * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID}
-     *     attached that is already registered in this {@code JobResultStore}.
+     * @return a successfully completed future with {@code true} if the dirty result is created
+     *     successfully. The method will throw {@link IllegalStateException} if the passed {@code
+     *     jobResultEntry} has a {@code JobID} attached that is already registered in this {@code
+     *     JobResultStore}.
      */
-    void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException;
+    CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry);
 
     /**
      * Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more
      * resource cleanup steps need to be performed. No actions should be triggered if the passed
      * {@code JobID} belongs to a job that was already marked as clean.
      *
      * @param jobId Ident of the job we wish to mark as clean.
-     * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean}
-     *     failed for IO reasons.
-     * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the
-     *     store for the given {@code JobID}.
+     * @return a successfully completed future if the result is marked successfully, The future will
+     *     completed with {@link NoSuchElementException} if there is no corresponding {@code dirty}
+     *     job present in the store for the given {@code JobID}.
      */
-    void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException;
+    CompletableFuture<Void> markResultAsCleanAsync(JobID jobId);
 
     /**
-     * Returns whether the store already contains an entry for a job.
+     * Returns the future of whether the store already contains an entry for a job.
      *
      * @param jobId Ident of the job we wish to check the store for.
-     * @return {@code true} if a {@code dirty} or {@code clean} {@link JobResultEntry} exists for
-     *     the given {@code JobID}; otherwise {@code false}.
-     * @throws IOException if determining whether a job entry is present in the store failed for IO
-     *     reasons.
+     * @return a successfully completed future with {@code true} if a {@code dirty} or {@code clean}
+     *     {@link JobResultEntry} exists for the given {@code JobID}; otherwise a successfully
+     *     completed future with {@code false}.

Review Comment:
   ```suggestion
        *     {@link JobResultEntry} exists for the given {@code JobID}; otherwise {@code false}.
   ```
   nit: no need to add this; it's implied.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java:
##########
@@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchEle
     }
 
     @Override
-    public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException {
+    public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException {

Review Comment:
   ```suggestion
       public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException {
   ```
   This change is not necessary.
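Keeping the primitive `boolean` works because a lambda such as `() -> hasDirtyJobResultEntryInternal(jobId)` can target a `Supplier<Boolean>`: the result is autoboxed. A minimal sketch, assuming `java.util.function.Supplier` in place of Flink's `SupplierWithException` and a hypothetical in-memory set of dirty job IDs:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/** Sketch: a primitive boolean internal method still yields CompletableFuture<Boolean>. */
public class PrimitiveReturnAutoboxing {

    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    // Hypothetical storage; the real stores read from the file system or memory.
    private final Set<String> dirtyJobIds = ConcurrentHashMap.newKeySet();

    public void markDirty(String jobId) {
        dirtyJobIds.add(jobId);
    }

    // Primitive return type, as in the original signature.
    boolean hasDirtyJobResultEntryInternal(String jobId) {
        return dirtyJobIds.contains(jobId);
    }

    public CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(String jobId, Executor ioExecutor) {
        // T is inferred as Boolean; the inner lambda's boolean result is boxed.
        return CompletableFuture.supplyAsync(
                () -> withReadLock(() -> hasDirtyJobResultEntryInternal(jobId)), ioExecutor);
    }

    private <T> T withReadLock(Supplier<T> supplier) {
        readWriteLock.readLock().lock();
        try {
            return supplier.get();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        PrimitiveReturnAutoboxing store = new PrimitiveReturnAutoboxing();
        store.markDirty("job-1");
        System.out.println(store.hasDirtyJobResultEntryAsync("job-1", ioExecutor).join());
        System.out.println(store.hasDirtyJobResultEntryAsync("job-2", ioExecutor).join());
        ioExecutor.shutdown();
    }
}
```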



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -580,17 +581,13 @@ private boolean isDuplicateJob(JobID jobId) throws FlinkException {
      * Checks whether the given job has already been executed.
      *
      * @param jobId identifying the submitted job
-     * @return true if the job has already finished, either successfully or as a failure
+     * @return a successfully completed future with {@code true} if the job has already finished,
+     *     either successfully or as a failure
      * @throws FlinkException if the job scheduling status cannot be retrieved
      */
-    private boolean isInGloballyTerminalState(JobID jobId) throws FlinkException {
-        try {
-            return jobResultStore.hasJobResultEntry(jobId);
-        } catch (IOException e) {
-            throw new FlinkException(
-                    String.format("Failed to retrieve job scheduling status for job %s.", jobId),
-                    e);
-        }
+    private CompletableFuture<Boolean> isInGloballyTerminalState(JobID jobId)

Review Comment:
   Here it makes sense to keep the method (even though it only delegates to a single other method), because `isInGloballyTerminalState` is more descriptive than `jobResultStore.hasJobResultEntryAsync` :+1: 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##########
@@ -57,17 +61,17 @@ public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchEle
     }
 
     @Override
-    public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException {
+    public Boolean hasDirtyJobResultEntryInternal(JobID jobId) {

Review Comment:
   ```suggestion
       public boolean hasDirtyJobResultEntryInternal(JobID jobId) {
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -579,10 +587,15 @@ public void testErrorHandlingIfJobCannotBeMarkedAsCleanInJobResultStore() throws
         final CompletableFuture<JobResultEntry> dirtyJobFuture = new CompletableFuture<>();
         final JobResultStore jobResultStore =
                 TestingJobResultStore.builder()
-                        .withCreateDirtyResultConsumer(dirtyJobFuture::complete)
+                        .withCreateDirtyResultConsumer(
+                                jobResultEntry -> {
+                                    dirtyJobFuture.complete(jobResultEntry);
+                                    return FutureUtils.completedVoidFuture();
+                                })
                         .withMarkResultAsCleanConsumer(
                                 jobId -> {
-                                    throw new IOException("Expected IOException.");
+                                    return FutureUtils.completedExceptionally(

Review Comment:
   This can be shortened, analogous to your change above: the curly braces and the `return` are unnecessary in a one-liner.
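For illustration, a sketch of the block lambda next to the equivalent expression lambda. The `Function<String, CompletableFuture<Void>>` type is an assumption standing in for whatever `withMarkResultAsCleanConsumer` accepts, and `completedExceptionally` is a local stand-in for Flink's `FutureUtils.completedExceptionally` so the example has no Flink dependency:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

/** Sketch: a single-return block lambda and its equivalent expression lambda. */
public class OneLinerLambda {

    // Local stand-in for FutureUtils.completedExceptionally.
    static <T> CompletableFuture<T> completedExceptionally(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    // Block body: braces plus an explicit return statement.
    static final Function<String, CompletableFuture<Void>> VERBOSE =
            jobId -> {
                return completedExceptionally(new IOException("Expected IOException."));
            };

    // Same behavior as an expression lambda: no braces, no return keyword.
    static final Function<String, CompletableFuture<Void>> CONCISE =
            jobId -> completedExceptionally(new IOException("Expected IOException."));

    // Returns the cause reported by join(), or null if the future completed normally.
    static Throwable failureCause(Function<String, CompletableFuture<Void>> markResultAsClean) {
        try {
            markResultAsClean.apply("some-job").join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureCause(VERBOSE));
        System.out.println(failureCause(CONCISE));
    }
}
```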



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,70 @@ public interface JobResultStore {
      * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that
      * clean-up operations still need to be performed. Once the job resource cleanup has been
      * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link
-     * #markResultAsClean(JobID)}.
+     * #markResultAsCleanAsync(JobID)}.
      *
      * @param jobResultEntry The job result we wish to persist.
-     * @throws IOException if the creation of the dirty result failed for IO reasons.
-     * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID}
-     *     attached that is already registered in this {@code JobResultStore}.
+     * @return a successfully completed future with {@code true} if the dirty result is created
+     *     successfully. The method will throw {@link IllegalStateException} if the passed {@code
+     *     jobResultEntry} has a {@code JobID} attached that is already registered in this {@code
+     *     JobResultStore}.
      */
-    void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException;
+    CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry);
 
     /**
      * Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more
      * resource cleanup steps need to be performed. No actions should be triggered if the passed
      * {@code JobID} belongs to a job that was already marked as clean.
      *
      * @param jobId Ident of the job we wish to mark as clean.
-     * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean}
-     *     failed for IO reasons.
-     * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the
-     *     store for the given {@code JobID}.
+     * @return a successfully completed future if the result is marked successfully, The future will
+     *     completed with {@link NoSuchElementException} if there is no corresponding {@code dirty}
+     *     job present in the store for the given {@code JobID}.

Review Comment:
   We shouldn't remove the documentation for the `NoSuchElementException`. It is still part of the contract and should, therefore, be mentioned in the JavaDoc. Could you add it to the documentation of the returned future as an additional sentence?
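A sketch of how the failure case could be documented on the returned future and honored by an implementation. The interface below is a simplified, hypothetical version of `JobResultStore` (plain `String` job IDs, one method), not the actual Flink code:

```java
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Sketch: a failure case that is now reported through the returned future. */
public class MarkAsCleanContract {

    /** Simplified, hypothetical version of the store interface. */
    public interface CleanableStore {
        /**
         * Marks an existing dirty entry as clean. No action is triggered if the job is already
         * marked as clean.
         *
         * @param jobId Ident of the job we wish to mark as clean.
         * @return a future that completes once the entry is marked as clean. The future completes
         *     exceptionally with a {@link NoSuchElementException} if there is no corresponding
         *     {@code dirty} job present in the store for the given {@code jobId}.
         */
        CompletableFuture<Void> markResultAsCleanAsync(String jobId);
    }

    /** In-memory implementation that follows the documented contract. */
    public static final class InMemoryStore implements CleanableStore {
        public final Set<String> dirty = new HashSet<>();
        public final Set<String> clean = new HashSet<>();

        @Override
        public CompletableFuture<Void> markResultAsCleanAsync(String jobId) {
            CompletableFuture<Void> result = new CompletableFuture<>();
            if (clean.contains(jobId)) {
                result.complete(null); // already clean: no action
            } else if (dirty.remove(jobId)) {
                clean.add(jobId);
                result.complete(null);
            } else {
                result.completeExceptionally(
                        new NoSuchElementException("No dirty entry for job " + jobId));
            }
            return result;
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        store.dirty.add("job-1");
        store.markResultAsCleanAsync("job-1").join();
        store.markResultAsCleanAsync("job-1").join(); // duplicate call is a no-op
        System.out.println(store.markResultAsCleanAsync("unknown-job").isCompletedExceptionally());
    }
}
```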



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##########


Review Comment:
   Removing the `IOException` from the method signatures in `EmbeddedJobResultStore` could happen in a separate hotfix commit.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java:
##########
@@ -46,72 +47,97 @@ public interface JobResultStoreContractTest {
     @Test
     default void testStoreJobResultsWithDuplicateIDsThrowsException() throws IOException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
         final JobResultEntry otherEntryWithDuplicateId =
                 new JobResultEntry(
                         TestingJobResultStore.createSuccessfulJobResult(
                                 DUMMY_JOB_RESULT_ENTRY.getJobId()));
-        assertThatThrownBy(() -> jobResultStore.createDirtyResult(otherEntryWithDuplicateId))
-                .isInstanceOf(IllegalStateException.class);
+        assertThatThrownBy(
+                        () ->
+                                jobResultStore
+                                        .createDirtyResultAsync(otherEntryWithDuplicateId)
+                                        .join())
+                .isInstanceOf(CompletionException.class);
     }
 
     @Test
     default void testStoreDirtyEntryForAlreadyCleanedJobResultThrowsException() throws IOException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
-        assertThatThrownBy(() -> jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
-                .isInstanceOf(IllegalStateException.class);
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
+        jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).join();
+        assertThatThrownBy(
+                        () -> jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join())
+                .isInstanceOf(CompletionException.class);
     }
 
     @Test
     default void testCleaningDuplicateEntryThrowsNoException() throws IOException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
+        jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).join();
         assertThatNoException()
                 .isThrownBy(
-                        () -> jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()));
+                        () ->
+                                jobResultStore
+                                        .markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                        .join());
     }
 
     @Test
     default void testCleaningNonExistentEntryThrowsException() throws IOException {
         JobResultStore jobResultStore = createJobResultStore();
         assertThatThrownBy(
-                        () -> jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()))
-                .isInstanceOf(NoSuchElementException.class);
+                        () ->
+                                jobResultStore
+                                        .markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                        .join())
+                .hasCauseInstanceOf(NoSuchElementException.class);

Review Comment:
   Here you asserted the cause properly. Well done :+1: 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java:
##########
@@ -46,72 +47,97 @@ public interface JobResultStoreContractTest {
     @Test
     default void testStoreJobResultsWithDuplicateIDsThrowsException() throws IOException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
         final JobResultEntry otherEntryWithDuplicateId =
                 new JobResultEntry(
                         TestingJobResultStore.createSuccessfulJobResult(
                                 DUMMY_JOB_RESULT_ENTRY.getJobId()));
-        assertThatThrownBy(() -> jobResultStore.createDirtyResult(otherEntryWithDuplicateId))
-                .isInstanceOf(IllegalStateException.class);
+        assertThatThrownBy(
+                        () ->
+                                jobResultStore
+                                        .createDirtyResultAsync(otherEntryWithDuplicateId)
+                                        .join())
+                .isInstanceOf(CompletionException.class);

Review Comment:
   We should also validate the cause:
   ```suggestion
                   .isInstanceOf(CompletionException.class)
                   .hasCauseInstanceOf(IllegalStateException.class);
   ```
   Asserting on the cause of the `CompletionException` matters because that's the actual business logic. This applies to the other methods in this test class as well; I'm not going to mark all of them individually.
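   To make the point concrete, here is a minimal JDK-only sketch (not Flink code; `createDirtyResultAsync` below is a hypothetical stand-in for the store method). `join()` reports every failure as a `CompletionException`, so asserting only on that type would also pass if the future failed for an unrelated reason; the cause is what carries the business exception.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;

   public class JoinCauseDemo {

       /** Hypothetical stand-in for an async store call that may fail with a business exception. */
       public static CompletableFuture<Void> createDirtyResultAsync(boolean duplicate) {
           return CompletableFuture.runAsync(
                   () -> {
                       if (duplicate) {
                           // the "business" failure the test actually cares about
                           throw new IllegalStateException("duplicate job ID");
                       }
                   });
       }

       /** Returns the exception thrown by join(), or null if the future completed normally. */
       public static Throwable joinFailure(CompletableFuture<?> future) {
           try {
               future.join();
               return null;
           } catch (CompletionException e) {
               return e;
           }
       }

       public static void main(String[] args) {
           Throwable thrown = joinFailure(createDirtyResultAsync(true));
           // join() surfaces any failure as a CompletionException ...
           System.out.println(thrown.getClass().getSimpleName()); // prints CompletionException
           // ... so only the cause tells us which business rule was violated
           System.out.println(thrown.getCause().getClass().getSimpleName()); // prints IllegalStateException
       }
   }
   ```

   In the contract test, AssertJ's `hasCauseInstanceOf` performs the same check as inspecting `getCause()` here.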



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -332,11 +332,16 @@ public void testJobBeingMarkedAsDirtyBeforeCleanup() throws Exception {
                                 TestingJobResultStore.builder()
                                         .withCreateDirtyResultConsumer(
                                                 ignoredJobResultEntry -> {
+                                                    CompletableFuture<Boolean> result =
+                                                            new CompletableFuture<>();
                                                     try {
                                                         markAsDirtyLatch.await();
                                                     } catch (InterruptedException e) {
-                                                        throw new RuntimeException(e);
+                                                        result.completeExceptionally(
+                                                                new RuntimeException(e));

Review Comment:
   We should still call `Thread.currentThread().interrupt()` before returning the exceptionally completed future. Something like this:
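   ```
   ignoredJobResultEntry -> {
       try {
           markAsDirtyLatch.await();
       } catch (InterruptedException e) {
           // await() cleared the interrupt flag when it threw; restore it before failing the future
           Thread.currentThread().interrupt();
           return FutureUtils.completedExceptionally(e);
       }
       return FutureUtils.completedVoidFuture();
   }
   ```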
   No local variable `result` is needed here.
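   Why the `interrupt()` call matters: `CountDownLatch#await` clears the thread's interrupt status when it throws `InterruptedException`, so turning the exception into a failed future without restoring the flag hides the interruption from the caller. Below is a minimal JDK-only sketch of the pattern. It uses `CompletableFuture.failedFuture` (Java 9+) in place of Flink's `FutureUtils`, and a hypothetical `awaitLatch` method in place of the consumer lambda.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;

   public class InterruptRestoreDemo {

       /** Hypothetical stand-in for the consumer body: blocks on the latch, reports the outcome as a future. */
       public static CompletableFuture<Void> awaitLatch(CountDownLatch latch) {
           try {
               latch.await();
           } catch (InterruptedException e) {
               // await() cleared the interrupt flag when it threw; restore it so that
               // code further up the stack can still observe the interruption
               Thread.currentThread().interrupt();
               return CompletableFuture.failedFuture(e);
           }
           return CompletableFuture.completedFuture(null);
       }

       public static void main(String[] args) {
           // Interrupt ourselves up front: await() then throws InterruptedException immediately.
           Thread.currentThread().interrupt();
           CompletableFuture<Void> result = awaitLatch(new CountDownLatch(1));

           System.out.println(result.isCompletedExceptionally()); // true
           // Without the interrupt() call in the catch block this would print false.
           System.out.println(Thread.interrupted()); // true (and clears the flag again)
       }
   }
   ```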



##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java:
##########
@@ -116,58 +111,53 @@ public static TestingJobResultStore.Builder builder() {
     /** {@code Builder} for instantiating {@code TestingJobResultStore} instances. */
     public static class Builder {
 
-        private ThrowingConsumer<JobResultEntry, ? extends IOException> createDirtyResultConsumer =
-                ignored -> {};
-        private ThrowingConsumer<JobID, ? extends IOException> markResultAsCleanConsumer =
-                ignored -> {};
+        private Function<JobResultEntry, CompletableFuture<Void>> createDirtyResultConsumer =
+                jobResultEntry -> CompletableFuture.completedFuture(null);
+        private Function<JobID, CompletableFuture<Void>> markResultAsCleanConsumer =
+                jobID -> CompletableFuture.completedFuture(null);
 
-        private FunctionWithException<JobID, Boolean, ? extends IOException>
-                hasJobResultEntryFunction = ignored -> false;
-        private FunctionWithException<JobID, Boolean, ? extends IOException>
-                hasDirtyJobResultEntryFunction = ignored -> false;
-        private FunctionWithException<JobID, Boolean, ? extends IOException>
-                hasCleanJobResultEntryFunction = ignored -> false;
+        private Function<JobID, CompletableFuture<Boolean>> hasJobResultEntryFunction =
+                jobID -> CompletableFuture.completedFuture(false);
+        private Function<JobID, CompletableFuture<Boolean>> hasDirtyJobResultEntryFunction =
+                jobID -> CompletableFuture.completedFuture(false);
+        private Function<JobID, CompletableFuture<Boolean>> hasCleanJobResultEntryFunction =
+                jobID -> CompletableFuture.completedFuture(false);
 
-        private SupplierWithException<Set<JobResult>, ? extends IOException>
-                getDirtyResultsSupplier = Collections::emptySet;
+        private Supplier<Set<JobResult>> dirtyResultsSupplier = Collections::emptySet;
 
         public Builder withCreateDirtyResultConsumer(
-                ThrowingConsumer<JobResultEntry, ? extends IOException> createDirtyResultConsumer) {
+                Function<JobResultEntry, CompletableFuture<Void>> createDirtyResultConsumer) {
             this.createDirtyResultConsumer = createDirtyResultConsumer;
             return this;
         }
 
         public Builder withMarkResultAsCleanConsumer(
-                ThrowingConsumer<JobID, ? extends IOException> markResultAsCleanConsumer) {
+                Function<JobID, CompletableFuture<Void>> markResultAsCleanConsumer) {
             this.markResultAsCleanConsumer = markResultAsCleanConsumer;
             return this;
         }
 
         public Builder withHasJobResultEntryFunction(
-                FunctionWithException<JobID, Boolean, ? extends IOException>
-                        hasJobResultEntryFunction) {
+                Function<JobID, CompletableFuture<Boolean>> hasJobResultEntryFunction) {
             this.hasJobResultEntryFunction = hasJobResultEntryFunction;
             return this;
         }
 
         public Builder withHasDirtyJobResultEntryFunction(
-                FunctionWithException<JobID, Boolean, ? extends IOException>
-                        hasDirtyJobResultEntryFunction) {
+                Function<JobID, CompletableFuture<Boolean>> hasDirtyJobResultEntryFunction) {
             this.hasDirtyJobResultEntryFunction = hasDirtyJobResultEntryFunction;
             return this;
         }
 
         public Builder withHasCleanJobResultEntryFunction(
-                FunctionWithException<JobID, Boolean, ? extends IOException>
-                        hasCleanJobResultEntryFunction) {
+                Function<JobID, CompletableFuture<Boolean>> hasCleanJobResultEntryFunction) {
             this.hasCleanJobResultEntryFunction = hasCleanJobResultEntryFunction;
             return this;
         }
 
         public Builder withGetDirtyResultsSupplier(
-                SupplierWithException<Set<JobResult>, ? extends IOException>
-                        getDirtyResultsSupplier) {
-            this.getDirtyResultsSupplier = getDirtyResultsSupplier;
+                Supplier<Set<JobResult>> getDirtyResultsSupplier) {

Review Comment:
   I guess the `getDirtyResultsSupplier` field doesn't need to change.



##########
flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java:
##########
@@ -381,9 +381,12 @@ public static void rethrowException(Throwable t) throws Exception {
      * @param e exception to throw if not null.
      * @throws Exception
      */
-    public static void tryRethrowException(@Nullable Exception e) throws Exception {
+    public static void tryRethrowException(@Nullable Throwable e) throws Exception {

Review Comment:
   This change might not be necessary with the proposed changes below. :thinking:
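   For context on why the parameter might have been widened: `CompletableFuture` callbacks such as `handle` and `whenComplete` pass the failure as a `Throwable`. You can't hand that to a method with an `Exception` parameter without a cast. Below is a minimal JDK-only sketch of a rethrow helper that accepts `Throwable` but keeps `throws Exception`. The names `rethrowIfPresent` and `joinAndRethrow` are hypothetical and are not `ExceptionUtils` API. The sketch needs Java 9+ only for `failedFuture` in the usage.

   ```java
   import java.util.concurrent.CompletableFuture;

   public class RethrowDemo {

       /** Hypothetical helper: rethrows t if non-null without forcing callers to declare Throwable. */
       public static void rethrowIfPresent(Throwable t) throws Exception {
           if (t == null) {
               return;
           }
           if (t instanceof Error) {
               throw (Error) t; // unchecked, propagates as-is
           }
           if (t instanceof Exception) {
               throw (Exception) t; // covered by "throws Exception"
           }
           // a direct Throwable subclass: wrap it so the signature can stay "throws Exception"
           throw new Exception(t);
       }

       /** Blocks on the future and rethrows its failure, which handle() exposes only as a Throwable. */
       public static <T> T joinAndRethrow(CompletableFuture<T> future) throws Exception {
           // t is null on success; on failure it may be a CompletionException wrapping the original
           Throwable failure = future.handle((value, t) -> t).join();
           rethrowIfPresent(failure);
           return future.join();
       }
   }
   ```

   For example, `joinAndRethrow(CompletableFuture.failedFuture(new IllegalStateException("x")))` rethrows the `IllegalStateException` itself, because `failedFuture` stores the exception without wrapping it.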


