WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1288141874


##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -43,66 +46,92 @@ public abstract class AbstractThreadsafeJobResultStore 
implements JobResultStore
 
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
-    @Override
-    public void createDirtyResult(JobResultEntry jobResultEntry) throws 
IOException {
-        Preconditions.checkState(
-                !hasJobResultEntry(jobResultEntry.getJobId()),
-                "Job result store already contains an entry for job %s",
-                jobResultEntry.getJobId());
+    private final Executor ioExecutor;
 
-        withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+    protected AbstractThreadsafeJobResultStore(Executor ioExecutor) {
+        this.ioExecutor = ioExecutor;
+    }
+
+    @Override
+    public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry 
jobResultEntry) {
+        return hasJobResultEntryAsync(jobResultEntry.getJobId())
+                .thenAccept(
+                        hasJobResultEntry ->
+                                Preconditions.checkState(
+                                        !hasJobResultEntry,
+                                        "Job result store already contains an 
entry for job %s",
+                                        jobResultEntry.getJobId()))
+                .thenCompose(
+                        ignoredVoid ->
+                                withWriteLockAsync(
+                                        () -> 
createDirtyResultInternal(jobResultEntry)));
     }
 
     @GuardedBy("readWriteLock")
     protected abstract void createDirtyResultInternal(JobResultEntry 
jobResultEntry)
             throws IOException;
 
     @Override
-    public void markResultAsClean(JobID jobId) throws IOException, 
NoSuchElementException {
-        if (hasCleanJobResultEntry(jobId)) {
-            LOG.debug("The job {} is already marked as clean. No action 
required.", jobId);
-            return;
-        }
-
-        withWriteLock(() -> markResultAsCleanInternal(jobId));
+    public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
+        return hasCleanJobResultEntryAsync(jobId)
+                .thenCompose(
+                        hasCleanJobResultEntry -> {
+                            if (hasCleanJobResultEntry) {
+                                LOG.debug(
+                                        "The job {} is already marked as 
clean. No action required.",
+                                        jobId);
+                                return FutureUtils.completedVoidFuture();
+                            }
+
+                            return withWriteLockAsync(() -> 
markResultAsCleanInternal(jobId));
+                        });
     }
 
     @GuardedBy("readWriteLock")
     protected abstract void markResultAsCleanInternal(JobID jobId)
             throws IOException, NoSuchElementException;
 
     @Override
-    public boolean hasJobResultEntry(JobID jobId) throws IOException {
-        return withReadLock(
+    public CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
+        return withReadLockAsync(
                 () ->
                         hasDirtyJobResultEntryInternal(jobId)
                                 || hasCleanJobResultEntryInternal(jobId));
     }
 
     @Override
-    public boolean hasDirtyJobResultEntry(JobID jobId) throws IOException {
-        return withReadLock(() -> hasDirtyJobResultEntryInternal(jobId));
+    public CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID jobId) 
{
+        return withReadLockAsync(() -> hasDirtyJobResultEntryInternal(jobId));
     }
 
     @GuardedBy("readWriteLock")
     protected abstract boolean hasDirtyJobResultEntryInternal(JobID jobId) 
throws IOException;
 
     @Override
-    public boolean hasCleanJobResultEntry(JobID jobId) throws IOException {
-        return withReadLock(() -> hasCleanJobResultEntryInternal(jobId));
+    public CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID jobId) 
{
+        return withReadLockAsync(() -> hasCleanJobResultEntryInternal(jobId));
     }
 
     @GuardedBy("readWriteLock")
     protected abstract boolean hasCleanJobResultEntryInternal(JobID jobId) 
throws IOException;
 
     @Override
     public Set<JobResult> getDirtyResults() throws IOException {
-        return withReadLock(this::getDirtyResultsInternal);
+        return getDirtyResultsInternal();

Review Comment:
   The lock is added now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to