XComp commented on a change in pull request #18644:
URL: https://github.com/apache/flink/pull/18644#discussion_r805312063
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
##########
@@ -73,24 +76,24 @@
// we have to have two separate futures because closeAsync relies on the completion of
// getResultFuture which is always already completed but the cleanupFuture is only
// instantiated when calling start
- private CompletableFuture<Void> cleanupFuture;
+ @Nullable private CompletableFuture<Void> cleanupFuture;
Review comment:
This is necessary to synchronize the `start` and `closeAsync` methods of the
`CheckpointResourcesCleanupRunner`. Otherwise, `closeAsync` could complete
before `start` is called. That is problematic because `start` is called outside
of registering the `CheckpointResourcesCleanupRunner` in the `Dispatcher`'s
`JobManagerRunnerRegistry`, which gets cleaned up after
`CheckpointResourcesCleanupRunner.getResultFuture` is completed (and that is the
case right away when the runner is initialized).
Explaining that, I realize that a cleaner approach is probably to have the
result future complete only after the `start` method is called. That would
enable us to merge the two futures, `closeFuture` and `cleanupFuture`. I will
try that...
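A minimal sketch of that idea, assuming a simplified runner: `SketchCleanupRunner` and its `cleanupAction` supplier are hypothetical names, not the actual `CheckpointResourcesCleanupRunner` API. The single result future only completes after `start()` has triggered the cleanup and that cleanup has finished, so `closeAsync()` can be derived from it and no separate `cleanupFuture` field is needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * Hypothetical, simplified runner (not the actual CheckpointResourcesCleanupRunner):
 * the result future completes only after start() has triggered the cleanup and that
 * cleanup has finished, so closeAsync() can reuse it instead of a second future.
 */
class SketchCleanupRunner {
    private final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
    private final Supplier<CompletableFuture<Void>> cleanupAction;
    private boolean started = false;

    SketchCleanupRunner(Supplier<CompletableFuture<Void>> cleanupAction) {
        this.cleanupAction = cleanupAction;
    }

    /** Triggers the cleanup; the result future can only complete from here on. */
    synchronized void start() {
        if (started) {
            throw new IllegalStateException("The runner was already started.");
        }
        started = true;
        cleanupAction
                .get()
                .whenComplete(
                        (ignored, error) -> {
                            // forward the cleanup outcome to the single result future
                            if (error != null) {
                                resultFuture.completeExceptionally(error);
                            } else {
                                resultFuture.complete(null);
                            }
                        });
    }

    CompletableFuture<Void> getResultFuture() {
        return resultFuture;
    }

    /**
     * Closing does not cancel the cleanup; it waits for the result future. Because that
     * future cannot complete before start(), neither can the returned close future.
     */
    CompletableFuture<Void> closeAsync() {
        return resultFuture.handle((ignored, error) -> null);
    }
}
```

With this shape, a registry that drops the runner once `getResultFuture()` completes can no longer do so before `start()` was called.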
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
##########
@@ -118,6 +122,93 @@ public static Builder builder() {
return new Builder();
}
+ /**
+ * Creates {@code Builder} that is initialized to simulate a {@link JobManagerRunnerRegistry}
+ * implementation based on a single-{@link JobManagerRunner}. The default test implementation
+ * follows the {@code JobManagerRunnerRegistry} contract. This {@code JobManagerRunner} instance
+ * is held by the {@code AtomicReference}.
+ *
+ * @param singleRunnerReference the reference to the internally held {@code JobManagerRunner}.
+ */
+ public static Builder builderFromSingleEntry(
Review comment:
I like that 👍
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
##########
@@ -118,6 +122,93 @@ public static Builder builder() {
return new Builder();
}
+ /**
+ * Creates {@code Builder} that is initialized to simulate a {@link JobManagerRunnerRegistry}
+ * implementation based on a single-{@link JobManagerRunner}. The default test implementation
+ * follows the {@code JobManagerRunnerRegistry} contract. This {@code JobManagerRunner} instance
+ * is held by the {@code AtomicReference}.
+ *
+ * @param singleRunnerReference the reference to the internally held {@code JobManagerRunner}.
+ */
+ public static Builder builderFromSingleEntry(
+ AtomicReference<JobManagerRunner> singleRunnerReference) {
+ return builder()
+ .withRegisterConsumer(
+ jobManagerRunner -> {
+ Preconditions.checkState(singleRunnerReference.get() == null);
+ singleRunnerReference.set(jobManagerRunner);
+ })
+ .withIsRegisteredFunction(
+ jobId ->
+ accessReference(
+ singleRunnerReference,
+ jobManagerRunner ->
+ jobManagerRunner.getJobID().equals(jobId),
+ () -> false))
+ .withGetFunction(
+ jobId ->
+ accessReference(
+ singleRunnerReference,
+ Function.identity(),
+ () -> throwNoSuchElementException(jobId)))
+ .withGetJobManagerRunnersSupplier(
+ () -> CollectionUtil.ofNullable(singleRunnerReference.get()))
+ .withSizeSupplier(() -> singleRunnerReference.get() == null ? 0 : 1)
+ .withGetRunningJobIdsSupplier(
+ () ->
+ accessReference(
+ singleRunnerReference,
+ jobManagerRunner ->
+ Collections.singleton(jobManagerRunner.getJobID()),
+ Collections::emptySet))
+ .withUnregisterFunction(
+ jobId ->
+ accessReference(
+ singleRunnerReference,
+ jobManagerRunner -> {
+ if (jobManagerRunner.getJobID().equals(jobId)) {
+ return singleRunnerReference.getAndSet(null);
+ }
+
+ return throwNoSuchElementException(jobId);
+ },
+ () -> throwNoSuchElementException(jobId)))
Review comment:
Good suggestion 👍
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
##########
@@ -118,6 +122,93 @@ public static Builder builder() {
return new Builder();
}
+ /**
+ * Creates {@code Builder} that is initialized to simulate a {@link JobManagerRunnerRegistry}
+ * implementation based on a single-{@link JobManagerRunner}. The default test implementation
+ * follows the {@code JobManagerRunnerRegistry} contract. This {@code JobManagerRunner} instance
+ * is held by the {@code AtomicReference}.
+ *
+ * @param singleRunnerReference the reference to the internally held {@code JobManagerRunner}.
+ */
+ public static Builder builderFromSingleEntry(
+ AtomicReference<JobManagerRunner> singleRunnerReference) {
+ return builder()
+ .withRegisterConsumer(
+ jobManagerRunner -> {
+ Preconditions.checkState(singleRunnerReference.get() == null);
+ singleRunnerReference.set(jobManagerRunner);
+ })
+ .withIsRegisteredFunction(
+ jobId ->
+ accessReference(
+ singleRunnerReference,
+ jobManagerRunner ->
+ jobManagerRunner.getJobID().equals(jobId),
+ () -> false))
+ .withGetFunction(
+ jobId ->
+ accessReference(
+ singleRunnerReference,
+ Function.identity(),
+ () -> throwNoSuchElementException(jobId)))
+ .withGetJobManagerRunnersSupplier(
+ () -> CollectionUtil.ofNullable(singleRunnerReference.get()))
+ .withSizeSupplier(() -> singleRunnerReference.get() == null ? 0 : 1)
+ .withGetRunningJobIdsSupplier(
+ () ->
+ accessReference(
+ singleRunnerReference,
+ jobManagerRunner ->
+ Collections.singleton(jobManagerRunner.getJobID()),
+ Collections::emptySet))
+ .withUnregisterFunction(
+ jobId ->
+ accessReference(
+ singleRunnerReference,
+ jobManagerRunner -> {
+ if (jobManagerRunner.getJobID().equals(jobId)) {
+ return singleRunnerReference.getAndSet(null);
+ }
+
+ return throwNoSuchElementException(jobId);
+ },
+ () -> throwNoSuchElementException(jobId)))
+ .withGlobalCleanupAsyncFunction(
+ (actualJobId, ignoredExecutor) ->
+ cleanup(singleRunnerReference, actualJobId))
+ .withLocalCleanupAsyncFunction(
+ (actualJobId, executor) -> cleanup(singleRunnerReference, actualJobId));
+ }
+
+ private static CompletableFuture<Void> cleanup(
+ AtomicReference<JobManagerRunner> singleRunnerReference, JobID actualJobId) {
+ return accessReference(
+ singleRunnerReference,
+ jobManagerRunner -> {
+ if (jobManagerRunner.getJobID().equals(actualJobId)) {
+ singleRunnerReference.getAndSet(null).closeAsync();
+ }
+
+ return FutureUtils.completedVoidFuture();
+ },
+ FutureUtils::completedVoidFuture);
+ }
+
+ private static JobManagerRunner throwNoSuchElementException(JobID jobId) {
+ throw new NoSuchElementException(
+ "JobManagerRunner with job ID " + jobId + " is not registered.");
+ }
+
+ private static <T> T accessReference(
Review comment:
This method was removed in favor of the `Optional.ofNullable` approach.
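Roughly, the `Optional.ofNullable` variant of the single-entry lookups could look like the sketch below. `HypotheticalRunner` (with a `String` job ID) stands in for `JobManagerRunner`, and `SingleEntryLookups` is an illustrative name; only the `java.util.Optional` and `AtomicReference` calls are real API.

```java
import java.util.Collections;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for JobManagerRunner, reduced to its job ID. */
final class HypotheticalRunner {
    private final String jobId;

    HypotheticalRunner(String jobId) {
        this.jobId = jobId;
    }

    String getJobId() {
        return jobId;
    }
}

/** Single-entry lookups using Optional.ofNullable instead of a custom accessReference helper. */
final class SingleEntryLookups {
    private SingleEntryLookups() {}

    static boolean isRegistered(AtomicReference<HypotheticalRunner> ref, String jobId) {
        return Optional.ofNullable(ref.get())
                .map(runner -> runner.getJobId().equals(jobId))
                .orElse(false);
    }

    static HypotheticalRunner get(AtomicReference<HypotheticalRunner> ref, String jobId) {
        return Optional.ofNullable(ref.get())
                .filter(runner -> runner.getJobId().equals(jobId))
                .orElseThrow(
                        () ->
                                new NoSuchElementException(
                                        "Runner with job ID " + jobId + " is not registered."));
    }

    static Set<String> runningJobIds(AtomicReference<HypotheticalRunner> ref) {
        return Optional.ofNullable(ref.get())
                .map(runner -> Collections.singleton(runner.getJobId()))
                .orElseGet(Collections::emptySet);
    }

    static HypotheticalRunner unregister(AtomicReference<HypotheticalRunner> ref, String jobId) {
        HypotheticalRunner removed = get(ref, jobId); // throws if not registered
        // only clear the slot if it still holds the runner we just looked up
        ref.compareAndSet(removed, null);
        return removed;
    }
}
```

Unlike the quoted `withGetFunction`, this `get` also compares the job ID; that is a choice made for the sketch, not necessarily what the PR does.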
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -280,6 +343,21 @@ private JobGraphStore createAndStartJobGraphStoreWithCleanupFailures(
AtomicInteger actualCleanupCallCount,
OneShotLatch successfulCleanupLatch)
throws Exception {
+ return createAndStartJobGraphStoreWithCleanupFailures(
+ numberOfCleanupFailures,
+ throwable,
+ actualCleanupCallCount,
+ successfulCleanupLatch,
+ FutureUtils.completedVoidFuture());
+ }
+
+ private JobGraphStore createAndStartJobGraphStoreWithCleanupFailures(
Review comment:
True, that was missed during an intermediate refactoring 👍
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -179,6 +188,60 @@ public void testCleanupThroughRetries() throws Exception {
"The JobResultStore should have this job marked as clean.");
}
+ @Test
+ public void testCleanupNotCancellable() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ final JobResultStore jobResultStore = new EmbeddedJobResultStore();
+ jobResultStore.createDirtyResult(
+ new JobResultEntry(TestingJobResultStore.createSuccessfulJobResult(jobId)));
+ haServices.setJobResultStore(jobResultStore);
+
+ // Instantiates JobManagerRunner
+ final CompletableFuture<Void> jobManagerRunnerCleanupFuture = new CompletableFuture<>();
+ final AtomicReference<JobManagerRunner> jobManagerRunnerEntry = new AtomicReference<>();
+ final JobManagerRunnerRegistry jobManagerRunnerRegistry =
+ TestingJobManagerRunnerRegistry.builderFromSingleEntry(jobManagerRunnerEntry)
+ .withGlobalCleanupAsyncFunction(
+ (actualJobId, executor) -> jobManagerRunnerCleanupFuture)
+ .build();
+
+ final Dispatcher dispatcher =
+ createTestingDispatcherBuilder()
+ .setJobManagerRunnerRegistry(jobManagerRunnerRegistry)
+ .build();
+ dispatcher.start();
+
+ toTerminate.add(dispatcher);
+
+ CommonTestUtils.waitUntilCondition(
+ () -> jobManagerRunnerEntry.get() != null,
+ Deadline.fromNow(Duration.ofSeconds(10)),
+ "JobManagerRunner wasn't loaded in time.");
+
+ assertThat(
+ "The JobResultStore should have this job still marked as dirty.",
+ haServices.getJobResultStore().hasDirtyJobResultEntry(jobId),
+ CoreMatchers.is(true));
+
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ try {
+ dispatcherGateway.cancelJob(jobId, TIMEOUT).get();
+ Assert.fail("Should fail because cancelling the cleanup is not allowed.");
+ } catch (Throwable e) {
+ assertThat(e, FlinkMatchers.containsCause(FlinkException.class));
Review comment:
Good point. I introduced a new `JobCancellationFailedException` to cover
this specific use case.
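For illustration, a test could then assert on the specific exception type instead of the generic `FlinkException`. The sketch below uses a hypothetical `SketchJobCancellationFailedException` (its superclass and constructor are assumptions, not the class from this PR) and a plain-Java cause-chain search in place of `FlinkMatchers.containsCause`.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical stand-in for the JobCancellationFailedException introduced in this PR. */
class SketchJobCancellationFailedException extends Exception {
    SketchJobCancellationFailedException(String message) {
        super(message);
    }
}

final class CauseChain {
    private CauseChain() {}

    /** Walks the getCause() chain (guarding against cycles) and returns the first match. */
    static <T extends Throwable> Optional<T> findCause(Throwable throwable, Class<T> type) {
        Set<Throwable> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = throwable;
        while (current != null && visited.add(current)) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    /** Blocks on the future and reports whether it failed with the given type in its cause chain. */
    static boolean failsWithCause(CompletableFuture<?> future, Class<? extends Throwable> type) {
        try {
            future.join();
            return false;
        } catch (CompletionException e) {
            return findCause(e, type).isPresent();
        }
    }
}
```

Matching the concrete type keeps the test from passing on an unrelated `FlinkException` that happens to be somewhere in the cause chain.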
--
This is an automated message from the Apache Git Service.