WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1288141874
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -43,66 +46,92 @@ public abstract class AbstractThreadsafeJobResultStore
implements JobResultStore
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- @Override
- public void createDirtyResult(JobResultEntry jobResultEntry) throws
IOException {
- Preconditions.checkState(
- !hasJobResultEntry(jobResultEntry.getJobId()),
- "Job result store already contains an entry for job %s",
- jobResultEntry.getJobId());
+ private final Executor ioExecutor;
- withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+ protected AbstractThreadsafeJobResultStore(Executor ioExecutor) {
+ this.ioExecutor = ioExecutor;
+ }
+
+ @Override
+ public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry
jobResultEntry) {
+ return hasJobResultEntryAsync(jobResultEntry.getJobId())
+ .thenAccept(
+ hasJobResultEntry ->
+ Preconditions.checkState(
+ !hasJobResultEntry,
+ "Job result store already contains an
entry for job %s",
+ jobResultEntry.getJobId()))
+ .thenCompose(
+ ignoredVoid ->
+ withWriteLockAsync(
+ () ->
createDirtyResultInternal(jobResultEntry)));
}
@GuardedBy("readWriteLock")
protected abstract void createDirtyResultInternal(JobResultEntry
jobResultEntry)
throws IOException;
@Override
- public void markResultAsClean(JobID jobId) throws IOException,
NoSuchElementException {
- if (hasCleanJobResultEntry(jobId)) {
- LOG.debug("The job {} is already marked as clean. No action
required.", jobId);
- return;
- }
-
- withWriteLock(() -> markResultAsCleanInternal(jobId));
+ public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
+ return hasCleanJobResultEntryAsync(jobId)
+ .thenCompose(
+ hasCleanJobResultEntry -> {
+ if (hasCleanJobResultEntry) {
+ LOG.debug(
+ "The job {} is already marked as
clean. No action required.",
+ jobId);
+ return FutureUtils.completedVoidFuture();
+ }
+
+ return withWriteLockAsync(() ->
markResultAsCleanInternal(jobId));
+ });
}
@GuardedBy("readWriteLock")
protected abstract void markResultAsCleanInternal(JobID jobId)
throws IOException, NoSuchElementException;
@Override
- public boolean hasJobResultEntry(JobID jobId) throws IOException {
- return withReadLock(
+ public CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
+ return withReadLockAsync(
() ->
hasDirtyJobResultEntryInternal(jobId)
|| hasCleanJobResultEntryInternal(jobId));
}
@Override
- public boolean hasDirtyJobResultEntry(JobID jobId) throws IOException {
- return withReadLock(() -> hasDirtyJobResultEntryInternal(jobId));
+ public CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID jobId)
{
+ return withReadLockAsync(() -> hasDirtyJobResultEntryInternal(jobId));
}
@GuardedBy("readWriteLock")
protected abstract boolean hasDirtyJobResultEntryInternal(JobID jobId)
throws IOException;
@Override
- public boolean hasCleanJobResultEntry(JobID jobId) throws IOException {
- return withReadLock(() -> hasCleanJobResultEntryInternal(jobId));
+ public CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID jobId)
{
+ return withReadLockAsync(() -> hasCleanJobResultEntryInternal(jobId));
}
@GuardedBy("readWriteLock")
protected abstract boolean hasCleanJobResultEntryInternal(JobID jobId)
throws IOException;
@Override
public Set<JobResult> getDirtyResults() throws IOException {
- return withReadLock(this::getDirtyResultsInternal);
+ return getDirtyResultsInternal();
Review Comment:
The lock is added now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]