XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1281945188
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -516,7 +517,7 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph
jobGraph, Time timeout)
try {
if (isDuplicateJob(jobGraph.getJobID())) {
- if (isInGloballyTerminalState(jobGraph.getJobID())) {
+ if (isInGloballyTerminalState(jobGraph.getJobID()).get()) {
Review Comment:
Reminder: That's going to create conflicts the next time this branch is
rebased onto master.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/JobDispatcherLeaderProcessFactoryFactory.java:
##########
@@ -77,7 +77,14 @@ public JobDispatcherLeaderProcessFactory createFactory(
}
final JobResultStore jobResultStore =
jobPersistenceComponentFactory.createJobResultStore();
- final Collection<JobResult> recoveredDirtyJobResults =
getDirtyJobResults(jobResultStore);
+ final Collection<JobResult> recoveredDirtyJobResults;
Review Comment:
This class/file doesn't need to be changed anymore after moving back to
`JobResultStore#getDirtyResults`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -516,7 +517,7 @@ public CompletableFuture<Acknowledge> submitJob(JobGraph
jobGraph, Time timeout)
try {
if (isDuplicateJob(jobGraph.getJobID())) {
- if (isInGloballyTerminalState(jobGraph.getJobID())) {
+ if (isInGloballyTerminalState(jobGraph.getJobID()).get()) {
Review Comment:
Additionally, we shouldn't call `.get()` here, where the error is then caught
in a `catch` clause that returns a separate `CompletableFuture` again. Can't we
use the `CompletableFuture` methods here? Or is that harder to read? :thinking:
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -580,17 +581,13 @@ private boolean isDuplicateJob(JobID jobId) throws
FlinkException {
* Checks whether the given job has already been executed.
*
* @param jobId identifying the submitted job
- * @return true if the job has already finished, either successfully or as
a failure
+ * @return a successfully completed future with {@code true} if the job
has already finished,
+ * either successfully or as a failure
* @throws FlinkException if the job scheduling status cannot be retrieved
Review Comment:
```suggestion
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1390,29 +1392,71 @@ private CompletableFuture<CleanupJobState>
registerGloballyTerminatedJobInJobRes
"Job %s is in state %s which is not globally terminal.",
jobId,
terminalJobStatus);
-
- ioExecutor.execute(
- () -> {
- try {
- if (jobResultStore.hasCleanJobResultEntry(jobId)) {
- log.warn(
- "Job {} is already marked as clean but
clean up was triggered again.",
- jobId);
- } else if
(!jobResultStore.hasDirtyJobResultEntry(jobId)) {
- jobResultStore.createDirtyResult(
- new JobResultEntry(
-
JobResult.createFrom(archivedExecutionGraph)));
- log.info(
- "Job {} has been registered for cleanup in
the JobResultStore after reaching a terminal state.",
- jobId);
- }
- } catch (IOException e) {
- writeFuture.completeExceptionally(e);
+ CompletableFuture<Boolean> shouldCheckDirtyJobResult =
+ jobResultStore
+ .hasCleanJobResultEntryAsync(jobId)
+ .handleAsync(
+ (hasCleanJobResultEntry, throwable) -> {
+ if (throwable != null) {
+
writeFuture.completeExceptionally(throwable);
+ return false;
+ } else {
+ if (hasCleanJobResultEntry) {
+ log.warn(
+ "Job {} is already marked
as clean but "
+ + "clean up was
triggered again.",
+ jobId);
+ writeFuture.complete(null);
+ return false;
+ } else {
+ return true;
+ }
+ }
+ });
+ shouldCheckDirtyJobResult.whenCompleteAsync(
+ (shouldCheck, throwable1) -> {
+ if (throwable1 != null) {
+ writeFuture.completeExceptionally(throwable1);
return;
}
- writeFuture.complete(null);
+ if (shouldCheck) {
+ jobResultStore
+ .hasDirtyJobResultEntryAsync(jobId)
+ .whenCompleteAsync(
+ (hasDirtyJobResultEntry, throwable2)
-> {
+ if (throwable2 != null) {
+
writeFuture.completeExceptionally(throwable2);
+ return;
+ }
+ if (!hasDirtyJobResultEntry) {
+ jobResultStore
+
.createDirtyResultAsync(
+ new
JobResultEntry(
+
JobResult.createFrom(
+
archivedExecutionGraph)))
+ .whenCompleteAsync(
+ (unused,
throwable3) -> {
+ if
(throwable3 != null) {
+
writeFuture
+
.completeExceptionally(
+
throwable3);
+ return;
+ }
+ log.info(
+
"Job {} has been registered "
+
+ "for cleanup in "
+
+ "the JobResultStore "
+
+ "after reaching a "
+
+ "terminal state.",
+
jobId);
+
writeFuture.complete(null);
+ });
+ } else {
+ writeFuture.complete(null);
+ }
+ });
+ }
Review Comment:
This code block can be simplified (we don't need an extra `writeFuture`
anymore) in the following way:
```
return jobResultStore
.hasCleanJobResultEntryAsync(jobId)
.thenCompose(
hasCleanJobResultEntry -> {
if (hasCleanJobResultEntry) {
log.warn(
"Job {} is already marked as clean
but clean up was triggered again.",
jobId);
return FutureUtils.completedVoidFuture();
} else {
return jobResultStore
.hasDirtyJobResultEntryAsync(jobId)
.thenCompose(
hasDirtyJobResultEntry -> {
if
(hasDirtyJobResultEntry) {
return
FutureUtils.completedVoidFuture();
}
return
jobResultStore.createDirtyResultAsync(
new
JobResultEntry(
JobResult.createFrom(
archivedExecutionGraph)));
});
}
})
.handleAsync(
(ignored, error) -> {
if (error != null) {
fatalErrorHandler.onFatalError(
new FlinkException(
String.format(
"The job %s couldn't
be marked as pre-cleanup finished in JobResultStore.",
executionGraphInfo.getJobId()),
error));
}
return
CleanupJobState.globalCleanup(terminalJobStatus);
},
getMainThreadExecutor());
```
WDYT?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,70 @@ public interface JobResultStore {
* Registers the passed {@link JobResultEntry} instance as {@code dirty}
which indicates that
* clean-up operations still need to be performed. Once the job resource
cleanup has been
* finalized, we can mark the {@code JobResultEntry} as {@code clean}
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
*
* @param jobResultEntry The job result we wish to persist.
- * @throws IOException if the creation of the dirty result failed for IO
reasons.
- * @throws IllegalStateException if the passed {@code jobResultEntry} has
a {@code JobID}
- * attached that is already registered in this {@code JobResultStore}.
+ * @return a successfully completed future with {@code true} if the dirty
result is created
+ * successfully. The method will throw {@link IllegalStateException}
if the passed {@code
Review Comment:
It's not the method that will throw the exception. The future will complete
exceptionally.
##########
flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java:
##########
@@ -381,9 +381,12 @@ public static void rethrowException(Throwable t) throws
Exception {
* @param e exception to throw if not null.
* @throws Exception
*/
- public static void tryRethrowException(@Nullable Exception e) throws
Exception {
+ public static void tryRethrowException(@Nullable Throwable e) throws
Exception {
Review Comment:
```suggestion
public static void tryRethrow(@Nullable Throwable e) throws Throwable {
```
I'm wondering whether we should refactor this properly in a separate hotfix
commit. WDYT?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,70 @@ public interface JobResultStore {
* Registers the passed {@link JobResultEntry} instance as {@code dirty}
which indicates that
* clean-up operations still need to be performed. Once the job resource
cleanup has been
* finalized, we can mark the {@code JobResultEntry} as {@code clean}
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
*
* @param jobResultEntry The job result we wish to persist.
- * @throws IOException if the creation of the dirty result failed for IO
reasons.
- * @throws IllegalStateException if the passed {@code jobResultEntry} has
a {@code JobID}
- * attached that is already registered in this {@code JobResultStore}.
+ * @return a successfully completed future with {@code true} if the dirty
result is created
+ * successfully. The method will throw {@link IllegalStateException}
if the passed {@code
+ * jobResultEntry} has a {@code JobID} attached that is already
registered in this {@code
+ * JobResultStore}.
*/
- void createDirtyResult(JobResultEntry jobResultEntry) throws IOException,
IllegalStateException;
+ CompletableFuture<Void> createDirtyResultAsync(JobResultEntry
jobResultEntry);
/**
* Marks an existing {@link JobResultEntry} as {@code clean}. This
indicates that no more
* resource cleanup steps need to be performed. No actions should be
triggered if the passed
* {@code JobID} belongs to a job that was already marked as clean.
*
* @param jobId Ident of the job we wish to mark as clean.
- * @throws IOException if marking the {@code dirty} {@code JobResultEntry}
as {@code clean}
- * failed for IO reasons.
- * @throws NoSuchElementException if there is no corresponding {@code
dirty} job present in the
- * store for the given {@code JobID}.
+ * @return a successfully completed future if the result is marked
successfully, The future will
+ * completed with {@link NoSuchElementException} if there is no
corresponding {@code dirty}
+ * job present in the store for the given {@code JobID}.
*/
- void markResultAsClean(JobID jobId) throws IOException,
NoSuchElementException;
+ CompletableFuture<Void> markResultAsCleanAsync(JobID jobId);
/**
- * Returns whether the store already contains an entry for a job.
+ * Returns the future of whether the store already contains an entry for a
job.
*
* @param jobId Ident of the job we wish to check the store for.
- * @return {@code true} if a {@code dirty} or {@code clean} {@link
JobResultEntry} exists for
- * the given {@code JobID}; otherwise {@code false}.
- * @throws IOException if determining whether a job entry is present in
the store failed for IO
- * reasons.
+ * @return a successfully completed future with {@code true} if a {@code
dirty} or {@code clean}
+ * {@link JobResultEntry} exists for the given {@code JobID};
otherwise a successfully
+ * completed future with {@code false}.
*/
- default boolean hasJobResultEntry(JobID jobId) throws IOException {
- return hasDirtyJobResultEntry(jobId) || hasCleanJobResultEntry(jobId);
+ default CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
+ return hasDirtyJobResultEntryAsync(jobId)
+ .thenCombine(
+ hasCleanJobResultEntryAsync(jobId),
+ (result1, result2) -> result1 || result2);
}
/**
- * Returns whether the store already contains a {@code dirty} entry for
the given {@code JobID}.
+ * Returns the future of whether the store contains a {@code dirty} entry
for the given {@code
+ * JobID}.
*
* @param jobId Ident of the job we wish to check the store for.
- * @return {@code true}, if a {@code dirty} entry exists for the given
{@code JobID}; otherwise
- * {@code false}.
- * @throws IOException if determining whether a job entry is present in
the store failed for IO
- * reasons.
+ * @return a successfully completed future with {@code true}, if a {@code
dirty} entry exists
+ * for the given {@code JobID}; otherwise a successfully completed
future with {@code
Review Comment:
```suggestion
* for the given {@code JobID}; otherwise {@code
```
nit: no need to add this - it should be understood
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##########
@@ -57,17 +61,17 @@ public void markResultAsCleanInternal(JobID jobId) throws
IOException, NoSuchEle
}
@Override
- public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws
IOException {
+ public Boolean hasDirtyJobResultEntryInternal(JobID jobId) {
return dirtyJobResults.containsKey(jobId);
}
@Override
- public boolean hasCleanJobResultEntryInternal(JobID jobId) throws
IOException {
+ public Boolean hasCleanJobResultEntryInternal(JobID jobId) {
Review Comment:
```suggestion
public boolean hasCleanJobResultEntryInternal(JobID jobId) {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -277,18 +277,11 @@ private void startJobMasterServiceProcessAsync(UUID
leaderSessionId) {
@GuardedBy("lock")
private void
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
- throws FlinkException {
- try {
- if (jobResultStore.hasJobResultEntry(getJobID())) {
- jobAlreadyDone(leaderSessionId);
- } else {
- createNewJobMasterServiceProcess(leaderSessionId);
- }
- } catch (IOException e) {
- throw new FlinkException(
- String.format(
- "Could not retrieve the job scheduling status for
job %s.", getJobID()),
- e);
+ throws FlinkException, ExecutionException, InterruptedException {
+ if (jobResultStore.hasJobResultEntryAsync(getJobID()).get()) {
Review Comment:
I'd prefer to address this one in a separate review because it requires
some deeper changes. But we could leverage the async nature here as well.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java:
##########
@@ -116,46 +123,74 @@ public void
testBaseDirectoryCreationOnResultStoreInitialization() throws Except
assertThat(emptyBaseDirectory).doesNotExist();
fileSystemJobResultStore =
- new FileSystemJobResultStore(basePath.getFileSystem(),
basePath, false);
+ new FileSystemJobResultStore(
+ basePath.getFileSystem(), basePath, false,
manuallyTriggeredExecutor);
// Result store operations are creating the base directory on-the-fly
assertThat(emptyBaseDirectory).doesNotExist();
- fileSystemJobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+ CompletableFuture<Void> dirtyResultAsync =
+
fileSystemJobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY);
+ manuallyTriggeredExecutor.trigger();
Review Comment:
hint: With my proposal for the `AbstractThreadsafeJobResultStore`, these kinds
of tests will start to time out because two separate async operations are
performed (checking that no entry exists and the actual operation of creating
the file). Therefore, if you go with my proposal (or something similar), you
would have to change from `trigger()` to `triggerAll()` here and in other test
methods.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java:
##########
@@ -116,58 +111,53 @@ public static TestingJobResultStore.Builder builder() {
/** {@code Builder} for instantiating {@code TestingJobResultStore}
instances. */
public static class Builder {
- private ThrowingConsumer<JobResultEntry, ? extends IOException>
createDirtyResultConsumer =
- ignored -> {};
- private ThrowingConsumer<JobID, ? extends IOException>
markResultAsCleanConsumer =
- ignored -> {};
+ private Function<JobResultEntry, CompletableFuture<Void>>
createDirtyResultConsumer =
+ jobResultEntry -> CompletableFuture.completedFuture(null);
Review Comment:
```suggestion
jobResultEntry -> FutureUtils.completedVoidFuture();
```
That's a really nitty thing. Feel free to ignore it. I just wanted to point
out that `FutureUtils` has a utility method for that (that would also apply to
some of the other fields if you want to change that).
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -332,11 +332,16 @@ public void testJobBeingMarkedAsDirtyBeforeCleanup()
throws Exception {
TestingJobResultStore.builder()
.withCreateDirtyResultConsumer(
ignoredJobResultEntry -> {
+ CompletableFuture<Boolean>
result =
+ new
CompletableFuture<>();
try {
markAsDirtyLatch.await();
} catch
(InterruptedException e) {
- throw new
RuntimeException(e);
+
result.completeExceptionally(
+ new
RuntimeException(e));
Review Comment:
That is correct. But we don't need the `RuntimeException` anymore. We can
just use `FutureUtils.completeExceptionally(e)` with the `InterruptedException`
here
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java:
##########
@@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws
IOException, NoSuchEle
}
@Override
- public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws
IOException {
+ public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws
IOException {
return fileSystem.exists(constructDirtyPath(jobId));
}
@Override
- public boolean hasCleanJobResultEntryInternal(JobID jobId) throws
IOException {
+ public Boolean hasCleanJobResultEntryInternal(JobID jobId) throws
IOException {
return fileSystem.exists(constructCleanPath(jobId));
}
@Override
public Set<JobResult> getDirtyResultsInternal() throws IOException {
createBasePathIfNeeded();
-
final FileStatus[] statuses = fileSystem.listStatus(this.basePath);
-
Review Comment:
Is there a reason why you removed these lines?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java:
##########
@@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws
IOException, NoSuchEle
}
@Override
- public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws
IOException {
+ public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws
IOException {
return fileSystem.exists(constructDirtyPath(jobId));
}
@Override
- public boolean hasCleanJobResultEntryInternal(JobID jobId) throws
IOException {
+ public Boolean hasCleanJobResultEntryInternal(JobID jobId) throws
IOException {
Review Comment:
```suggestion
public boolean hasCleanJobResultEntryInternal(JobID jobId) throws
IOException {
```
This change is not necessary
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
@@ -43,61 +47,126 @@ public abstract class AbstractThreadsafeJobResultStore
implements JobResultStore
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- @Override
- public void createDirtyResult(JobResultEntry jobResultEntry) throws
IOException {
- Preconditions.checkState(
- !hasJobResultEntry(jobResultEntry.getJobId()),
- "Job result store already contains an entry for job %s",
- jobResultEntry.getJobId());
+ private final Executor ioExecutor;
+
+ public AbstractThreadsafeJobResultStore(Executor ioExecutor) {
Review Comment:
```suggestion
protected AbstractThreadsafeJobResultStore(Executor ioExecutor) {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
Review Comment:
Maybe it helps to compare your approach with the proposal above to get a
better understanding of the `CompletableFuture` usage. Feel free to ask
questions about it. And don't hesitate to question my approach! See it as a
discussion item. :-)
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractThreadsafeJobResultStore.java:
##########
Review Comment:
I'm gonna provide a general proposal for this class. It already looks good.
I just think that we could leverage the CompletableFuture utility methods in a
more efficient way. PTAL:
```
package org.apache.flink.runtime.highavailability;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** An abstract class for threadsafe implementations of the {@link
JobResultStore}. */
public abstract class AbstractThreadsafeJobResultStore implements
JobResultStore {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractThreadsafeJobResultStore.class);
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Executor ioExecutor;
protected AbstractThreadsafeJobResultStore(Executor ioExecutor) {
this.ioExecutor = ioExecutor;
}
@Override
public CompletableFuture<Void> createDirtyResultAsync(JobResultEntry
jobResultEntry) {
return hasJobResultEntryAsync(jobResultEntry.getJobId())
.thenAccept(
hasJobResultEntry ->
Preconditions.checkState(
!hasJobResultEntry,
"Job result store already contains
an entry for job %s",
jobResultEntry.getJobId()))
.thenCompose(
ignoredVoid ->
withWriteLockAsync(
() ->
createDirtyResultInternal(jobResultEntry)));
}
@GuardedBy("readWriteLock")
protected abstract void createDirtyResultInternal(JobResultEntry
jobResultEntry)
throws IOException;
@Override
public CompletableFuture<Void> markResultAsCleanAsync(JobID jobId) {
return hasCleanJobResultEntryAsync(jobId)
.thenCompose(
hasCleanJobResultEntry -> {
if (hasCleanJobResultEntry) {
LOG.debug(
"The job {} is already marked as
clean. No action required.",
jobId);
return FutureUtils.completedVoidFuture();
}
return withWriteLockAsync(() ->
markResultAsCleanInternal(jobId));
});
}
@GuardedBy("readWriteLock")
protected abstract void markResultAsCleanInternal(JobID jobId)
throws IOException, NoSuchElementException;
@Override
public CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
return withReadLockAsync(
() ->
hasDirtyJobResultEntryInternal(jobId)
|| hasCleanJobResultEntryInternal(jobId));
}
@Override
public CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID
jobId) {
return withReadLockAsync(() ->
hasDirtyJobResultEntryInternal(jobId));
}
@GuardedBy("readWriteLock")
protected abstract Boolean hasDirtyJobResultEntryInternal(JobID jobId)
throws IOException;
@Override
public CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID
jobId) {
return withReadLockAsync(() ->
hasCleanJobResultEntryInternal(jobId));
}
@GuardedBy("readWriteLock")
protected abstract Boolean hasCleanJobResultEntryInternal(JobID jobId)
throws IOException;
@Override
public Set<JobResult> getDirtyResults() throws IOException {
return getDirtyResultsInternal();
}
@GuardedBy("readWriteLock")
protected abstract Set<JobResult> getDirtyResultsInternal() throws
IOException;
private CompletableFuture<Void>
withWriteLockAsync(ThrowingRunnable<IOException> runnable) {
return FutureUtils.runAsync(() -> withWriteLock(runnable),
ioExecutor);
}
private void withWriteLock(ThrowingRunnable<IOException> runnable)
throws IOException {
readWriteLock.writeLock().lock();
try {
runnable.run();
} finally {
readWriteLock.writeLock().unlock();
}
}
private <T> CompletableFuture<T> withReadLockAsync(
SupplierWithException<T, IOException> runnable) {
return FutureUtils.supplyAsync(() -> withReadLock(runnable),
ioExecutor);
}
private <T> T withReadLock(SupplierWithException<T, IOException>
supplier) throws IOException {
readWriteLock.readLock().lock();
try {
return supplier.get();
} finally {
readWriteLock.readLock().unlock();
}
}
}
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,70 @@ public interface JobResultStore {
* Registers the passed {@link JobResultEntry} instance as {@code dirty}
which indicates that
* clean-up operations still need to be performed. Once the job resource
cleanup has been
* finalized, we can mark the {@code JobResultEntry} as {@code clean}
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
*
* @param jobResultEntry The job result we wish to persist.
- * @throws IOException if the creation of the dirty result failed for IO
reasons.
- * @throws IllegalStateException if the passed {@code jobResultEntry} has
a {@code JobID}
- * attached that is already registered in this {@code JobResultStore}.
+ * @return a successfully completed future with {@code true} if the dirty
result is created
+ * successfully. The method will throw {@link IllegalStateException}
if the passed {@code
+ * jobResultEntry} has a {@code JobID} attached that is already
registered in this {@code
+ * JobResultStore}.
*/
- void createDirtyResult(JobResultEntry jobResultEntry) throws IOException,
IllegalStateException;
+ CompletableFuture<Void> createDirtyResultAsync(JobResultEntry
jobResultEntry);
/**
* Marks an existing {@link JobResultEntry} as {@code clean}. This
indicates that no more
* resource cleanup steps need to be performed. No actions should be
triggered if the passed
* {@code JobID} belongs to a job that was already marked as clean.
*
* @param jobId Ident of the job we wish to mark as clean.
- * @throws IOException if marking the {@code dirty} {@code JobResultEntry}
as {@code clean}
- * failed for IO reasons.
- * @throws NoSuchElementException if there is no corresponding {@code
dirty} job present in the
- * store for the given {@code JobID}.
+ * @return a successfully completed future if the result is marked
successfully, The future will
+ * completed with {@link NoSuchElementException} if there is no
corresponding {@code dirty}
+ * job present in the store for the given {@code JobID}.
*/
- void markResultAsClean(JobID jobId) throws IOException,
NoSuchElementException;
+ CompletableFuture<Void> markResultAsCleanAsync(JobID jobId);
/**
- * Returns whether the store already contains an entry for a job.
+ * Returns the future of whether the store already contains an entry for a
job.
*
* @param jobId Ident of the job we wish to check the store for.
- * @return {@code true} if a {@code dirty} or {@code clean} {@link
JobResultEntry} exists for
- * the given {@code JobID}; otherwise {@code false}.
- * @throws IOException if determining whether a job entry is present in
the store failed for IO
- * reasons.
+ * @return a successfully completed future with {@code true} if a {@code
dirty} or {@code clean}
+ * {@link JobResultEntry} exists for the given {@code JobID};
otherwise a successfully
+ * completed future with {@code false}.
*/
- default boolean hasJobResultEntry(JobID jobId) throws IOException {
- return hasDirtyJobResultEntry(jobId) || hasCleanJobResultEntry(jobId);
+ default CompletableFuture<Boolean> hasJobResultEntryAsync(JobID jobId) {
+ return hasDirtyJobResultEntryAsync(jobId)
+ .thenCombine(
+ hasCleanJobResultEntryAsync(jobId),
+ (result1, result2) -> result1 || result2);
}
/**
- * Returns whether the store already contains a {@code dirty} entry for
the given {@code JobID}.
+ * Returns the future of whether the store contains a {@code dirty} entry
for the given {@code
+ * JobID}.
*
* @param jobId Ident of the job we wish to check the store for.
- * @return {@code true}, if a {@code dirty} entry exists for the given
{@code JobID}; otherwise
- * {@code false}.
- * @throws IOException if determining whether a job entry is present in
the store failed for IO
- * reasons.
+ * @return a successfully completed future with {@code true}, if a {@code
dirty} entry exists
+ * for the given {@code JobID}; otherwise a successfully completed
future with {@code
+ * false}.
*/
- boolean hasDirtyJobResultEntry(JobID jobId) throws IOException;
+ CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(JobID jobId);
/**
- * Returns whether the store already contains a {@code clean} entry for
the given {@code JobID}.
+ * Returns the future of whether the store contains a {@code clean} entry
for the given {@code
+ * JobID}.
*
* @param jobId Ident of the job we wish to check the store for.
- * @return {@code true}, if a {@code clean} entry exists for the given
{@code JobID}; otherwise
- * {@code false}.
- * @throws IOException if determining whether a job entry is present in
the store failed for IO
- * reasons.
+ * @return a successfully completed future with {@code true}, if a {@code
clean} entry exists
+ * for the given {@code JobID}; otherwise a successfully completed
future with {@code
+ * false}.
*/
- boolean hasCleanJobResultEntry(JobID jobId) throws IOException;
+ CompletableFuture<Boolean> hasCleanJobResultEntryAsync(JobID jobId);
/**
- * Get the persisted {@link JobResult} instances that are marked as {@code
dirty}. This is
+ * Returns persisted {@link JobResult} instances that are marked as {@code
dirty}. This is
* useful for recovery of finalization steps.
*
- * @return A set of dirty {@code JobResults} from the store.
- * @throws IOException if collecting the set of dirty results failed for
IO reasons.
+ * @return a set of dirty {@code JobResults} from the store.
Review Comment:
The JavaDoc should be reverted as well.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1273,7 +1270,22 @@ private CompletableFuture<Void> removeJob(JobID jobId,
CleanupJobState cleanupJo
if (cleanupJobState.isGlobalCleanup()) {
return globalResourceCleaner
.cleanupAsync(jobId)
- .thenRunAsync(() -> markJobAsClean(jobId), ioExecutor)
+ .thenCompose(unused ->
jobResultStore.markResultAsCleanAsync(jobId))
+ .handle(
+ (BiFunction<Void, Throwable, Void>)
+ (unused, e) -> {
Review Comment:
```suggestion
.thenCompose(unusedVoid ->
jobResultStore.markResultAsCleanAsync(jobId))
.handle(
(unusedVoid, error) -> {
```
The cast is not necessary. Instead, you could use meaningful parameter names
that explain the purpose of each parameter.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1390,29 +1392,71 @@ private CompletableFuture<CleanupJobState>
registerGloballyTerminatedJobInJobRes
"Job %s is in state %s which is not globally terminal.",
jobId,
terminalJobStatus);
-
- ioExecutor.execute(
- () -> {
- try {
- if (jobResultStore.hasCleanJobResultEntry(jobId)) {
- log.warn(
- "Job {} is already marked as clean but
clean up was triggered again.",
- jobId);
- } else if
(!jobResultStore.hasDirtyJobResultEntry(jobId)) {
- jobResultStore.createDirtyResult(
- new JobResultEntry(
-
JobResult.createFrom(archivedExecutionGraph)));
- log.info(
- "Job {} has been registered for cleanup in
the JobResultStore after reaching a terminal state.",
- jobId);
- }
- } catch (IOException e) {
- writeFuture.completeExceptionally(e);
+ CompletableFuture<Boolean> shouldCheckDirtyJobResult =
+ jobResultStore
+ .hasCleanJobResultEntryAsync(jobId)
+ .handleAsync(
+ (hasCleanJobResultEntry, throwable) -> {
+ if (throwable != null) {
+
writeFuture.completeExceptionally(throwable);
+ return false;
+ } else {
+ if (hasCleanJobResultEntry) {
+ log.warn(
+ "Job {} is already marked
as clean but "
+ + "clean up was
triggered again.",
+ jobId);
+ writeFuture.complete(null);
+ return false;
+ } else {
+ return true;
+ }
+ }
+ });
+ shouldCheckDirtyJobResult.whenCompleteAsync(
+ (shouldCheck, throwable1) -> {
+ if (throwable1 != null) {
+ writeFuture.completeExceptionally(throwable1);
return;
}
- writeFuture.complete(null);
+ if (shouldCheck) {
+ jobResultStore
+ .hasDirtyJobResultEntryAsync(jobId)
+ .whenCompleteAsync(
+ (hasDirtyJobResultEntry, throwable2)
-> {
+ if (throwable2 != null) {
+
writeFuture.completeExceptionally(throwable2);
+ return;
+ }
+ if (!hasDirtyJobResultEntry) {
+ jobResultStore
+
.createDirtyResultAsync(
+ new
JobResultEntry(
+
JobResult.createFrom(
+
archivedExecutionGraph)))
+ .whenCompleteAsync(
+ (unused,
throwable3) -> {
+ if
(throwable3 != null) {
+
writeFuture
+
.completeExceptionally(
+
throwable3);
+ return;
+ }
+ log.info(
+
"Job {} has been registered "
+
+ "for cleanup in "
+
+ "the JobResultStore "
+
+ "after reaching a "
+
+ "terminal state.",
+
jobId);
+
writeFuture.complete(null);
+ });
+ } else {
+ writeFuture.complete(null);
+ }
+ });
+ }
Review Comment:
This could be split up into meaningful private methods to handle the
individual if branches:
```
private CompletableFuture<CleanupJobState>
registerGloballyTerminatedJobInJobResultStore(
ExecutionGraphInfo executionGraphInfo) {
final JobID jobId = executionGraphInfo.getJobId();
final AccessExecutionGraph archivedExecutionGraph =
executionGraphInfo.getArchivedExecutionGraph();
final JobStatus terminalJobStatus =
archivedExecutionGraph.getState();
Preconditions.checkArgument(
terminalJobStatus.isGloballyTerminalState(),
"Job %s is in state %s which is not globally terminal.",
jobId,
terminalJobStatus);
return jobResultStore
.hasCleanJobResultEntryAsync(jobId)
.thenCompose(
hasCleanJobResultEntry ->
createDirtyJobResultEntryIfMissingAsync(
archivedExecutionGraph,
hasCleanJobResultEntry))
.handleAsync(
(ignored, error) -> {
if (error != null) {
fatalErrorHandler.onFatalError(
new FlinkException(
String.format(
"The job %s couldn't
be marked as pre-cleanup finished in JobResultStore.",
executionGraphInfo.getJobId()),
error));
}
return
CleanupJobState.globalCleanup(terminalJobStatus);
},
getMainThreadExecutor());
}
/**
* Creates a dirty entry in the {@link #jobResultStore} if there's no
entry at all for the given
* {@code executionGraph} in the {@code JobResultStore}.
*
* @param executionGraph The {@link AccessExecutionGraph} for which the
{@link JobResult} shall
* be persisted.
* @param hasCleanJobResultEntry The decision the dirty entry check is
based on.
* @return {@code CompletableFuture} that completes as soon as the entry
exists.
*/
private CompletableFuture<Void> createDirtyJobResultEntryIfMissingAsync(
AccessExecutionGraph executionGraph, boolean
hasCleanJobResultEntry) {
final JobID jobId = executionGraph.getJobID();
if (hasCleanJobResultEntry) {
log.warn("Job {} is already marked as clean but clean up was
triggered again.", jobId);
return FutureUtils.completedVoidFuture();
} else {
return jobResultStore
.hasDirtyJobResultEntryAsync(jobId)
.thenCompose(
hasDirtyJobResultEntry ->
createDirtyJobResultEntryAsync(
executionGraph,
hasDirtyJobResultEntry));
}
}
/**
* Creates a dirty entry in the {@link #jobResultStore} based on the
passed {@code
* hasDirtyJobResultEntry} flag.
*
* @param executionGraph The {@link AccessExecutionGraph} that is used
to generate the entry.
* @param hasDirtyJobResultEntry The decision the entry creation is
based on.
* @return {@code CompletableFuture} that completes as soon as the entry
exists.
*/
private CompletableFuture<Void> createDirtyJobResultEntryAsync(
AccessExecutionGraph executionGraph, boolean
hasDirtyJobResultEntry) {
if (hasDirtyJobResultEntry) {
return FutureUtils.completedVoidFuture();
}
return jobResultStore.createDirtyResultAsync(
new JobResultEntry(JobResult.createFrom(executionGraph)));
}
```
Having the individual methods should help to understand the actual subtasks
when reading the code. WDYT?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,70 @@ public interface JobResultStore {
* Registers the passed {@link JobResultEntry} instance as {@code dirty}
which indicates that
* clean-up operations still need to be performed. Once the job resource
cleanup has been
* finalized, we can mark the {@code JobResultEntry} as {@code clean}
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
*
* @param jobResultEntry The job result we wish to persist.
- * @throws IOException if the creation of the dirty result failed for IO
reasons.
- * @throws IllegalStateException if the passed {@code jobResultEntry} has
a {@code JobID}
- * attached that is already registered in this {@code JobResultStore}.
+ * @return a successfully completed future with {@code true} if the dirty
result is created
+ * successfully. The method will throw {@link IllegalStateException}
if the passed {@code
+ * jobResultEntry} has a {@code JobID} attached that is already
registered in this {@code
+ * JobResultStore}.
*/
- void createDirtyResult(JobResultEntry jobResultEntry) throws IOException,
IllegalStateException;
+ CompletableFuture<Void> createDirtyResultAsync(JobResultEntry
jobResultEntry);
/**
* Marks an existing {@link JobResultEntry} as {@code clean}. This
indicates that no more
* resource cleanup steps need to be performed. No actions should be
triggered if the passed
* {@code JobID} belongs to a job that was already marked as clean.
*
* @param jobId Ident of the job we wish to mark as clean.
- * @throws IOException if marking the {@code dirty} {@code JobResultEntry}
as {@code clean}
- * failed for IO reasons.
- * @throws NoSuchElementException if there is no corresponding {@code
dirty} job present in the
- * store for the given {@code JobID}.
+ * @return a successfully completed future if the result is marked
successfully, The future will
+ * completed with {@link NoSuchElementException} if there is no
corresponding {@code dirty}
+ * job present in the store for the given {@code JobID}.
*/
- void markResultAsClean(JobID jobId) throws IOException,
NoSuchElementException;
+ CompletableFuture<Void> markResultAsCleanAsync(JobID jobId);
/**
- * Returns whether the store already contains an entry for a job.
+ * Returns the future of whether the store already contains an entry for a
job.
*
* @param jobId Ident of the job we wish to check the store for.
- * @return {@code true} if a {@code dirty} or {@code clean} {@link
JobResultEntry} exists for
- * the given {@code JobID}; otherwise {@code false}.
- * @throws IOException if determining whether a job entry is present in
the store failed for IO
- * reasons.
+ * @return a successfully completed future with {@code true} if a {@code
dirty} or {@code clean}
+ * {@link JobResultEntry} exists for the given {@code JobID};
otherwise a successfully
+ * completed future with {@code false}.
Review Comment:
```suggestion
* {@link JobResultEntry} exists for the given {@code JobID};
otherwise {@code false}.
```
nit: no need to add this - it should be understood
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java:
##########
@@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws
IOException, NoSuchEle
}
@Override
- public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws
IOException {
+ public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws
IOException {
Review Comment:
```suggestion
public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws
IOException {
```
This change is not necessary
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -580,17 +581,13 @@ private boolean isDuplicateJob(JobID jobId) throws
FlinkException {
* Checks whether the given job has already been executed.
*
* @param jobId identifying the submitted job
- * @return true if the job has already finished, either successfully or as
a failure
+ * @return a successfully completed future with {@code true} if the job
has already finished,
+ * either successfully or as a failure
* @throws FlinkException if the job scheduling status cannot be retrieved
*/
- private boolean isInGloballyTerminalState(JobID jobId) throws
FlinkException {
- try {
- return jobResultStore.hasJobResultEntry(jobId);
- } catch (IOException e) {
- throw new FlinkException(
- String.format("Failed to retrieve job scheduling status
for job %s.", jobId),
- e);
- }
+ private CompletableFuture<Boolean> isInGloballyTerminalState(JobID jobId)
Review Comment:
Here it makes sense to keep the method (even though it's also just calling a
single other method) because `isInGloballyTerminalState` is more descriptive
than `jobResultStore.hasJobResultEntryAsync` :+1:
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##########
@@ -57,17 +61,17 @@ public void markResultAsCleanInternal(JobID jobId) throws
IOException, NoSuchEle
}
@Override
- public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws
IOException {
+ public Boolean hasDirtyJobResultEntryInternal(JobID jobId) {
Review Comment:
```suggestion
public boolean hasDirtyJobResultEntryInternal(JobID jobId) {
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -579,10 +587,15 @@ public void
testErrorHandlingIfJobCannotBeMarkedAsCleanInJobResultStore() throws
final CompletableFuture<JobResultEntry> dirtyJobFuture = new
CompletableFuture<>();
final JobResultStore jobResultStore =
TestingJobResultStore.builder()
-
.withCreateDirtyResultConsumer(dirtyJobFuture::complete)
+ .withCreateDirtyResultConsumer(
+ jobResultEntry -> {
+ dirtyJobFuture.complete(jobResultEntry);
+ return FutureUtils.completedVoidFuture();
+ })
.withMarkResultAsCleanConsumer(
jobId -> {
- throw new IOException("Expected
IOException.");
+ return FutureUtils.completedExceptionally(
Review Comment:
This can be shortened (the braces are unnecessary in a one-liner lambda),
analogously to your change above.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##########
@@ -43,69 +44,70 @@ public interface JobResultStore {
* Registers the passed {@link JobResultEntry} instance as {@code dirty}
which indicates that
* clean-up operations still need to be performed. Once the job resource
cleanup has been
* finalized, we can mark the {@code JobResultEntry} as {@code clean}
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
*
* @param jobResultEntry The job result we wish to persist.
- * @throws IOException if the creation of the dirty result failed for IO
reasons.
- * @throws IllegalStateException if the passed {@code jobResultEntry} has
a {@code JobID}
- * attached that is already registered in this {@code JobResultStore}.
+ * @return a successfully completed future with {@code true} if the dirty
result is created
+ * successfully. The method will throw {@link IllegalStateException}
if the passed {@code
+ * jobResultEntry} has a {@code JobID} attached that is already
registered in this {@code
+ * JobResultStore}.
*/
- void createDirtyResult(JobResultEntry jobResultEntry) throws IOException,
IllegalStateException;
+ CompletableFuture<Void> createDirtyResultAsync(JobResultEntry
jobResultEntry);
/**
* Marks an existing {@link JobResultEntry} as {@code clean}. This
indicates that no more
* resource cleanup steps need to be performed. No actions should be
triggered if the passed
* {@code JobID} belongs to a job that was already marked as clean.
*
* @param jobId Ident of the job we wish to mark as clean.
- * @throws IOException if marking the {@code dirty} {@code JobResultEntry}
as {@code clean}
- * failed for IO reasons.
- * @throws NoSuchElementException if there is no corresponding {@code
dirty} job present in the
- * store for the given {@code JobID}.
+ * @return a successfully completed future if the result is marked
successfully, The future will
+ * completed with {@link NoSuchElementException} if there is no
corresponding {@code dirty}
+ * job present in the store for the given {@code JobID}.
Review Comment:
We shouldn't remove the documentation for the `NoSuchElementException`. That
should still be part of the contract and, therefore, mentioned in the JavaDoc.
Could you add this as part of the returned future documentation as an
additional sentence?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##########
Review Comment:
Removing the `IOException` from the method signatures in
`EmbeddedJobResultStore` could happen in a separate hotfix commit.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java:
##########
@@ -46,72 +47,97 @@ public interface JobResultStoreContractTest {
@Test
default void testStoreJobResultsWithDuplicateIDsThrowsException() throws
IOException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
final JobResultEntry otherEntryWithDuplicateId =
new JobResultEntry(
TestingJobResultStore.createSuccessfulJobResult(
DUMMY_JOB_RESULT_ENTRY.getJobId()));
- assertThatThrownBy(() ->
jobResultStore.createDirtyResult(otherEntryWithDuplicateId))
- .isInstanceOf(IllegalStateException.class);
+ assertThatThrownBy(
+ () ->
+ jobResultStore
+
.createDirtyResultAsync(otherEntryWithDuplicateId)
+ .join())
+ .isInstanceOf(CompletionException.class);
}
@Test
default void
testStoreDirtyEntryForAlreadyCleanedJobResultThrowsException() throws
IOException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
- jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
- assertThatThrownBy(() ->
jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY))
- .isInstanceOf(IllegalStateException.class);
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
+
jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).join();
+ assertThatThrownBy(
+ () ->
jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join())
+ .isInstanceOf(CompletionException.class);
}
@Test
default void testCleaningDuplicateEntryThrowsNoException() throws
IOException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
- jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId());
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
+
jobResultStore.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId()).join();
assertThatNoException()
.isThrownBy(
- () ->
jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()));
+ () ->
+ jobResultStore
+
.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .join());
}
@Test
default void testCleaningNonExistentEntryThrowsException() throws
IOException {
JobResultStore jobResultStore = createJobResultStore();
assertThatThrownBy(
- () ->
jobResultStore.markResultAsClean(DUMMY_JOB_RESULT_ENTRY.getJobId()))
- .isInstanceOf(NoSuchElementException.class);
+ () ->
+ jobResultStore
+
.markResultAsCleanAsync(DUMMY_JOB_RESULT_ENTRY.getJobId())
+ .join())
+ .hasCauseInstanceOf(NoSuchElementException.class);
Review Comment:
Here you asserted the cause properly. Well done :+1:
##########
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java:
##########
@@ -46,72 +47,97 @@ public interface JobResultStoreContractTest {
@Test
default void testStoreJobResultsWithDuplicateIDsThrowsException() throws
IOException {
JobResultStore jobResultStore = createJobResultStore();
- jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+ jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
final JobResultEntry otherEntryWithDuplicateId =
new JobResultEntry(
TestingJobResultStore.createSuccessfulJobResult(
DUMMY_JOB_RESULT_ENTRY.getJobId()));
- assertThatThrownBy(() ->
jobResultStore.createDirtyResult(otherEntryWithDuplicateId))
- .isInstanceOf(IllegalStateException.class);
+ assertThatThrownBy(
+ () ->
+ jobResultStore
+
.createDirtyResultAsync(otherEntryWithDuplicateId)
+ .join())
+ .isInstanceOf(CompletionException.class);
Review Comment:
We should also validate the cause of the `CompletionException`:
```suggestion
.isInstanceOf(CompletionException.class)
.hasCauseInstanceOf(IllegalStateException.class);
```
We should also assert for the cause of the `CompletionException` because
that's the actual business logic. This statement applies also to the other
methods in this test class. I'm not gonna mark all of them individually.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##########
@@ -332,11 +332,16 @@ public void testJobBeingMarkedAsDirtyBeforeCleanup()
throws Exception {
TestingJobResultStore.builder()
.withCreateDirtyResultConsumer(
ignoredJobResultEntry -> {
+ CompletableFuture<Boolean>
result =
+ new
CompletableFuture<>();
try {
markAsDirtyLatch.await();
} catch
(InterruptedException e) {
- throw new
RuntimeException(e);
+
result.completeExceptionally(
+ new
RuntimeException(e));
Review Comment:
We should still call `Thread.currentThread().interrupt()` before returning
the exceptionally completed future. Something like this:
```
try {
markAsDirtyLatch.await();
} catch
(InterruptedException e) {
Thread.currentThread().interrupt();
return
FutureUtils.completedExceptionally(e);
}
return
FutureUtils.completedVoidFuture();
```
No local variable `result` is needed here.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java:
##########
@@ -116,58 +111,53 @@ public static TestingJobResultStore.Builder builder() {
/** {@code Builder} for instantiating {@code TestingJobResultStore}
instances. */
public static class Builder {
- private ThrowingConsumer<JobResultEntry, ? extends IOException>
createDirtyResultConsumer =
- ignored -> {};
- private ThrowingConsumer<JobID, ? extends IOException>
markResultAsCleanConsumer =
- ignored -> {};
+ private Function<JobResultEntry, CompletableFuture<Void>>
createDirtyResultConsumer =
+ jobResultEntry -> CompletableFuture.completedFuture(null);
+ private Function<JobID, CompletableFuture<Void>>
markResultAsCleanConsumer =
+ jobID -> CompletableFuture.completedFuture(null);
- private FunctionWithException<JobID, Boolean, ? extends IOException>
- hasJobResultEntryFunction = ignored -> false;
- private FunctionWithException<JobID, Boolean, ? extends IOException>
- hasDirtyJobResultEntryFunction = ignored -> false;
- private FunctionWithException<JobID, Boolean, ? extends IOException>
- hasCleanJobResultEntryFunction = ignored -> false;
+ private Function<JobID, CompletableFuture<Boolean>>
hasJobResultEntryFunction =
+ jobID -> CompletableFuture.completedFuture(false);
+ private Function<JobID, CompletableFuture<Boolean>>
hasDirtyJobResultEntryFunction =
+ jobID -> CompletableFuture.completedFuture(false);
+ private Function<JobID, CompletableFuture<Boolean>>
hasCleanJobResultEntryFunction =
+ jobID -> CompletableFuture.completedFuture(false);
- private SupplierWithException<Set<JobResult>, ? extends IOException>
- getDirtyResultsSupplier = Collections::emptySet;
+ private Supplier<Set<JobResult>> dirtyResultsSupplier =
Collections::emptySet;
public Builder withCreateDirtyResultConsumer(
- ThrowingConsumer<JobResultEntry, ? extends IOException>
createDirtyResultConsumer) {
+ Function<JobResultEntry, CompletableFuture<Void>>
createDirtyResultConsumer) {
this.createDirtyResultConsumer = createDirtyResultConsumer;
return this;
}
public Builder withMarkResultAsCleanConsumer(
- ThrowingConsumer<JobID, ? extends IOException>
markResultAsCleanConsumer) {
+ Function<JobID, CompletableFuture<Void>>
markResultAsCleanConsumer) {
this.markResultAsCleanConsumer = markResultAsCleanConsumer;
return this;
}
public Builder withHasJobResultEntryFunction(
- FunctionWithException<JobID, Boolean, ? extends IOException>
- hasJobResultEntryFunction) {
+ Function<JobID, CompletableFuture<Boolean>>
hasJobResultEntryFunction) {
this.hasJobResultEntryFunction = hasJobResultEntryFunction;
return this;
}
public Builder withHasDirtyJobResultEntryFunction(
- FunctionWithException<JobID, Boolean, ? extends IOException>
- hasDirtyJobResultEntryFunction) {
+ Function<JobID, CompletableFuture<Boolean>>
hasDirtyJobResultEntryFunction) {
this.hasDirtyJobResultEntryFunction =
hasDirtyJobResultEntryFunction;
return this;
}
public Builder withHasCleanJobResultEntryFunction(
- FunctionWithException<JobID, Boolean, ? extends IOException>
- hasCleanJobResultEntryFunction) {
+ Function<JobID, CompletableFuture<Boolean>>
hasCleanJobResultEntryFunction) {
this.hasCleanJobResultEntryFunction =
hasCleanJobResultEntryFunction;
return this;
}
public Builder withGetDirtyResultsSupplier(
- SupplierWithException<Set<JobResult>, ? extends IOException>
- getDirtyResultsSupplier) {
- this.getDirtyResultsSupplier = getDirtyResultsSupplier;
+ Supplier<Set<JobResult>> getDirtyResultsSupplier) {
Review Comment:
I guess the `getDirtyResults` field doesn't need to change.
##########
flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java:
##########
@@ -381,9 +381,12 @@ public static void rethrowException(Throwable t) throws
Exception {
* @param e exception to throw if not null.
* @throws Exception
*/
- public static void tryRethrowException(@Nullable Exception e) throws
Exception {
+ public static void tryRethrowException(@Nullable Throwable e) throws
Exception {
Review Comment:
This change might not be necessary with the proposed changes below.
:thinking:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]