XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1206775003


##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,69 @@ public interface JobResultStore {
      * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that
      * clean-up operations still need to be performed. Once the job resource cleanup has been
      * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link
-     * #markResultAsClean(JobID)}.
+     * #markResultAsCleanAsync(JobID)}.
      *
      * @param jobResultEntry The job result we wish to persist.
+     * @return CompletableFuture with the completed state.
      * @throws IOException if the creation of the dirty result failed for IO reasons.
      * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID}
      *     attached that is already registered in this {@code JobResultStore}.
      */
-    void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException;
+    CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry);
 
     /**
      * Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more
      * resource cleanup steps need to be performed. No actions should be triggered if the passed
      * {@code JobID} belongs to a job that was already marked as clean.
      *
      * @param jobId Ident of the job we wish to mark as clean.
+     * @return CompletableFuture with the completed state.
      * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean}
      *     failed for IO reasons.
      * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the
      *     store for the given {@code JobID}.

Review Comment:
   The `IOException` can be removed from the Javadoc now. It's going to be wrapped by the `CompletableFuture`. But we might still want to mention the `NoSuchElementException` as part of the return value's documentation.
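   Something along these lines, maybe (just a rough sketch of the wording, feel free to adjust):
   ```java
   /**
    * ...
    * @param jobId Ident of the job we wish to mark as clean.
    * @return a {@code CompletableFuture} that completes once the entry has been marked as
    *     {@code clean}; it completes exceptionally with a {@link NoSuchElementException} if
    *     there is no corresponding {@code dirty} job present in the store for the given
    *     {@code JobID}.
    */
   CompletableFuture<Void> markResultAsCleanAsync(JobID jobId);
   ```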



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,69 @@ public interface JobResultStore {
      * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that
      * clean-up operations still need to be performed. Once the job resource cleanup has been
      * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link
-     * #markResultAsClean(JobID)}.
+     * #markResultAsCleanAsync(JobID)}.
      *
      * @param jobResultEntry The job result we wish to persist.
+     * @return CompletableFuture with the completed state.
      * @throws IOException if the creation of the dirty result failed for IO reasons.
      * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID}
      *     attached that is already registered in this {@code JobResultStore}.

Review Comment:
   The `IOException` can be removed from the Javadoc now. It's going to be wrapped by the `CompletableFuture`. But we might still want to mention the `IllegalStateException` as part of the return value's documentation.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -44,64 +45,84 @@ public abstract class AbstractThreadsafeJobResultStore implements JobResultStore
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
     @Override
-    public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
-        Preconditions.checkState(
-                !hasJobResultEntry(jobResultEntry.getJobId()),
-                "Job result store already contains an entry for job %s",
-                jobResultEntry.getJobId());
-
-        withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+    public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
+        return hasJobResultEntryAsync(jobResultEntry.getJobId())
+                .handle(
+                        (hasResult, error) -> {
+                            if (error != null || hasResult) {
+                                ExceptionUtils.rethrow(error);
+                            }
+                            try {
+                                withWriteLock(() -> createDirtyResultInternal(jobResultEntry));

Review Comment:
   You have to be careful here: the internal methods now only trigger the actual logic in the `ioExecutor`. Therefore, only the submission of the task is guarded by the lock. You're losing the synchronization of the IO tasks, which will run in multiple threads of the `ioExecutor` for the `FileSystemJobResultStore` implementation.
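   A self-contained sketch of the pattern I have in mind (class and method names here are made up for illustration, not the actual implementation): acquiring the write lock inside the submitted task keeps the IO work itself serialized across the executor threads, instead of only guarding the submission.
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;
   import java.util.concurrent.Executors;
   import java.util.concurrent.locks.ReadWriteLock;
   import java.util.concurrent.locks.ReentrantReadWriteLock;

   // Illustrative only: shows the lock being taken by the thread that performs the IO.
   class LockInsideIoTaskSketch {
       private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
       private final Executor ioExecutor = Executors.newFixedThreadPool(4);

       CompletableFuture<Void> createDirtyResultAsync(String jobId) {
           // Wrapping only the submission in a lock would not serialize the writes;
           // the lock has to be held by the ioExecutor thread doing the work.
           return CompletableFuture.runAsync(
                   () -> {
                       readWriteLock.writeLock().lock();
                       try {
                           writeDirtyEntry(jobId); // placeholder for the actual file-system write
                       } finally {
                           readWriteLock.writeLock().unlock();
                       }
                   },
                   ioExecutor);
       }

       private void writeDirtyEntry(String jobId) {
           // simulated IO work
       }
   }
   ```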



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1361,28 +1363,35 @@ private CompletableFuture<CleanupJobState> registerGloballyTerminatedJobInJobRes
                 "Job %s is in state %s which is not globally terminal.",
                 jobId,
                 terminalJobStatus);
-
-        ioExecutor.execute(
-                () -> {
-                    try {
-                        if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                            log.warn(
-                                    "Job {} is already marked as clean but clean up was triggered again.",
-                                    jobId);
-                        } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                            jobResultStore.createDirtyResult(
-                                    new JobResultEntry(
-                                            JobResult.createFrom(archivedExecutionGraph)));
-                            log.info(
-                                    "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                                    jobId);
-                        }
-                    } catch (IOException e) {
-                        writeFuture.completeExceptionally(e);
-                        return;
-                    }
-                    writeFuture.complete(null);
-                });
+        jobResultStore
+                .hasCleanJobResultEntryAsync(jobId)
+                .handleAsync(

Review Comment:
   That's just a thought without having proven it, actually, but: would the code become more readable if we utilized the `CompletableFuture` composition methods (e.g. `thenCompose`)? :thinking:
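   Something like the following is roughly what I had in mind (a self-contained sketch with a stand-in interface, not the actual `JobResultStore`/`Dispatcher` API):
   ```java
   import java.util.concurrent.CompletableFuture;

   // Stand-in types; only meant to illustrate the thenCompose-based control flow.
   class ThenComposeSketch {
       interface AsyncStore {
           CompletableFuture<Boolean> hasCleanEntryAsync(String jobId);

           CompletableFuture<Boolean> hasDirtyEntryAsync(String jobId);

           CompletableFuture<Void> createDirtyEntryAsync(String jobId);
       }

       static CompletableFuture<Void> registerTerminatedJob(AsyncStore store, String jobId) {
           return store.hasCleanEntryAsync(jobId)
                   .thenCompose(
                           isClean -> {
                               if (isClean) {
                                   // already cleaned up; nothing to register
                                   return CompletableFuture.<Void>completedFuture(null);
                               }
                               return store.hasDirtyEntryAsync(jobId)
                                       .thenCompose(
                                               isDirty ->
                                                       isDirty
                                                               ? CompletableFuture.<Void>completedFuture(null)
                                                               : store.createDirtyEntryAsync(jobId));
                           });
       }
   }
   ```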



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -575,12 +576,13 @@ private boolean isDuplicateJob(JobID jobId) throws FlinkException {
      */
     private boolean isInGloballyTerminalState(JobID jobId) throws FlinkException {
         try {
-            return jobResultStore.hasJobResultEntry(jobId);
-        } catch (IOException e) {
-            throw new FlinkException(
-                    String.format("Failed to retrieve job scheduling status for job %s.", jobId),
-                    e);
+            return jobResultStore.hasJobResultEntryAsync(jobId).get();

Review Comment:
   We're not gaining much here because the call is still blocking (essentially, we'd be changing the return value of `isInGloballyTerminalState` from `boolean` to `CompletableFuture<Boolean>`): we would have to propagate the `CompletableFuture` down the call hierarchy to make use of it.
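   As a rough sketch of what that could look like (the `Async` variant name is just a suggestion, and the caller would compose on the returned future instead of blocking):
   ```java
   // Sketch only: return the store's future instead of blocking on get() here.
   private CompletableFuture<Boolean> isInGloballyTerminalStateAsync(JobID jobId) {
       return jobResultStore.hasJobResultEntryAsync(jobId);
   }
   ```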



##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java:
##########
@@ -44,106 +45,140 @@ public interface JobResultStoreContractTest {
     JobResultStore createJobResultStore() throws IOException;
 
     @Test
-    default void testStoreJobResultsWithDuplicateIDsThrowsException() throws IOException {
+    default void testStoreJobResultsWithDuplicateIDsThrowsException()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
         final JobResultEntry otherEntryWithDuplicateId =
                 new JobResultEntry(
                         TestingJobResultStore.createSuccessfulJobResult(
                                 DUMMY_JOB_RESULT_ENTRY.getJobId()));
-        assertThatThrownBy(() -> jobResultStore.createDirtyResult(otherEntryWithDuplicateId))
-                .isInstanceOf(IllegalStateException.class);
+        assertThatThrownBy(
+                        () ->
+                                jobResultStore
+                                        .createDirtyResultAsync(otherEntryWithDuplicateId)
+                                        .get())
+                .hasCauseInstanceOf(RuntimeException.class);
     }
 
     @Test
-    default void testStoreDirtyEntryForAlreadyCleanedJobResultThrowsException() throws IOException {
+    default void testStoreDirtyEntryForAlreadyCleanedJobResultThrowsException()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
-        assertThatThrownBy(() -> jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
-                .isInstanceOf(IllegalStateException.class);
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+        jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
+        assertThatThrownBy(
+                        () -> jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get())
+                .hasCauseInstanceOf(RuntimeException.class);
     }
 
     @Test
-    default void testCleaningDuplicateEntryThrowsNoException() throws IOException {
+    default void testCleaningDuplicateEntryThrowsNoException()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+        jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
         assertThatNoException()
                 .isThrownBy(
-                        () -> jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()));
+                        () ->
+                                jobResultStore
+                                        .markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                        .get());
     }
 
     @Test
     default void testCleaningNonExistentEntryThrowsException() throws IOException {
         JobResultStore jobResultStore = createJobResultStore();
         assertThatThrownBy(
-                        () -> jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()))
-                .isInstanceOf(NoSuchElementException.class);
+                        () ->
+                                jobResultStore
+                                        .markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                        .get())
+                .hasCauseInstanceOf(NoSuchElementException.class);
     }
 
     @Test
-    default void testHasJobResultEntryWithDirtyEntry() throws IOException {
+    default void testHasJobResultEntryWithDirtyEntry()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+        assertThat(
+                        jobResultStore
+                                .hasDirtyJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                .get())
                 .isTrue();
-        assertThat(jobResultStore.hasCleanJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+        assertThat(
+                        jobResultStore
+                                .hasCleanJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                .get())
                 .isFalse();
-        assertThat(jobResultStore.hasJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId())).isTrue();
+        assertThat(jobResultStore.hasJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get())
+                .isTrue();
     }
 
     @Test
-    default void testHasJobResultEntryWithCleanEntry() throws IOException {
+    default void testHasJobResultEntryWithCleanEntry()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
-        assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+        jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
+        assertThat(
+                        jobResultStore
+                                .hasDirtyJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                .get())
                 .isFalse();
-        assertThat(jobResultStore.hasCleanJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+        assertThat(
+                        jobResultStore
+                                .hasCleanJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+                                .get())
+                .isTrue();
+        assertThat(jobResultStore.hasJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get())
                 .isTrue();
-        assertThat(jobResultStore.hasJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId())).isTrue();
     }
 
     @Test
-    default void testHasJobResultEntryWithEmptyStore() throws IOException {
+    default void testHasJobResultEntryWithEmptyStore()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
         JobID jobId = new JobID();
-        assertThat(jobResultStore.hasDirtyJobResultEntry(jobId)).isFalse();
-        assertThat(jobResultStore.hasCleanJobResultEntry(jobId)).isFalse();
-        assertThat(jobResultStore.hasJobResultEntry(jobId)).isFalse();
+        assertThat(jobResultStore.hasDirtyJobResultEntryAsync(jobId).get()).isFalse();
+        assertThat(jobResultStore.hasCleanJobResultEntryAsync(jobId).get()).isFalse();
+        assertThat(jobResultStore.hasJobResultEntryAsync(jobId).get()).isFalse();
     }
 
     @Test
-    default void testGetDirtyResultsWithNoEntry() throws IOException {
+    default void testGetDirtyResultsWithNoEntry()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        assertThat(jobResultStore.getDirtyResults()).isEmpty();
+        assertThat(jobResultStore.getDirtyResultsAsync().get()).isEmpty();
     }
 
     @Test
-    default void testGetDirtyResultsWithDirtyEntry() throws IOException {
+    default void testGetDirtyResultsWithDirtyEntry()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
         assertThat(
-                        jobResultStore.getDirtyResults().stream()
+                        jobResultStore.getDirtyResultsAsync().get().stream()
                                 .map(JobResult::getJobId)
                                 .collect(Collectors.toList()))
                 .singleElement()
                 .isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobId());
     }
 
     @Test
-    default void testGetDirtyResultsWithDirtyAndCleanEntry() throws IOException {
+    default void testGetDirtyResultsWithDirtyAndCleanEntry()
+            throws IOException, ExecutionException, InterruptedException {
         JobResultStore jobResultStore = createJobResultStore();
-        jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
-        jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+        jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();

Review Comment:
   One tip: using `join()` instead of `get()` reduces the diff for the test methods because you wouldn't have to specify `ExecutionException` and `InterruptedException` in the test methods' signatures.
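   For example (a sketch based on the first test above):
   ```java
   @Test
   default void testStoreJobResultsWithDuplicateIDsThrowsException() throws IOException {
       JobResultStore jobResultStore = createJobResultStore();
       jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
       final JobResultEntry otherEntryWithDuplicateId =
               new JobResultEntry(
                       TestingJobResultStore.createSuccessfulJobResult(
                               DUMMY_JOB_RESULT_ENTRY.getJobId()));
       // join() throws an unchecked CompletionException, so no additional checked
       // exceptions are needed in the method signature.
       assertThatThrownBy(
                       () ->
                               jobResultStore
                                       .createDirtyResultAsync(otherEntryWithDuplicateId)
                                       .join())
               .hasCauseInstanceOf(RuntimeException.class);
   }
   ```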



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java:
##########
@@ -191,8 +191,8 @@ private Collection<JobResult> getDirtyJobResultsIfRunning() {
 
     private Collection<JobResult> getDirtyJobResults() {
         try {
-            return jobResultStore.getDirtyResults();
-        } catch (IOException e) {
+            return jobResultStore.getDirtyResultsAsync().get();

Review Comment:
   Here, it's more obvious: we're making this call blocking just to generate a `CompletableFuture` out of it again in line 124. Instead, we could propagate the `CompletableFuture`. WDYT?
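   i.e. something along these lines (just a sketch; the `Async` method name is a suggestion):
   ```java
   // Sketch: expose the store's future directly so the caller in line 124 can compose on it
   // instead of re-wrapping a blocking call.
   private CompletableFuture<Collection<JobResult>> getDirtyJobResultsAsync() {
       return jobResultStore.getDirtyResultsAsync();
   }
   ```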



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -44,64 +45,84 @@ public abstract class AbstractThreadsafeJobResultStore implements JobResultStore
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
     @Override
-    public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
-        Preconditions.checkState(
-                !hasJobResultEntry(jobResultEntry.getJobId()),
-                "Job result store already contains an entry for job %s",
-                jobResultEntry.getJobId());
-
-        withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+    public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
+        return hasJobResultEntryAsync(jobResultEntry.getJobId())
+                .handle(
+                        (hasResult, error) -> {
+                            if (error != null || hasResult) {
+                                ExceptionUtils.rethrow(error);
+                            }
+                            try {
+                                withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+                            } catch (IOException e) {
+                                ExceptionUtils.rethrow(error);
+                            }
+                            return null;
+                        });
     }
 
     @GuardedBy("readWriteLock")
     protected abstract void createDirtyResultInternal(JobResultEntry jobResultEntry)
             throws IOException;
 
     @Override
-    public void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException {
-        if (hasCleanJobResultEntry(jobId)) {
-            LOG.debug("The job {} is already marked as clean. No action required.", jobId);

Review Comment:
   Removing the debug log makes the `LOG` field unused.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java:
##########
@@ -103,8 +103,8 @@ public static JobDispatcherLeaderProcessFactoryFactory create(
 
     private static Collection<JobResult> getDirtyJobResults(JobResultStore jobResultStore) {
         try {
-            return jobResultStore.getDirtyResults();
-        } catch (IOException e) {
+            return jobResultStore.getDirtyResultsAsync().get();

Review Comment:
   Same as in the `Dispatcher` code: it might be better to propagate the `CompletableFuture` for readability purposes. The calling code should decide whether it wants to wait for the async operation or not. It doesn't make that much of a difference here because, in this specific case, we want to block and wait for the completion anyway. Therefore, feel free to counter-argue. :-)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -575,12 +576,13 @@ private boolean isDuplicateJob(JobID jobId) throws FlinkException {
      */
     private boolean isInGloballyTerminalState(JobID jobId) throws FlinkException {
         try {
-            return jobResultStore.hasJobResultEntry(jobId);
-        } catch (IOException e) {
-            throw new FlinkException(
-                    String.format("Failed to retrieve job scheduling status for job %s.", jobId),
-                    e);
+            return jobResultStore.hasJobResultEntryAsync(jobId).get();

Review Comment:
   The `CompletableFuture` would then be utilized in `Dispatcher.submitJob`. FYI: there's another related issue with the `Dispatcher` that might be worth resolving beforehand: FLINK-32098



##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java:
##########
@@ -80,33 +82,65 @@ private TestingJobResultStore(
     }
 
     @Override
-    public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
-        createDirtyResultConsumer.accept(jobResultEntry);
+    public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
+        try {
+            createDirtyResultConsumer.accept(jobResultEntry);
+        } catch (IOException e) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
-    public void markResultAsClean(JobID jobId) throws IOException {
-        markResultAsCleanConsumer.accept(jobId);
+    public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
+        try {
+            markResultAsCleanConsumer.accept(jobId);
+        } catch (IOException e) {

Review Comment:
   You're not following the contract here, I guess. :thinking: Usually, for these `Testing*` implementations, we would change the callback type (e.g. a `ThrowingConsumer<T>` becomes a `Function<T, CompletableFuture<Void>>`). That enables the entire logic to be specified in the test method.
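   i.e. something along these lines (just a sketch of the shape; the field name is a suggestion and the builder/constructor wiring is omitted):
   ```java
   // The test provides the whole async behaviour via the callback.
   private final Function<JobResultEntry, CompletableFuture<Void>> createDirtyResultFunction;

   @Override
   public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
       return createDirtyResultFunction.apply(jobResultEntry);
   }
   ```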



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1257,10 +1259,10 @@ private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJo
 
     private void markJobAsClean(JobID jobId) {
         try {
-            jobResultStore.markResultAsClean(jobId);
+            jobResultStore.markResultAsCleanAsync(jobId).get();

Review Comment:
   Here, it's not really an issue because we're calling it asynchronously in [Dispatcher:1249](https://github.com/apache/flink/blob/3547aaacd8aa2e72fe227c9ac97389e803bdb460/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1249), anyway. But returning a future here would make the `markJobAsClean` method signature align more closely with its purpose (and maybe even justify adding the `Async` suffix here?).
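   e.g. (sketch only; the rename is just a suggestion):
   ```java
   private CompletableFuture<Void> markJobAsCleanAsync(JobID jobId) {
       // propagate the store's future instead of blocking on it here
       return jobResultStore.markResultAsCleanAsync(jobId);
   }
   ```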



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
