XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1284412779
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -561,21 +563,28 @@ public CompletableFuture<Acknowledge> submitFailedJob(
return archiveExecutionGraphToHistoryServer(executionGraphInfo);
}
+ /**
+ * Checks whether the given job has already been submitted, executed, or awaiting termination.
+ *
+ * @param jobId identifying the submitted job
+ * @return true if the job has already been submitted (is running) or has been executed
+ * @throws Exception if the job scheduling status cannot be retrieved
+ */
+ private boolean isDuplicateJob(JobID jobId) throws Exception {
+ return isInGloballyTerminalState(jobId).get()
+ || jobManagerRunnerRegistry.isRegistered(jobId)
+ || submittedAndWaitingTerminationJobIDs.contains(jobId);
+ }
+
Review Comment:
This was accidentally re-added when rebasing to most-recent `master`. It can
be removed again.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -112,10 +141,15 @@ private void withWriteLock(ThrowingRunnable<IOException> runnable) throws IOExce
}
}
- private <T> T withReadLock(SupplierWithException<T, IOException> runnable) throws IOException {
+ private <T> CompletableFuture<T> withReadLockAsync(
+ SupplierWithException<T, IOException> runnable) {
Review Comment:
```suggestion
SupplierWithException<T, IOException> supplier) {
```
copy&paste error on my side. You changed it for `withReadLock` already. :+1:
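For reference, here is the shape of such a read-locked async helper, with the parameter named `supplier` because it produces a value. This is a JDK-only sketch, not the PR's implementation. It assumes a hypothetical `IoSupplier` interface in place of Flink's `SupplierWithException<T, IOException>` and uses an illustrative `ReadLockedStore` class:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for Flink's SupplierWithException<T, IOException>. */
@FunctionalInterface
interface IoSupplier<T> {
    T get() throws IOException;
}

/** Illustrative class, not Flink's AbstractThreadsafeJobResultStore. */
class ReadLockedStore {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Executor ioExecutor;

    ReadLockedStore(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    /** Runs the value-producing supplier on the I/O executor while holding the read lock. */
    <T> CompletableFuture<T> withReadLockAsync(IoSupplier<T> supplier) {
        return CompletableFuture.supplyAsync(
                () -> {
                    readWriteLock.readLock().lock();
                    try {
                        return supplier.get();
                    } catch (IOException e) {
                        // Checked I/O failures become an exceptional completion of the future.
                        throw new CompletionException(e);
                    } finally {
                        readWriteLock.readLock().unlock();
                    }
                },
                ioExecutor);
    }
}
```

Calling `get()` on a failed future throws an `ExecutionException` whose cause is the original `IOException`. Calling `join()` throws the `CompletionException` itself.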
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##########
Review Comment:
The commit message should have a `[hotfix][runtime]` prefix instead of being
labeled with `[FLINK-27204]`. The change is independent of FLINK-27204 (i.e.
it's more of a code cleanup that is shipped along with FLINK-27204). Think of it
like this: the hotfix commit wouldn't need to be reverted when reverting
FLINK-27204 because it's still a valid improvement to the code.
Additionally, only the `markResultAsCleanInternal` cleanup ended up in the hotfix
commit. `hasDirtyJobResultEntryInternal`, `hasCleanJobResultEntryInternal` and
`getDirtyResultsInternal` were cleaned up as well, but in the wrong commit. These
three method changes should go into the hotfix commit, too.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java:
##########
@@ -116,46 +123,74 @@ public void testBaseDirectoryCreationOnResultStoreInitialization() throws Except
assertThat(emptyBaseDirectory).doesNotExist();
fileSystemJobResultStore =
- new FileSystemJobResultStore(basePath.getFileSystem(), basePath, false);
+ new FileSystemJobResultStore(
+ basePath.getFileSystem(), basePath, false, manuallyTriggeredExecutor);
// Result store operations are creating the base directory on-the-fly
assertThat(emptyBaseDirectory).doesNotExist();
- fileSystemJobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+ CompletableFuture<Void> dirtyResultAsync =
+ fileSystemJobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY);
+ manuallyTriggeredExecutor.triggerAll();
+ dirtyResultAsync.get();
assertThat(emptyBaseDirectory).exists().isDirectory();
Review Comment:
I noticed that we could extend the tests to check the async nature: We could
put the same (but inverted) assert in front of the trigger statement. That way
we check that no synchronous activity happens. The same also applies to the
other tests.
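The inverted-assert pattern can be sketched with plain JDK types. `QueueingExecutor` is a hypothetical, minimal stand-in for `ManuallyTriggeredScheduledExecutor`: it only queues tasks until `triggerAll()` runs them. `createMarkerAsync` is an illustrative operation, not a `FileSystemJobResultStore` method. The inverted check sits right before the trigger, and the regular check comes right after it:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical executor that runs queued tasks only when triggerAll() is called. */
class QueueingExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        tasks.add(command);
    }

    void triggerAll() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
    }
}

class AsyncNatureCheck {
    /** Illustrative async operation whose only side effect is setting the flag. */
    static CompletableFuture<Void> createMarkerAsync(AtomicBoolean marker, Executor executor) {
        return CompletableFuture.runAsync(() -> marker.set(true), executor);
    }

    static void run() {
        QueueingExecutor executor = new QueueingExecutor();
        AtomicBoolean marker = new AtomicBoolean(false);

        CompletableFuture<Void> future = createMarkerAsync(marker, executor);

        // Inverted assert before triggering: nothing may have happened synchronously.
        check(!marker.get() && !future.isDone(), "operation ran synchronously");

        executor.triggerAll();
        future.join();

        // Regular assert after triggering: the side effect is now visible.
        check(marker.get() && future.isDone(), "operation did not run after trigger");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```

In `testBaseDirectoryCreationOnResultStoreInitialization`, the inverted assert would presumably be another `assertThat(emptyBaseDirectory).doesNotExist()`, placed after the `createDirtyResultAsync` call and before `manuallyTriggeredExecutor.triggerAll()`.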
##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java:
##########
@@ -46,72 +47,97 @@ public interface JobResultStoreContractTest {
@Test
default void testStoreJobResultsWithDuplicateIDsThrowsException() throws IOException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
final JobResultEntry otherEntryWithDuplicateId =
new JobResultEntry(
TestingJobResultStore.createSuccessfulJobResult(
DUMMY_JOB_RESULT_ENTRY.getJobId()));
- assertThatThrownBy(() -> jobResultStore.createDirtyResult(otherEntryWithDuplicateId))
- .isInstanceOf(IllegalStateException.class);
+ assertThatThrownBy(
+ () ->
+ jobResultStore
+ .createDirtyResultAsync(otherEntryWithDuplicateId)
+ .join())
+ .isInstanceOf(CompletionException.class);
Review Comment:
You didn't apply this to all occurrences in `JobResultStoreContractTest`.
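This explains why the expected type changed: `CompletableFuture.join()` rethrows a failure wrapped in a `CompletionException`, so the `IllegalStateException` from the precondition check only shows up as the cause. The JDK-only sketch below mimics the `thenAccept` + `checkState` shape of `createDirtyResultAsync` without a real `JobResultStore`. `joinCause` and `duplicateEntryFuture` are hypothetical helpers:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinUnwrapping {
    /** Hypothetical helper: joins the future and returns the failure's cause, or null on success. */
    static Throwable joinCause(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            // join() never throws the original exception directly; it is attached as the cause.
            return e.getCause();
        }
    }

    /** Fails inside thenAccept, like a violated Preconditions.checkState would. */
    static CompletableFuture<Void> duplicateEntryFuture() {
        return CompletableFuture.completedFuture(true)
                .thenAccept(
                        hasEntry -> {
                            if (hasEntry) {
                                throw new IllegalStateException(
                                        "Job result store already contains an entry");
                            }
                        });
    }
}
```

If the tests should also pin down the original exception, AssertJ's `hasCauseInstanceOf(IllegalStateException.class)` could be chained after `isInstanceOf(CompletionException.class)`.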
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,62 +44,62 @@ public interface JobResultStore {
* Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that
* clean-up operations still need to be performed. Once the job resource cleanup has been
* finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
*
* @param jobResultEntry The job result we wish to persist.
- * @throws IOException if the creation of the dirty result failed for IO reasons.
- * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID}
- * attached that is already registered in this {@code JobResultStore}.
+ * @return a successfully completed future with {@code true} if the dirty result is created
+ * successfully. The future will be completed with {@link IllegalStateException} if the
+ * passed {@code jobResultEntry} has a {@code JobID} attached that is already registered in
+ * this {@code JobResultStore}.
*/
- void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException;
+ CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry);
/**
* Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more
* resource cleanup steps need to be performed. No actions should be triggered if the passed
* {@code JobID} belongs to a job that was already marked as clean.
*
* @param jobId Ident of the job we wish to mark as clean.
- * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean}
- * failed for IO reasons.
- * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the
- * store for the given {@code JobID}.
+ * @return a successfully completed future if the result is marked successfully, The future can
+ * also completed with {@link NoSuchElementException}. i.e. there is no corresponding {@code
Review Comment:
```suggestion
* @return a successfully completed future if the result is marked successfully. The future can
* complete exceptionally with a {@link NoSuchElementException}. i.e. there is no corresponding {@code
```
nitty nit: we might want to make it more explicit that the future completes
"exceptionally"
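To make the distinction concrete: the method does not throw. Instead, it hands back a future that completes exceptionally. The JDK-only sketch below uses a hypothetical in-memory set of job IDs (plain `String`s instead of `JobID`) to stand in for the store:

```java
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

class ExceptionalCompletionExample {
    private final Set<String> dirtyJobIds = new HashSet<>();

    void addDirty(String jobId) {
        dirtyJobIds.add(jobId);
    }

    /** Hypothetical analogue of markResultAsCleanAsync: never throws, always returns a future. */
    CompletableFuture<Void> markCleanAsync(String jobId) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (dirtyJobIds.remove(jobId)) {
            result.complete(null);
        } else {
            // "Completes exceptionally": the error travels inside the future, not up the stack.
            result.completeExceptionally(
                    new NoSuchElementException("No dirty entry for job " + jobId));
        }
        return result;
    }
}
```

Callers then observe the failure through `handle`, `exceptionally`, or `isCompletedExceptionally()` rather than a `try`/`catch` around the call.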
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -43,66 +46,92 @@ public abstract class AbstractThreadsafeJobResultStore implements JobResultStore
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- @Override
- public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
- Preconditions.checkState(
- !hasJobResultEntry(jobResultEntry.getJobId()),
- "Job result store already contains an entry for job %s",
- jobResultEntry.getJobId());
+ private final Executor ioExecutor;
- withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+ protected AbstractThreadsafeJobResultStore(Executor ioExecutor) {
+ this.ioExecutor = ioExecutor;
+ }
+
+ @Override
+ public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
+ return hasJobResultEntryAsync(jobResultEntry.getJobId())
+ .thenAccept(
+ hasJobResultEntry ->
+ Preconditions.checkState(
+ !hasJobResultEntry,
+ "Job result store already contains an entry for job %s",
+ jobResultEntry.getJobId()))
+ .thenCompose(
+ ignoredVoid ->
+ withWriteLockAsync(
+ () -> createDirtyResultInternal(jobResultEntry)));
}
@GuardedBy("readWriteLock")
protected abstract void createDirtyResultInternal(JobResultEntry jobResultEntry)
throws IOException;
@Override
- public void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException {
- if (hasCleanJobResultEntry(jobId)) {
- LOG.debug("The job {} is already marked as clean. No action required.", jobId);
- return;
- }
-
- withWriteLock(() -> markResultAsCleanInternal(jobId));
+ public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
+ return hasCleanJobResultEntryAsync(jobId)
+ .thenCompose(
+ hasCleanJobResultEntry -> {
+ if (hasCleanJobResultEntry) {
+ LOG.debug(
+ "The job {} is already marked as clean. No action required.",
+ jobId);
+ return FutureUtils.completedVoidFuture();
+ }
+
+ return withWriteLockAsync(() -> markResultAsCleanInternal(jobId));
+ });
}
@GuardedBy("readWriteLock")
protected abstract void markResultAsCleanInternal(JobID jobId)
throws IOException, NoSuchElementException;
@Override
- public boolean hasJobResultEntry(JobID jobId) throws IOException {
- return withReadLock(
+ public CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
+ return withReadLockAsync(
() ->
hasDirtyJobResultEntryInternal(jobId)
|| hasCleanJobResultEntryInternal(jobId));
}
@Override
- public boolean hasDirtyJobResultEntry(JobID jobId) throws IOException {
- return withReadLock(() -> hasDirtyJobResultEntryInternal(jobId));
+ public CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID jobId) {
+ return withReadLockAsync(() -> hasDirtyJobResultEntryInternal(jobId));
}
@GuardedBy("readWriteLock")
protected abstract boolean hasDirtyJobResultEntryInternal(JobID jobId)
throws IOException;
@Override
- public boolean hasCleanJobResultEntry(JobID jobId) throws IOException {
- return withReadLock(() -> hasCleanJobResultEntryInternal(jobId));
+ public CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID jobId) {
+ return withReadLockAsync(() -> hasCleanJobResultEntryInternal(jobId));
}
@GuardedBy("readWriteLock")
protected abstract boolean hasCleanJobResultEntryInternal(JobID jobId)
throws IOException;
@Override
public Set<JobResult> getDirtyResults() throws IOException {
- return withReadLock(this::getDirtyResultsInternal);
+ return getDirtyResultsInternal();
Review Comment:
```suggestion
return withReadLock(this::getDirtyResultsInternal);
```
Yikes, that's my bad: we shouldn't remove the lock here.
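For reference, keeping the read lock with a synchronous helper could look like the sketch below. Note that the method reference is written without parentheses (`this::getDirtyResultsInternal`). `LockedDirtyResults`, `IoResultSupplier`, and the `String` results are hypothetical simplifications of the real `JobResult`-based store:

```java
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for SupplierWithException<T, IOException>. */
@FunctionalInterface
interface IoResultSupplier<T> {
    T get() throws IOException;
}

class LockedDirtyResults {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Set<String> dirtyResults = new HashSet<>();

    void addDirty(String jobId) {
        readWriteLock.writeLock().lock();
        try {
            dirtyResults.add(jobId);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    public Set<String> getDirtyResults() throws IOException {
        // Keep the read lock so a concurrent writer cannot mutate the set mid-read.
        return withReadLock(this::getDirtyResultsInternal);
    }

    private Set<String> getDirtyResultsInternal() throws IOException {
        // Return a snapshot so callers never see later modifications.
        return Collections.unmodifiableSet(new HashSet<>(dirtyResults));
    }

    private <T> T withReadLock(IoResultSupplier<T> supplier) throws IOException {
        readWriteLock.readLock().lock();
        try {
            return supplier.get();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}
```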
##########
flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java:
##########
@@ -381,9 +381,12 @@ public static void rethrowException(Throwable t) throws Exception {
* @param e exception to throw if not null.
* @throws Exception
*/
- public static void tryRethrowException(@Nullable Exception e) throws Exception {
+ public static void tryRethrowException(@Nullable Throwable e) throws Exception {
Review Comment:
We can revert the change to `ExceptionUtils`. As far as I can tell, we're not
using it anymore.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java:
##########
@@ -116,46 +123,74 @@ public void testBaseDirectoryCreationOnResultStoreInitialization() throws Except
assertThat(emptyBaseDirectory).doesNotExist();
fileSystemJobResultStore =
- new FileSystemJobResultStore(basePath.getFileSystem(), basePath, false);
+ new FileSystemJobResultStore(
+ basePath.getFileSystem(), basePath, false, manuallyTriggeredExecutor);
// Result store operations are creating the base directory on-the-fly
assertThat(emptyBaseDirectory).doesNotExist();
- fileSystemJobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+ CompletableFuture<Void> dirtyResultAsync =
+ fileSystemJobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY);
+ manuallyTriggeredExecutor.triggerAll();
+ dirtyResultAsync.get();
Review Comment:
```suggestion
FlinkAssertions.assertThatFuture(dirtyResultAsync).eventuallySucceeds();
```
nit: `FlinkAssertions.assertThatFuture` is a handy utility to make
future-based testing more readable.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java:
##########
@@ -54,11 +57,15 @@ public class FileSystemJobResultStoreFileOperationsTest {
private Path basePath;
+ final ManuallyTriggeredScheduledExecutor manuallyTriggeredExecutor =
Review Comment:
```suggestion
private final ManuallyTriggeredScheduledExecutor manuallyTriggeredExecutor =
```
Looks like that one slipped through in the previous review. Is there a
reason why we don't make that one `private`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -43,66 +46,92 @@ public abstract class AbstractThreadsafeJobResultStore implements JobResultStore
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- @Override
- public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
- Preconditions.checkState(
- !hasJobResultEntry(jobResultEntry.getJobId()),
- "Job result store already contains an entry for job %s",
- jobResultEntry.getJobId());
+ private final Executor ioExecutor;
- withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+ protected AbstractThreadsafeJobResultStore(Executor ioExecutor) {
+ this.ioExecutor = ioExecutor;
+ }
+
+ @Override
+ public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
+ return hasJobResultEntryAsync(jobResultEntry.getJobId())
+ .thenAccept(
+ hasJobResultEntry ->
+ Preconditions.checkState(
+ !hasJobResultEntry,
+ "Job result store already contains an entry for job %s",
+ jobResultEntry.getJobId()))
+ .thenCompose(
+ ignoredVoid ->
+ withWriteLockAsync(
+ () -> createDirtyResultInternal(jobResultEntry)));
}
@GuardedBy("readWriteLock")
protected abstract void createDirtyResultInternal(JobResultEntry jobResultEntry)
throws IOException;
@Override
- public void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException {
- if (hasCleanJobResultEntry(jobId)) {
- LOG.debug("The job {} is already marked as clean. No action required.", jobId);
- return;
- }
-
- withWriteLock(() -> markResultAsCleanInternal(jobId));
+ public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
+ return hasCleanJobResultEntryAsync(jobId)
+ .thenCompose(
+ hasCleanJobResultEntry -> {
+ if (hasCleanJobResultEntry) {
+ LOG.debug(
+ "The job {} is already marked as clean. No action required.",
+ jobId);
+ return FutureUtils.completedVoidFuture();
+ }
+
+ return withWriteLockAsync(() -> markResultAsCleanInternal(jobId));
+ });
}
@GuardedBy("readWriteLock")
protected abstract void markResultAsCleanInternal(JobID jobId)
throws IOException, NoSuchElementException;
@Override
- public boolean hasJobResultEntry(JobID jobId) throws IOException {
- return withReadLock(
+ public CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
+ return withReadLockAsync(
() ->
hasDirtyJobResultEntryInternal(jobId)
|| hasCleanJobResultEntryInternal(jobId));
}
@Override
- public boolean hasDirtyJobResultEntry(JobID jobId) throws IOException {
- return withReadLock(() -> hasDirtyJobResultEntryInternal(jobId));
+ public CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID jobId) {
+ return withReadLockAsync(() -> hasDirtyJobResultEntryInternal(jobId));
}
@GuardedBy("readWriteLock")
protected abstract boolean hasDirtyJobResultEntryInternal(JobID jobId)
throws IOException;
@Override
- public boolean hasCleanJobResultEntry(JobID jobId) throws IOException {
- return withReadLock(() -> hasCleanJobResultEntryInternal(jobId));
+ public CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID jobId) {
+ return withReadLockAsync(() -> hasCleanJobResultEntryInternal(jobId));
}
@GuardedBy("readWriteLock")
protected abstract boolean hasCleanJobResultEntryInternal(JobID jobId)
throws IOException;
@Override
public Set<JobResult> getDirtyResults() throws IOException {
- return withReadLock(this::getDirtyResultsInternal);
+ return getDirtyResultsInternal();
}
@GuardedBy("readWriteLock")
protected abstract Set<JobResult> getDirtyResultsInternal() throws IOException;
+ private CompletableFuture<Void> withWriteLockAsync(ThrowingRunnable<IOException> runnable) {
+ return FutureUtils.supplyAsync(
Review Comment:
```suggestion
return FutureUtils.runAsync(
```
`runAsync` is good enough here. We're not returning any value.
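For comparison, here is a JDK-only sketch of the write-lock helper built on `CompletableFuture.runAsync(Runnable, Executor)`. It already yields a `CompletableFuture<Void>`, whereas with `supplyAsync` the lambda would have to return a dummy `null`. `IoRunnable` is a hypothetical stand-in for `ThrowingRunnable<IOException>`, and the sketch is not meant to reproduce the signature of Flink's `FutureUtils.runAsync`:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for ThrowingRunnable<IOException>. */
@FunctionalInterface
interface IoRunnable {
    void run() throws IOException;
}

class WriteLockedStore {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Executor ioExecutor;

    WriteLockedStore(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    CompletableFuture<Void> withWriteLockAsync(IoRunnable runnable) {
        // runAsync fits: the action produces no value, so no placeholder result is needed.
        return CompletableFuture.runAsync(
                () -> {
                    readWriteLock.writeLock().lock();
                    try {
                        runnable.run();
                    } catch (IOException e) {
                        // Checked I/O failures become an exceptional completion of the future.
                        throw new CompletionException(e);
                    } finally {
                        readWriteLock.writeLock().unlock();
                    }
                },
                ioExecutor);
    }
}
```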
--
This is an automated message from the Apache Git Service.