XComp commented on a change in pull request #18644:
URL: https://github.com/apache/flink/pull/18644#discussion_r805312063



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
##########
@@ -73,24 +76,24 @@
     // we have to have two separate futures because closeAsync relies on the completion of
     // getResultFuture which is always already completed but the cleanupFuture is only
     // instantiated when calling start
-    private CompletableFuture<Void> cleanupFuture;
+    @Nullable private CompletableFuture<Void> cleanupFuture;

Review comment:
      This is necessary to synchronize the `start` and `closeAsync` methods of the `CheckpointResourcesCleanupRunner`. Otherwise, `closeAsync` could complete before `start` is called. That is problematic because `start` is called outside of the registration of the `CheckpointResourcesCleanupRunner` in the `Dispatcher`'s `JobManagerRunnerRegistry`, and the registry entry is cleaned up once `CheckpointResourcesCleanupRunner.getResultFuture` completes (which happens right away when the runner is initialized).
   
   Explaining this, I realize that a cleaner approach is probably to have the result future complete only after the `start` method has been called. That would allow us to merge the two futures, `closeFuture` and `cleanupFuture`. I will try that...
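A minimal sketch of that direction, assuming a heavily simplified runner: `StartGatedCleanupRunner` and its methods are hypothetical and are not the actual `CheckpointResourcesCleanupRunner` API. A single result future is completed only after `start` has run the cleanup, so `closeAsync` (and with it anything that waits on the result future, such as the registry cleanup) cannot finish before `start` is called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Flink's CheckpointResourcesCleanupRunner.
final class StartGatedCleanupRunner {
    // One future replaces the separate closeFuture/cleanupFuture pair.
    private final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Runnable cleanup;

    StartGatedCleanupRunner(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    // Triggers the cleanup once; the result future completes only when it finishes.
    void start() {
        if (!started.compareAndSet(false, true)) {
            return; // repeated calls are ignored in this sketch
        }
        CompletableFuture.runAsync(cleanup)
                .whenComplete(
                        (ignored, error) -> {
                            if (error != null) {
                                resultFuture.completeExceptionally(error);
                            } else {
                                resultFuture.complete(null);
                            }
                        });
    }

    CompletableFuture<Void> getResultFuture() {
        return resultFuture;
    }

    // Cannot complete before start() was called, because resultFuture is still pending.
    CompletableFuture<Void> closeAsync() {
        return resultFuture.exceptionally(ignored -> null);
    }
}
```

With a single future there is no window in which `closeAsync` sees a future that `start` has not created yet, which is what the `@Nullable` field currently has to account for.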

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
##########
@@ -118,6 +122,93 @@ public static Builder builder() {
         return new Builder();
     }
 
+    /**
+     * Creates a {@code Builder} that is initialized to simulate a {@link JobManagerRunnerRegistry}
+     * implementation based on a single {@link JobManagerRunner}. The default test implementation
+     * follows the {@code JobManagerRunnerRegistry} contract. This {@code JobManagerRunner} instance
+     * is held by the given {@code AtomicReference}.
+     *
+     * @param singleRunnerReference the reference to the internally held {@code JobManagerRunner}.
+     */
+    public static Builder builderFromSingleEntry(

Review comment:
       I like that 👍 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
##########
@@ -118,6 +122,93 @@ public static Builder builder() {
         return new Builder();
     }
 
+    /**
+     * Creates a {@code Builder} that is initialized to simulate a {@link JobManagerRunnerRegistry}
+     * implementation based on a single {@link JobManagerRunner}. The default test implementation
+     * follows the {@code JobManagerRunnerRegistry} contract. This {@code JobManagerRunner} instance
+     * is held by the given {@code AtomicReference}.
+     *
+     * @param singleRunnerReference the reference to the internally held {@code JobManagerRunner}.
+     */
+    public static Builder builderFromSingleEntry(
+            AtomicReference<JobManagerRunner> singleRunnerReference) {
+        return builder()
+                .withRegisterConsumer(
+                        jobManagerRunner -> {
+                            Preconditions.checkState(singleRunnerReference.get() == null);
+                            singleRunnerReference.set(jobManagerRunner);
+                        })
+                .withIsRegisteredFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner ->
+                                                jobManagerRunner.getJobID().equals(jobId),
+                                        () -> false))
+                .withGetFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        Function.identity(),
+                                        () -> throwNoSuchElementException(jobId)))
+                .withGetJobManagerRunnersSupplier(
+                        () -> CollectionUtil.ofNullable(singleRunnerReference.get()))
+                .withSizeSupplier(() -> singleRunnerReference.get() == null ? 0 : 1)
+                .withGetRunningJobIdsSupplier(
+                        () ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner ->
+                                                Collections.singleton(jobManagerRunner.getJobID()),
+                                        Collections::emptySet))
+                .withUnregisterFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner -> {
+                                            if (jobManagerRunner.getJobID().equals(jobId)) {
+                                                return singleRunnerReference.getAndSet(null);
+                                            }
+
+                                            return throwNoSuchElementException(jobId);
+                                        },
+                                        () -> throwNoSuchElementException(jobId)))

Review comment:
       Good suggestion 👍 
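To make the single-entry contract of `builderFromSingleEntry` easier to reason about, here is a hypothetical, Flink-independent sketch: `SingleEntryRegistry` is an illustrative name standing in for the test registry, and a plain key extracted from the value stands in for the `JobID`. It keeps the rules of the callbacks above: registering a second entry fails (like the `Preconditions.checkState` guard), and unregistering an unknown key throws `NoSuchElementException`.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical single-slot registry; not part of Flink.
final class SingleEntryRegistry<K, V> {
    private final AtomicReference<V> slot = new AtomicReference<>();
    private final Function<V, K> keyOf;

    SingleEntryRegistry(Function<V, K> keyOf) {
        this.keyOf = keyOf;
    }

    // Mirrors the checkState(ref.get() == null) guard: at most one entry at a time.
    void register(V value) {
        if (!slot.compareAndSet(null, value)) {
            throw new IllegalStateException("Only a single entry can be registered.");
        }
    }

    boolean isRegistered(K key) {
        V current = slot.get();
        return current != null && keyOf.apply(current).equals(key);
    }

    int size() {
        return slot.get() == null ? 0 : 1;
    }

    // Removes and returns the entry; unknown keys fail like throwNoSuchElementException.
    V unregister(K key) {
        V current = slot.get();
        if (current == null
                || !keyOf.apply(current).equals(key)
                || !slot.compareAndSet(current, null)) {
            throw new NoSuchElementException("Entry with key " + key + " is not registered.");
        }
        return current;
    }
}
```

Using `compareAndSet` for both register and unregister keeps the slot consistent even if the test callbacks were invoked from different threads.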

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
##########
@@ -118,6 +122,93 @@ public static Builder builder() {
         return new Builder();
     }
 
+    /**
+     * Creates a {@code Builder} that is initialized to simulate a {@link JobManagerRunnerRegistry}
+     * implementation based on a single {@link JobManagerRunner}. The default test implementation
+     * follows the {@code JobManagerRunnerRegistry} contract. This {@code JobManagerRunner} instance
+     * is held by the given {@code AtomicReference}.
+     *
+     * @param singleRunnerReference the reference to the internally held {@code JobManagerRunner}.
+     */
+    public static Builder builderFromSingleEntry(
+            AtomicReference<JobManagerRunner> singleRunnerReference) {
+        return builder()
+                .withRegisterConsumer(
+                        jobManagerRunner -> {
+                            Preconditions.checkState(singleRunnerReference.get() == null);
+                            singleRunnerReference.set(jobManagerRunner);
+                        })
+                .withIsRegisteredFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner ->
+                                                jobManagerRunner.getJobID().equals(jobId),
+                                        () -> false))
+                .withGetFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        Function.identity(),
+                                        () -> throwNoSuchElementException(jobId)))
+                .withGetJobManagerRunnersSupplier(
+                        () -> CollectionUtil.ofNullable(singleRunnerReference.get()))
+                .withSizeSupplier(() -> singleRunnerReference.get() == null ? 0 : 1)
+                .withGetRunningJobIdsSupplier(
+                        () ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner ->
+                                                Collections.singleton(jobManagerRunner.getJobID()),
+                                        Collections::emptySet))
+                .withUnregisterFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner -> {
+                                            if (jobManagerRunner.getJobID().equals(jobId)) {
+                                                return singleRunnerReference.getAndSet(null);
+                                            }
+
+                                            return throwNoSuchElementException(jobId);
+                                        },
+                                        () -> throwNoSuchElementException(jobId)))
+                .withGlobalCleanupAsyncFunction(
+                        (actualJobId, ignoredExecutor) ->
+                                cleanup(singleRunnerReference, actualJobId))
+                .withLocalCleanupAsyncFunction(
+                        (actualJobId, executor) -> cleanup(singleRunnerReference, actualJobId));
+    }
+
+    private static CompletableFuture<Void> cleanup(
+            AtomicReference<JobManagerRunner> singleRunnerReference, JobID actualJobId) {
+        return accessReference(
+                singleRunnerReference,
+                jobManagerRunner -> {
+                    if (jobManagerRunner.getJobID().equals(actualJobId)) {
+                        singleRunnerReference.getAndSet(null).closeAsync();
+                    }
+
+                    return FutureUtils.completedVoidFuture();
+                },
+                FutureUtils::completedVoidFuture);
+    }
+
+    private static JobManagerRunner throwNoSuchElementException(JobID jobId) {
+        throw new NoSuchElementException(
+                "JobManagerRunner with job ID " + jobId + " is not registered.");
+    }
+    }
+
+    private static <T> T accessReference(

Review comment:
      This method was removed in favor of the `Optional.ofNullable` approach.
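For illustration, the `accessReference(reference, ifPresent, ifAbsent)` calls above can be expressed directly with `Optional.ofNullable(reference.get())`. This is a sketch under simplifying assumptions, not the code that ended up in the PR: a `String` holding the job ID stands in for the `JobManagerRunner`, and `OptionalAccessSketch` is a hypothetical name.

```java
import java.util.Collections;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration; a String job ID stands in for the JobManagerRunner.
final class OptionalAccessSketch {
    private OptionalAccessSketch() {}

    // isRegistered: compare the held job ID, defaulting to false when the slot is empty.
    static boolean isRegistered(AtomicReference<String> slot, String jobId) {
        return Optional.ofNullable(slot.get()).map(jobId::equals).orElse(false);
    }

    // getRunningJobIds: a singleton set when an entry is present, an empty set otherwise.
    static Set<String> runningJobIds(AtomicReference<String> slot) {
        return Optional.ofNullable(slot.get())
                .map(Collections::singleton)
                .orElseGet(Collections::emptySet);
    }

    // get: like withGetFunction above, the job ID is only used in the error message.
    static String get(AtomicReference<String> slot, String jobId) {
        return Optional.ofNullable(slot.get())
                .orElseThrow(
                        () ->
                                new NoSuchElementException(
                                        "JobManagerRunner with job ID "
                                                + jobId
                                                + " is not registered."));
    }
}
```

Reading `slot.get()` once per call also avoids the check-then-act pattern of calling `get()` twice on the `AtomicReference`.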

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -280,6 +343,21 @@ private JobGraphStore createAndStartJobGraphStoreWithCleanupFailures(
             AtomicInteger actualCleanupCallCount,
             OneShotLatch successfulCleanupLatch)
             throws Exception {
+        return createAndStartJobGraphStoreWithCleanupFailures(
+                numberOfCleanupFailures,
+                throwable,
+                actualCleanupCallCount,
+                successfulCleanupLatch,
+                FutureUtils.completedVoidFuture());
+    }
+
+    private JobGraphStore createAndStartJobGraphStoreWithCleanupFailures(

Review comment:
      True, that was missed during an intermediate refactoring 👍 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -179,6 +188,60 @@ public void testCleanupThroughRetries() throws Exception {
                 "The JobResultStore should have this job marked as clean.");
     }
 
+    @Test
+    public void testCleanupNotCancellable() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final JobResultStore jobResultStore = new EmbeddedJobResultStore();
+        jobResultStore.createDirtyResult(
+                new JobResultEntry(TestingJobResultStore.createSuccessfulJobResult(jobId)));
+        haServices.setJobResultStore(jobResultStore);
+
+        // Instantiates JobManagerRunner
+        final CompletableFuture<Void> jobManagerRunnerCleanupFuture = new CompletableFuture<>();
+        final AtomicReference<JobManagerRunner> jobManagerRunnerEntry = new AtomicReference<>();
+        final JobManagerRunnerRegistry jobManagerRunnerRegistry =
+                TestingJobManagerRunnerRegistry.builderFromSingleEntry(jobManagerRunnerEntry)
+                        .withGlobalCleanupAsyncFunction(
+                                (actualJobId, executor) -> jobManagerRunnerCleanupFuture)
+                        .build();
+
+        final Dispatcher dispatcher =
+                createTestingDispatcherBuilder()
+                        .setJobManagerRunnerRegistry(jobManagerRunnerRegistry)
+                        .build();
+        dispatcher.start();
+
+        toTerminate.add(dispatcher);
+
+        CommonTestUtils.waitUntilCondition(
+                () -> jobManagerRunnerEntry.get() != null,
+                Deadline.fromNow(Duration.ofSeconds(10)),
+                "JobManagerRunner wasn't loaded in time.");
+
+        assertThat(
+                "The JobResultStore should have this job still marked as dirty.",
+                haServices.getJobResultStore().hasDirtyJobResultEntry(jobId),
+                CoreMatchers.is(true));
+
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        try {
+            dispatcherGateway.cancelJob(jobId, TIMEOUT).get();
+            Assert.fail("Should fail because cancelling the cleanup is not allowed.");
+        } catch (Throwable e) {
+            assertThat(e, FlinkMatchers.containsCause(FlinkException.class));

Review comment:
      Good point. I introduced a new `JobCancellationFailedException` to cover this specific use case.
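A sketch of how a dedicated exception type makes the assertion in `testCleanupNotCancellable` more precise than checking for `FlinkException`. The `JobCancellationFailedException` below is a local stand-in (the real class's package, superclass and constructors are not shown in this review), `CancellationChecks` is a hypothetical helper, and `containsCause` only mimics the idea behind `FlinkMatchers.containsCause` by walking the cause chain.

```java
import java.util.concurrent.CompletableFuture;

// Local stand-in named after the new exception; its real superclass is not shown here.
class JobCancellationFailedException extends Exception {
    JobCancellationFailedException(String message) {
        super(message);
    }
}

final class CancellationChecks {
    private CancellationChecks() {}

    // Walks the cause chain, similar in spirit to FlinkMatchers.containsCause.
    static boolean containsCause(Throwable throwable, Class<? extends Throwable> type) {
        for (Throwable current = throwable; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return true;
            }
        }
        return false;
    }

    // Simulates a cancel request that is rejected because the job is only being cleaned up.
    static CompletableFuture<Void> cancelDuringCleanup(String jobId) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        result.completeExceptionally(
                new JobCancellationFailedException(
                        "Job " + jobId + " is being cleaned up and cannot be cancelled."));
        return result;
    }
}
```

Because `CompletableFuture.join()` wraps the failure in a `CompletionException`, the check has to look through the cause chain rather than at the top-level exception.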




-- 
This is an automated message from the Apache Git Service.