XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1206775003
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,69 @@ public interface JobResultStore {
* Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that
* clean-up operations still need to be performed. Once the job resource cleanup has been
* finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
*
* @param jobResultEntry The job result we wish to persist.
+ * @return CompletableFuture with the completed state.
* @throws IOException if the creation of the dirty result failed for IO reasons.
* @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID}
* attached that is already registered in this {@code JobResultStore}.
*/
- void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException;
+ CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry);
/**
* Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more
* resource cleanup steps need to be performed. No actions should be triggered if the passed
* {@code JobID} belongs to a job that was already marked as clean.
*
* @param jobId Ident of the job we wish to mark as clean.
+ * @return CompletableFuture with the completed state.
* @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean}
* failed for IO reasons.
* @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the
* store for the given {@code JobID}.
Review Comment:
The `IOException` can be removed from the Javadoc now; it's going to be wrapped by the `CompletableFuture`. But we might still want to mention the `NoSuchElementException` as part of the return value's documentation.
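For illustration, a rough Javadoc sketch of how the exceptional completion could be documented on the async method (the exact wording is just a suggestion):
```java
/**
 * Marks an existing {@link JobResultEntry} as {@code clean}.
 *
 * @param jobId Ident of the job we wish to mark as clean.
 * @return a {@code CompletableFuture} that completes once the entry has been marked as
 *     {@code clean}; the future completes exceptionally with a {@link NoSuchElementException}
 *     if there is no corresponding {@code dirty} entry for the given {@code JobID}.
 */
CompletableFuture<Void> markResultAsCleanAsync(JobID jobId);
```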
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,69 @@ public interface JobResultStore {
* Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that
* clean-up operations still need to be performed. Once the job resource cleanup has been
* finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
*
* @param jobResultEntry The job result we wish to persist.
+ * @return CompletableFuture with the completed state.
* @throws IOException if the creation of the dirty result failed for IO reasons.
* @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID}
* attached that is already registered in this {@code JobResultStore}.
Review Comment:
The `IOException` can be removed from the Javadoc now; it's going to be wrapped by the `CompletableFuture`. But we might still want to mention the `IllegalStateException` as part of the return value's documentation.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -44,64 +45,84 @@ public abstract class AbstractThreadsafeJobResultStore implements JobResultStore
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@Override
- public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
- Preconditions.checkState(
- !hasJobResultEntry(jobResultEntry.getJobId()),
- "Job result store already contains an entry for job %s",
- jobResultEntry.getJobId());
-
- withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+ public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
+ return hasJobResultEntryAsync(jobResultEntry.getJobId())
+ .handle(
+ (hasResult, error) -> {
+ if (error != null || hasResult) {
+ ExceptionUtils.rethrow(error);
+ }
+ try {
+ withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
Review Comment:
You have to be careful here: the internal methods now only trigger the actual logic in the `ioExecutor`. Therefore, only submitting the task is guarded by the lock. You're losing the synchronization of the IO tasks, which will run in multiple threads of the `ioExecutor` for the `FileSystemJobResultStore` implementation.
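Just to sketch the idea (untested; `hasJobResultEntryInternal` and a store-owned `ioExecutor` are assumptions for illustration): the existence check and the write could run in a single task on the executor so that the whole IO operation stays under the write lock, not just the submission:
```java
public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
    return CompletableFuture.runAsync(
            () -> {
                try {
                    // check and write happen in the same locked section on the executor thread
                    withWriteLock(
                            () -> {
                                Preconditions.checkState(
                                        !hasJobResultEntryInternal(jobResultEntry.getJobId()),
                                        "Job result store already contains an entry for job %s",
                                        jobResultEntry.getJobId());
                                createDirtyResultInternal(jobResultEntry);
                            });
                } catch (IOException e) {
                    // surface IO failures through the returned future
                    throw new CompletionException(e);
                }
            },
            ioExecutor);
}
```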
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1361,28 +1363,35 @@ private CompletableFuture<CleanupJobState> registerGloballyTerminatedJobInJobRes
"Job %s is in state %s which is not globally terminal.",
jobId,
terminalJobStatus);
-
- ioExecutor.execute(
- () -> {
- try {
- if (jobResultStore.hasCleanJobResultEntry(jobId)) {
- log.warn(
- "Job {} is already marked as clean but
clean up was triggered again.",
- jobId);
- } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
- jobResultStore.createDirtyResult(
- new JobResultEntry(
- JobResult.createFrom(archivedExecutionGraph)));
- log.info(
- "Job {} has been registered for cleanup in
the JobResultStore after reaching a terminal state.",
- jobId);
- }
- } catch (IOException e) {
- writeFuture.completeExceptionally(e);
- return;
- }
- writeFuture.complete(null);
- });
+ jobResultStore
+ .hasCleanJobResultEntryAsync(jobId)
+ .handleAsync(
Review Comment:
That's just a thought without having proven it, actually, but: would the code become more readable if we utilized the `CompletableFuture` methods (e.g. `thenCompose`)? :thinking:
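Untested, but something along these lines, reusing the names from the diff above (and assuming `FutureUtils.completedVoidFuture()` for the no-op branches):
```java
// composing the JobResultStore calls instead of nesting them in a single handleAsync block;
// the resulting future would take the role of writeFuture from the previous code
final CompletableFuture<Void> writeFuture =
        jobResultStore
                .hasCleanJobResultEntryAsync(jobId)
                .thenCompose(
                        hasCleanEntry -> {
                            if (hasCleanEntry) {
                                log.warn(
                                        "Job {} is already marked as clean but clean up was triggered again.",
                                        jobId);
                                return FutureUtils.completedVoidFuture();
                            }
                            return jobResultStore
                                    .hasDirtyJobResultEntryAsync(jobId)
                                    .thenCompose(
                                            hasDirtyEntry ->
                                                    hasDirtyEntry
                                                            ? FutureUtils.completedVoidFuture()
                                                            : jobResultStore.createDirtyResultAsync(
                                                                    new JobResultEntry(
                                                                            JobResult.createFrom(
                                                                                    archivedExecutionGraph))));
                        });
```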
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -575,12 +576,13 @@ private boolean isDuplicateJob(JobID jobId) throws FlinkException {
*/
private boolean isInGloballyTerminalState(JobID jobId) throws FlinkException {
try {
- return jobResultStore.hasJobResultEntry(jobId);
- } catch (IOException e) {
- throw new FlinkException(
- String.format("Failed to retrieve job scheduling status
for job %s.", jobId),
- e);
+ return jobResultStore.hasJobResultEntryAsync(jobId).get();
Review Comment:
We're not gaining much here because the call is still blocking. Essentially, we'd be changing the return type of `isInGloballyTerminalState` from `boolean` to `CompletableFuture<Boolean>`: we would have to propagate the `CompletableFuture` down the call hierarchy to make use of it.
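A hypothetical sketch of what propagating the future could look like (the caller, e.g. `submitJob`, would compose on the result instead of blocking):
```java
// async variant of the check; no blocking get() and no IOException handling needed here
private CompletableFuture<Boolean> isInGloballyTerminalStateAsync(JobID jobId) {
    return jobResultStore.hasJobResultEntryAsync(jobId);
}
```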
##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java:
##########
@@ -44,106 +45,140 @@ public interface JobResultStoreContractTest {
JobResultStore createJobResultStore() throws IOException;
@Test
- default void testStoreJobResultsWithDuplicateIDsThrowsException() throws IOException {
+ default void testStoreJobResultsWithDuplicateIDsThrowsException()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
final JobResultEntry otherEntryWithDuplicateId =
new JobResultEntry(
TestingJobResultStore.createSuccessfulJobResult(
DUMMY_JOB_RESULT_ENTRY.getJobId()));
- assertThatThrownBy(() -> jobResultStore.createDirtyResult(otherEntryWithDuplicateId))
- .isInstanceOf(IllegalStateException.class);
+ assertThatThrownBy(
+ () ->
+ jobResultStore
+ .createDirtyResultAsync(otherEntryWithDuplicateId)
+ .get())
+ .hasCauseInstanceOf(RuntimeException.class);
}
@Test
- default void testStoreDirtyEntryForAlreadyCleanedJobResultThrowsException() throws IOException {
+ default void testStoreDirtyEntryForAlreadyCleanedJobResultThrowsException()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
- jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
- assertThatThrownBy(() -> jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
- .isInstanceOf(IllegalStateException.class);
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+ jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
+ assertThatThrownBy(
+ () -> jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get())
+ .hasCauseInstanceOf(RuntimeException.class);
}
@Test
- default void testCleaningDuplicateEntryThrowsNoException() throws IOException {
+ default void testCleaningDuplicateEntryThrowsNoException()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
- jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+ jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
assertThatNoException()
.isThrownBy(
- () -> jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()));
+ () ->
+ jobResultStore
+ .markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get());
}
@Test
default void testCleaningNonExistentEntryThrowsException() throws IOException {
JobResultStore jobResultStore = createJobResultStore();
assertThatThrownBy(
- () -> jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()))
- .isInstanceOf(NoSuchElementException.class);
+ () ->
+ jobResultStore
+ .markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get())
+ .hasCauseInstanceOf(NoSuchElementException.class);
}
@Test
- default void testHasJobResultEntryWithDirtyEntry() throws IOException {
+ default void testHasJobResultEntryWithDirtyEntry()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
- assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+ assertThat(
+ jobResultStore
+ .hasDirtyJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get())
.isTrue();
- assertThat(jobResultStore.hasCleanJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+ assertThat(
+ jobResultStore
+ .hasCleanJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get())
.isFalse();
- assertThat(jobResultStore.hasJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId())).isTrue();
+ assertThat(jobResultStore.hasJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get())
+ .isTrue();
}
@Test
- default void testHasJobResultEntryWithCleanEntry() throws IOException {
+ default void testHasJobResultEntryWithCleanEntry()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
- jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
- assertThat(jobResultStore.hasDirtyJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
+ jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get();
+ assertThat(
+ jobResultStore
+ .hasDirtyJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get())
.isFalse();
- assertThat(jobResultStore.hasCleanJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId()))
+ assertThat(
+ jobResultStore
+ .hasCleanJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .get())
+ .isTrue();
+ assertThat(jobResultStore.hasJobResultEntryAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).get())
.isTrue();
- assertThat(jobResultStore.hasJobResultEntry(DUMMY_JOB_RESULT_ENTRY.getJobId())).isTrue();
}
@Test
- default void testHasJobResultEntryWithEmptyStore() throws IOException {
+ default void testHasJobResultEntryWithEmptyStore()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
JobID jobId = new JobID();
- assertThat(jobResultStore.hasDirtyJobResultEntry(jobId)).isFalse();
- assertThat(jobResultStore.hasCleanJobResultEntry(jobId)).isFalse();
- assertThat(jobResultStore.hasJobResultEntry(jobId)).isFalse();
+ assertThat(jobResultStore.hasDirtyJobResultEntryAsync(jobId).get()).isFalse();
+ assertThat(jobResultStore.hasCleanJobResultEntryAsync(jobId).get()).isFalse();
+ assertThat(jobResultStore.hasJobResultEntryAsync(jobId).get()).isFalse();
}
@Test
- default void testGetDirtyResultsWithNoEntry() throws IOException {
+ default void testGetDirtyResultsWithNoEntry()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- assertThat(jobResultStore.getDirtyResults()).isEmpty();
+ assertThat(jobResultStore.getDirtyResultsAsync().get()).isEmpty();
}
@Test
- default void testGetDirtyResultsWithDirtyEntry() throws IOException {
+ default void testGetDirtyResultsWithDirtyEntry()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
assertThat(
- jobResultStore.getDirtyResults().stream()
+ jobResultStore.getDirtyResultsAsync().get().stream()
.map(JobResult::getJobId)
.collect(Collectors.toList()))
.singleElement()
.isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobId());
}
@Test
- default void testGetDirtyResultsWithDirtyAndCleanEntry() throws IOException {
+ default void testGetDirtyResultsWithDirtyAndCleanEntry()
+ throws IOException, ExecutionException, InterruptedException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
- jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).get();
Review Comment:
One tip: using `join()` instead of `get()` reduces the diff for the test methods because you wouldn't have to specify `ExecutionException` and `InterruptedException` in each test method's signature.
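Roughly, for example:
```java
@Test
default void testGetDirtyResultsWithDirtyAndCleanEntry() throws IOException {
    JobResultStore jobResultStore = createJobResultStore();
    // join() throws the unchecked CompletionException, so ExecutionException and
    // InterruptedException don't need to be declared in the method signature
    jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
    jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).join();
}
```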
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java:
##########
@@ -191,8 +191,8 @@ private Collection<JobResult> getDirtyJobResultsIfRunning() {
private Collection<JobResult> getDirtyJobResults() {
try {
- return jobResultStore.getDirtyResults();
- } catch (IOException e) {
+ return jobResultStore.getDirtyResultsAsync().get();
Review Comment:
Here, it's more obvious: we're making the call blocking again, only to generate a `CompletableFuture` out of it again in line 124. Instead, we could propagate the `CompletableFuture`. WDYT?
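A possible sketch (untested), assuming the dirty entries are still exposed as a `Collection` like today:
```java
// non-blocking variant: the caller composes on the future instead of calling get()
private CompletableFuture<Collection<JobResult>> getDirtyJobResultsAsync() {
    return jobResultStore.getDirtyResultsAsync().thenApply(Collections::unmodifiableCollection);
}
```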
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -44,64 +45,84 @@ public abstract class AbstractThreadsafeJobResultStore implements JobResultStore
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@Override
- public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
- Preconditions.checkState(
- !hasJobResultEntry(jobResultEntry.getJobId()),
- "Job result store already contains an entry for job %s",
- jobResultEntry.getJobId());
-
- withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+ public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
+ return hasJobResultEntryAsync(jobResultEntry.getJobId())
+ .handle(
+ (hasResult, error) -> {
+ if (error != null || hasResult) {
+ ExceptionUtils.rethrow(error);
+ }
+ try {
+ withWriteLock(() -> createDirtyResultInternal(jobResultEntry));
+ } catch (IOException e) {
+ ExceptionUtils.rethrow(error);
+ }
+ return null;
+ });
}
@GuardedBy("readWriteLock")
protected abstract void createDirtyResultInternal(JobResultEntry jobResultEntry)
throws IOException;
@Override
- public void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException {
- if (hasCleanJobResultEntry(jobId)) {
- LOG.debug("The job {} is already marked as clean. No action
required.", jobId);
Review Comment:
Removing the debug log makes the `LOG` field unused.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java:
##########
@@ -103,8 +103,8 @@ public static JobDispatcherLeaderProcessFactoryFactory create(
private static Collection<JobResult> getDirtyJobResults(JobResultStore jobResultStore) {
try {
- return jobResultStore.getDirtyResults();
- } catch (IOException e) {
+ return jobResultStore.getDirtyResultsAsync().get();
Review Comment:
Same as in the `Dispatcher` code: it might be better to propagate the `CompletableFuture` for readability purposes. The calling code should decide whether it wants to wait for the async operation or not. It doesn't make much of a difference here because we want to block and wait for the completion in this specific case anyway. Therefore, feel free to counter-argue. :-)
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -575,12 +576,13 @@ private boolean isDuplicateJob(JobID jobId) throws FlinkException {
*/
private boolean isInGloballyTerminalState(JobID jobId) throws FlinkException {
try {
- return jobResultStore.hasJobResultEntry(jobId);
- } catch (IOException e) {
- throw new FlinkException(
- String.format("Failed to retrieve job scheduling status
for job %s.", jobId),
- e);
+ return jobResultStore.hasJobResultEntryAsync(jobId).get();
Review Comment:
The `CompletableFuture` would then be utilized in `Dispatcher.submitJob`. FYI: there's another related issue with the `Dispatcher` that might be worth resolving beforehand: FLINK-32098.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java:
##########
@@ -80,33 +82,65 @@ private TestingJobResultStore(
}
@Override
- public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException {
- createDirtyResultConsumer.accept(jobResultEntry);
+ public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
+ try {
+ createDirtyResultConsumer.accept(jobResultEntry);
+ } catch (IOException e) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+ return CompletableFuture.completedFuture(null);
}
@Override
- public void markResultAsClean(JobID jobId) throws IOException {
- markResultAsCleanConsumer.accept(jobId);
+ public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
+ try {
+ markResultAsCleanConsumer.accept(jobId);
+ } catch (IOException e) {
Review Comment:
You're not following the contract here, I guess. :thinking: Usually, for these `Testing*` implementations, we would change the callback type (e.g. a `ThrowingConsumer<T>` becomes a `Function<T, CompletableFuture<Void>>`). That enables the entire logic to be specified in the test method.
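A rough sketch of what I mean (the field and builder method names are made up for illustration): the callback itself returns the future, which lets the test specify the entire behavior, including exceptional completion:
```java
private final Function<JobResultEntry, CompletableFuture<Void>> createDirtyResultFunction;

@Override
public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry jobResultEntry) {
    // no exception handling in the testing implementation; the test-provided function decides
    return createDirtyResultFunction.apply(jobResultEntry);
}

// e.g. in a test:
// TestingJobResultStore.builder()
//         .withCreateDirtyResultFunction(
//                 entry -> {
//                     CompletableFuture<Void> result = new CompletableFuture<>();
//                     result.completeExceptionally(new IOException("expected"));
//                     return result;
//                 })
//         .build();
```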
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1257,10 +1259,10 @@ private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJo
private void markJobAsClean(JobID jobId) {
try {
- jobResultStore.markResultAsClean(jobId);
+ jobResultStore.markResultAsCleanAsync(jobId).get();
Review Comment:
Here, it's not really an issue because we're calling it asynchronously in [Dispatcher:1249](https://github.com/apache/flink/blob/3547aaacd8aa2e72fe227c9ac97389e803bdb460/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1249), anyway. But returning a future here would make the `markJobAsClean` method signature align more closely with its purpose (and maybe even warrant adding the `Async` suffix?).
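Something like the following, as a rough sketch (the log message is only illustrative):
```java
// expose the future so the caller decides whether to block on it or compose with it
private CompletableFuture<Void> markJobAsCleanAsync(JobID jobId) {
    return jobResultStore
            .markResultAsCleanAsync(jobId)
            .thenRun(() -> log.debug("Job {} has been marked as clean.", jobId));
}
```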