XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1284412779


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -561,21 +563,28 @@ public CompletableFuture<Acknowledge> submitFailedJob(
         return archiveExecutionGraphToHistoryServer(executionGraphInfo);
     }
 
+    /**
+     * Checks whether the given job has already been submitted, executed, or awaiting termination.
+     *
+     * @param jobId identifying the submitted job
+     * @return true if the job has already been submitted (is running) or has been executed
+     * @throws Exception if the job scheduling status cannot be retrieved
+     */
+    private boolean isDuplicateJob(JobID jobId) throws Exception {
+        return isInGloballyTerminalState(jobId).get()
+                || jobManagerRunnerRegistry.isRegistered(jobId)
+                || submittedAndWaitingTerminationJobIDs.contains(jobId);
+    }
+

Review Comment:
   This was accidentally re-added when rebasing onto the most recent `master`. It can be removed again.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -112,10 +141,15 @@ private void withWriteLock(ThrowingRunnable<IOException> runnable) throws IOExce
         }
     }
 
-    private <T> T withReadLock(SupplierWithException<T, IOException> runnable) throws IOException {
+    private <T> CompletableFuture<T> withReadLockAsync(
+            SupplierWithException<T, IOException> runnable) {

Review Comment:
   ```suggestion
               SupplierWithException<T, IOException> supplier) {
   ```
   copy&paste error on my side. You changed it for `withReadLock` already. :+1: 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##########


Review Comment:
   The commit message should have a `[hotfix][runtime]` prefix instead of being labeled with `[FLINK-27204]`. The change is independent of FLINK-27204 (i.e. it's more like a code cleanup that is shipped along with FLINK-27204). Think of it like this: the hotfix commit wouldn't need to be reverted when reverting FLINK-27204 because it's still a valid change that improves the code.
   
   Additionally, you only moved the cleanup of `markResultAsCleanInternal` into the hotfix commit. `hasDirtyJobResultEntryInternal`, `hasCleanJobResultEntryInternal` and `getDirtyResultsInternal` were cleaned up as well, but in the wrong commit. These three method changes should end up in the hotfix commit, too.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java:
##########
@@ -116,46 +123,74 @@ public void testBaseDirectoryCreationOnResultStoreInitialization() throws Except
         assertThat(emptyBaseDirectory).doesNotExist();
 
         fileSystemJobResultStore =
-                new FileSystemJobResultStore(basePath.getFileSystem(), basePath, false);
+                new FileSystemJobResultStore(
+                        basePath.getFileSystem(), basePath, false, manuallyTriggeredExecutor);
         // Result store operations are creating the base directory on-the-fly
         assertThat(emptyBaseDirectory).doesNotExist();
-        fileSystemJobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        CompletableFuture<Void> dirtyResultAsync =
+                fileSystemJobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY);
+        manuallyTriggeredExecutor.triggerAll();
+        dirtyResultAsync.get();
         assertThat(emptyBaseDirectory).exists().isDirectory();

Review Comment:
   I noticed that we could extend the tests to check the async nature: We could put the same (but inverted) assert in front of the trigger statement. That way we check that no synchronous activity happens. The same also applies to the other tests.
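   
   For illustration, a rough sketch of what I mean (reusing the fields of this test; plain AssertJ assertions assumed to be available):
   ```java
   CompletableFuture<Void> dirtyResultAsync =
           fileSystemJobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY);

   // inverted asserts before triggering: no synchronous side effect happened yet
   assertThat(emptyBaseDirectory).doesNotExist();
   assertThat(dirtyResultAsync).isNotDone();

   manuallyTriggeredExecutor.triggerAll();
   dirtyResultAsync.get();

   // only now may the base directory exist
   assertThat(emptyBaseDirectory).exists().isDirectory();
   ```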



##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java:
##########
@@ -46,72 +47,97 @@ public interface JobResultStoreContractTest {
     @Test
     default void testStoreJobResultsWithDuplicateIDsThrowsException() throws IOException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
         final JobResultEntry otherEntryWithDuplicateId =
                 new JobResultEntry(
                         TestingJobResultStore.createSuccessfulJobResult(
                                 DUMMY_JOB_RESULT_ENTRY.getJobId()));
-        assertThatThrownBy(() -> jobResultStore.createDirtyResult(otherEntryWithDuplicateId))
-                .isInstanceOf(IllegalStateException.class);
+        assertThatThrownBy(
+                        () ->
+                                jobResultStore
+                                        .createDirtyResultAsync(otherEntryWithDuplicateId)
+                                        .join())
+                .isInstanceOf(CompletionException.class);

Review Comment:
   You didn't apply this to all occurrences in `JobResultStoreContractTest`.
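   
   Every place that calls `.join()` on one of the new `*Async` methods and asserts on the thrown exception should expect the `CompletionException` wrapper. Roughly like this (just a sketch; `unknownJobId` is a placeholder, and checking the cause is optional):
   ```java
   // join() wraps the underlying exception in a CompletionException
   assertThatThrownBy(() -> jobResultStore.markResultAsCleanAsync(unknownJobId).join())
           .isInstanceOf(CompletionException.class)
           .hasCauseInstanceOf(NoSuchElementException.class);
   ```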



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,62 +44,62 @@ public interface JobResultStore {
      * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that
      * clean-up operations still need to be performed. Once the job resource cleanup has been
      * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link
-     * #markResultAsClean(JobID)}.
+     * #markResultAsCleanAsync(JobID)}.
      *
      * @param jobResultEntry The job result we wish to persist.
-     * @throws IOException if the creation of the dirty result failed for IO reasons.
-     * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID}
-     *     attached that is already registered in this {@code JobResultStore}.
+     * @return a successfully completed future with {@code true} if the dirty result is created
+     *     successfully. The future will be completed with {@link IllegalStateException} if the
+     *     passed {@code jobResultEntry} has a {@code JobID} attached that is already registered in
+     *     this {@code JobResultStore}.
      */
-    void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException;
+    CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry);
 
     /**
      * Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more
      * resource cleanup steps need to be performed. No actions should be triggered if the passed
      * {@code JobID} belongs to a job that was already marked as clean.
      *
      * @param jobId Ident of the job we wish to mark as clean.
-     * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean}
-     *     failed for IO reasons.
-     * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the
-     *     store for the given {@code JobID}.
+     * @return a successfully completed future if the result is marked successfully, The future can
+     *     also completed with {@link NoSuchElementException}. i.e. there is no corresponding {@code

Review Comment:
   ```suggestion
     * @return a successfully completed future if the result is marked successfully. The future can
     *     complete exceptionally with a {@link NoSuchElementException}. i.e. there is no corresponding {@code
   ```
   nitty nit: we might want to make it more explicit that the future completes "exceptionally"

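   To make the caller-side implication concrete (hedged sketch; `someJobId` is a placeholder): with the documented behaviour, `join()` surfaces the `NoSuchElementException` as the cause of a `CompletionException`.
   ```java
   try {
       jobResultStore.markResultAsCleanAsync(someJobId).join();
   } catch (CompletionException e) {
       if (e.getCause() instanceof NoSuchElementException) {
           // no dirty entry is registered for someJobId
       }
   }
   ```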


##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -43,66 +46,92 @@ public abstract class AbstractThreadsafeJobResultStore implements JobResultStore
 
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
-    @Override
-    public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
-        Preconditions.checkState(
-                !hasJobResultEntry(jobResultEntry.getJobId()),
-                "Job result store already contains an entry for job %s",
-                jobResultEntry.getJobId());
+    private final Executor ioExecutor;
 
-        withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+    protected AbstractThreadsafeJobResultStore(Executor ioExecutor) {
+        this.ioExecutor = ioExecutor;
+    }
+
+    @Override
+    public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
+        return hasJobResultEntryAsync(jobResultEntry.getJobId())
+                .thenAccept(
+                        hasJobResultEntry ->
+                                Preconditions.checkState(
+                                        !hasJobResultEntry,
+                                        "Job result store already contains an entry for job %s",
+                                        jobResultEntry.getJobId()))
+                .thenCompose(
+                        ignoredVoid ->
+                                withWriteLockAsync(
+                                        () -> createDirtyResultInternal(jobResultEntry)));
     }
 
     @GuardedBy("readWriteLock")
     protected abstract void createDirtyResultInternal(JobResultEntry jobResultEntry)
             throws IOException;
 
     @Override
-    public void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException {
-        if (hasCleanJobResultEntry(jobId)) {
-            LOG.debug("The job {} is already marked as clean. No action required.", jobId);
-            return;
-        }
-
-        withWriteLock(() -> markResultAsCleanInternal(jobId));
+    public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
+        return hasCleanJobResultEntryAsync(jobId)
+                .thenCompose(
+                        hasCleanJobResultEntry -> {
+                            if (hasCleanJobResultEntry) {
+                                LOG.debug(
+                                        "The job {} is already marked as clean. No action required.",
+                                        jobId);
+                                return FutureUtils.completedVoidFuture();
+                            }
+
+                            return withWriteLockAsync(() -> markResultAsCleanInternal(jobId));
+                        });
     }
 
     @GuardedBy("readWriteLock")
     protected abstract void markResultAsCleanInternal(JobID jobId)
             throws IOException, NoSuchElementException;
 
     @Override
-    public boolean hasJobResultEntry(JobID jobId) throws IOException {
-        return withReadLock(
+    public CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
+        return withReadLockAsync(
                 () ->
                         hasDirtyJobResultEntryInternal(jobId)
                                 || hasCleanJobResultEntryInternal(jobId));
     }
 
     @Override
-    public boolean hasDirtyJobResultEntry(JobID jobId) throws IOException {
-        return withReadLock(() -> hasDirtyJobResultEntryInternal(jobId));
+    public CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID jobId) {
+        return withReadLockAsync(() -> hasDirtyJobResultEntryInternal(jobId));
     }
 
     @GuardedBy("readWriteLock")
     protected abstract boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException;
 
     @Override
-    public boolean hasCleanJobResultEntry(JobID jobId) throws IOException {
-        return withReadLock(() -> hasCleanJobResultEntryInternal(jobId));
+    public CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID jobId) {
+        return withReadLockAsync(() -> hasCleanJobResultEntryInternal(jobId));
     }
 
     @GuardedBy("readWriteLock")
     protected abstract boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException;
 
     @Override
     public Set<JobResult> getDirtyResults() throws IOException {
-        return withReadLock(this::getDirtyResultsInternal);
+        return getDirtyResultsInternal();

Review Comment:
   ```suggestion
            return withReadLock(this::getDirtyResultsInternal);
   ```
   Yikes, that's my bad: We shouldn't remove the lock here.



##########
flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java:
##########
@@ -381,9 +381,12 @@ public static void rethrowException(Throwable t) throws Exception {
      * @param e exception to throw if not null.
      * @throws Exception
      */
-    public static void tryRethrowException(@Nullable Exception e) throws Exception {
+    public static void tryRethrowException(@Nullable Throwable e) throws Exception {

Review Comment:
   We can revert the change to `ExceptionUtils`. We're not using it anymore, as far as I can tell.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java:
##########
@@ -116,46 +123,74 @@ public void testBaseDirectoryCreationOnResultStoreInitialization() throws Except
         assertThat(emptyBaseDirectory).doesNotExist();
 
         fileSystemJobResultStore =
-                new FileSystemJobResultStore(basePath.getFileSystem(), basePath, false);
+                new FileSystemJobResultStore(
+                        basePath.getFileSystem(), basePath, false, manuallyTriggeredExecutor);
         // Result store operations are creating the base directory on-the-fly
         assertThat(emptyBaseDirectory).doesNotExist();
-        fileSystemJobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        CompletableFuture<Void> dirtyResultAsync =
+                fileSystemJobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY);
+        manuallyTriggeredExecutor.triggerAll();
+        dirtyResultAsync.get();

Review Comment:
   ```suggestion
       FlinkAssertions.assertThatFuture(dirtyResultAsync).eventuallySucceeds();
   ```
   nit: `FlinkAssertions.assertThatFuture` is a handy utility to make future-based testing more readable.
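   
   Combined with the manually triggered executor, the assertion could then read roughly like this (sketch; assumes a static import of `FlinkAssertions.assertThatFuture`):
   ```java
   CompletableFuture<Void> dirtyResultAsync =
           fileSystemJobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY);
   manuallyTriggeredExecutor.triggerAll();
   // waits for completion and fails the test if the future completes exceptionally
   assertThatFuture(dirtyResultAsync).eventuallySucceeds();
   ```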



##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java:
##########
@@ -54,11 +57,15 @@ public class FileSystemJobResultStoreFileOperationsTest {
 
     private Path basePath;
 
+    final ManuallyTriggeredScheduledExecutor manuallyTriggeredExecutor =

Review Comment:
   ```suggestion
        private final ManuallyTriggeredScheduledExecutor manuallyTriggeredExecutor =
   ```
   Looks like that one slipped through in the previous review. Is there a reason why we don't make that one `private`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -43,66 +46,92 @@ public abstract class AbstractThreadsafeJobResultStore implements JobResultStore
 
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
-    @Override
-    public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
-        Preconditions.checkState(
-                !hasJobResultEntry(jobResultEntry.getJobId()),
-                "Job result store already contains an entry for job %s",
-                jobResultEntry.getJobId());
+    private final Executor ioExecutor;
 
-        withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+    protected AbstractThreadsafeJobResultStore(Executor ioExecutor) {
+        this.ioExecutor = ioExecutor;
+    }
+
+    @Override
+    public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
+        return hasJobResultEntryAsync(jobResultEntry.getJobId())
+                .thenAccept(
+                        hasJobResultEntry ->
+                                Preconditions.checkState(
+                                        !hasJobResultEntry,
+                                        "Job result store already contains an entry for job %s",
+                                        jobResultEntry.getJobId()))
+                .thenCompose(
+                        ignoredVoid ->
+                                withWriteLockAsync(
+                                        () -> createDirtyResultInternal(jobResultEntry)));
     }
 
     @GuardedBy("readWriteLock")
     protected abstract void createDirtyResultInternal(JobResultEntry jobResultEntry)
             throws IOException;
 
     @Override
-    public void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException {
-        if (hasCleanJobResultEntry(jobId)) {
-            LOG.debug("The job {} is already marked as clean. No action required.", jobId);
-            return;
-        }
-
-        withWriteLock(() -> markResultAsCleanInternal(jobId));
+    public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
+        return hasCleanJobResultEntryAsync(jobId)
+                .thenCompose(
+                        hasCleanJobResultEntry -> {
+                            if (hasCleanJobResultEntry) {
+                                LOG.debug(
+                                        "The job {} is already marked as clean. No action required.",
+                                        jobId);
+                                return FutureUtils.completedVoidFuture();
+                            }
+
+                            return withWriteLockAsync(() -> markResultAsCleanInternal(jobId));
+                        });
     }
 
     @GuardedBy("readWriteLock")
     protected abstract void markResultAsCleanInternal(JobID jobId)
             throws IOException, NoSuchElementException;
 
     @Override
-    public boolean hasJobResultEntry(JobID jobId) throws IOException {
-        return withReadLock(
+    public CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
+        return withReadLockAsync(
                 () ->
                         hasDirtyJobResultEntryInternal(jobId)
                                 || hasCleanJobResultEntryInternal(jobId));
     }
 
     @Override
-    public boolean hasDirtyJobResultEntry(JobID jobId) throws IOException {
-        return withReadLock(() -> hasDirtyJobResultEntryInternal(jobId));
+    public CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID jobId) {
+        return withReadLockAsync(() -> hasDirtyJobResultEntryInternal(jobId));
     }
 
     @GuardedBy("readWriteLock")
     protected abstract boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException;
 
     @Override
-    public boolean hasCleanJobResultEntry(JobID jobId) throws IOException {
-        return withReadLock(() -> hasCleanJobResultEntryInternal(jobId));
+    public CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID jobId) {
+        return withReadLockAsync(() -> hasCleanJobResultEntryInternal(jobId));
     }
 
     @GuardedBy("readWriteLock")
     protected abstract boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException;
 
     @Override
     public Set<JobResult> getDirtyResults() throws IOException {
-        return withReadLock(this::getDirtyResultsInternal);
+        return getDirtyResultsInternal();
     }
 
     @GuardedBy("readWriteLock")
     protected abstract Set<JobResult> getDirtyResultsInternal() throws IOException;
 
+    private CompletableFuture<Void> withWriteLockAsync(ThrowingRunnable<IOException> runnable) {
+        return FutureUtils.supplyAsync(

Review Comment:
   ```suggestion
           return FutureUtils.runAsync(
   ```
   `runAsync` is good enough here. We're not returning any value.
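   
   The helper could then look roughly like this (sketch only, derived from the hunk above; the exact `FutureUtils.runAsync` signature is an assumption):
   ```java
   private CompletableFuture<Void> withWriteLockAsync(ThrowingRunnable<IOException> runnable) {
       // no value to return, so runAsync is sufficient; the IO work still runs on the ioExecutor
       return FutureUtils.runAsync(() -> withWriteLock(runnable), ioExecutor);
   }
   ```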



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
