dmvk commented on a change in pull request #18644:
URL: https://github.com/apache/flink/pull/18644#discussion_r805302662



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
##########
@@ -118,6 +122,93 @@ public static Builder builder() {
         return new Builder();
     }
 
+    /**
+     * Creates {@code Builder} that is initialized to similate a {@link JobManagerRunnerRegistry}
+     * implementation based on a single-{@link JobManagerRunner}. The default test implementation
+     * follows the {@code JobManagerRunnerRegistry} contract. This {@code JobManagerRunner} instance
+     * is held by the {@code AtomicReference}.
+     *
+     * @param singleRunnerReference the reference to the internally held {@code JobManagerRunner}.
+     */
+    public static Builder builderFromSingleEntry(

Review comment:
       This took me some time to understand. Maybe something along the lines of `newSingleJobBuilder` would be more explicit.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
##########
@@ -73,24 +76,24 @@
     // we have to have two separate futures because closeAsync relies on the completion of
     // getResultFuture which is always already completed but the cleanupFuture is only
     // instantiated when calling start
-    private CompletableFuture<Void> cleanupFuture;
+    @Nullable private CompletableFuture<Void> cleanupFuture;

Review comment:
       Do we still need an extra future for the cleanup? It seems that having only the `closeFuture` should be enough.
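A minimal sketch of the single-future idea, assuming a simplified runner in which `start()` triggers the cleanup and `closeAsync()` only has to wait for it. `SingleFutureCleanupRunner` and its constructor parameters are hypothetical stand-ins, not Flink's `CheckpointResourcesCleanupRunner`; the point is only that creating `closeFuture` eagerly removes the need for a second, nullable future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified runner used only to illustrate the single-future idea;
// it is not Flink's CheckpointResourcesCleanupRunner.
class SingleFutureCleanupRunner {
    private final Runnable cleanupAction;
    private final Executor executor;
    // Created eagerly, so closeAsync() never has to deal with a null future.
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
    // Ensures the cleanup is triggered at most once.
    private final AtomicBoolean started = new AtomicBoolean(false);

    SingleFutureCleanupRunner(Runnable cleanupAction, Executor executor) {
        this.cleanupAction = cleanupAction;
        this.executor = executor;
    }

    void start() {
        if (started.compareAndSet(false, true)) {
            // The same future that closeAsync() returns is completed by the cleanup.
            CompletableFuture.runAsync(cleanupAction, executor)
                    .whenComplete(
                            (ignored, error) -> {
                                if (error != null) {
                                    closeFuture.completeExceptionally(error);
                                } else {
                                    closeFuture.complete(null);
                                }
                            });
        }
    }

    CompletableFuture<Void> closeAsync() {
        // Assumption: closing a runner that was never started skips the cleanup.
        if (started.compareAndSet(false, true)) {
            closeFuture.complete(null);
        }
        return closeFuture;
    }
}
```

The trade-off moves into `closeAsync()`: it has to decide what closing an unstarted runner means. The sketch simply skips the cleanup in that case, which may or may not match the intended contract.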

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -179,6 +188,60 @@ public void testCleanupThroughRetries() throws Exception {
                 "The JobResultStore should have this job marked as clean.");
     }
 
+    @Test
+    public void testCleanupNotCancellable() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final JobResultStore jobResultStore = new EmbeddedJobResultStore();
+        jobResultStore.createDirtyResult(
+                new JobResultEntry(TestingJobResultStore.createSuccessfulJobResult(jobId)));
+        haServices.setJobResultStore(jobResultStore);
+
+        // Instantiates JobManagerRunner
+        final CompletableFuture<Void> jobManagerRunnerCleanupFuture = new CompletableFuture<>();
+        final AtomicReference<JobManagerRunner> jobManagerRunnerEntry = new AtomicReference<>();
+        final JobManagerRunnerRegistry jobManagerRunnerRegistry =
+                TestingJobManagerRunnerRegistry.builderFromSingleEntry(jobManagerRunnerEntry)
+                        .withGlobalCleanupAsyncFunction(
+                                (actualJobId, executor) -> jobManagerRunnerCleanupFuture)
+                        .build();
+
+        final Dispatcher dispatcher =
+                createTestingDispatcherBuilder()
+                        .setJobManagerRunnerRegistry(jobManagerRunnerRegistry)
+                        .build();
+        dispatcher.start();
+
+        toTerminate.add(dispatcher);
+
+        CommonTestUtils.waitUntilCondition(
+                () -> jobManagerRunnerEntry.get() != null,
+                Deadline.fromNow(Duration.ofSeconds(10)),
+                "JobManagerRunner wasn't loaded in time.");
+
+        assertThat(
+                "The JobResultStore should have this job still marked as dirty.",
+                haServices.getJobResultStore().hasDirtyJobResultEntry(jobId),
+                CoreMatchers.is(true));
+
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        try {
+            dispatcherGateway.cancelJob(jobId, TIMEOUT).get();
+            Assert.fail("Should fail because cancelling the cleanup is not allowed.");
+        } catch (Throwable e) {
+            assertThat(e, FlinkMatchers.containsCause(FlinkException.class));

Review comment:
       nit: It would be great if we could also assert the exception message here, as a `FlinkException` can hide a lot of things. (I recently had a hard time debugging an instability because of this: the exception meant something else.)
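A minimal sketch of such an assertion in plain Java: walk the cause chain for the expected type, then check that type's message. `CauseAssertions` is a hypothetical helper, not an existing Flink utility. In the test above it would be called with `FlinkException.class` and whatever message the cancellation path actually produces (that text is not shown in this diff).

```java
import java.util.Optional;

// Hypothetical test helper; not part of Flink's test utilities.
final class CauseAssertions {
    private CauseAssertions() {}

    // Walks the cause chain (starting with the throwable itself) and returns the
    // first throwable that is an instance of the requested type, if any.
    static <T extends Throwable> Optional<T> findCause(Throwable throwable, Class<T> type) {
        Throwable current = throwable;
        while (current != null) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    // Fails unless the chain contains a throwable of the given type whose message
    // contains expectedFragment; checking the message guards against a different
    // failure that happens to use the same exception type.
    static void assertCauseWithMessage(
            Throwable throwable, Class<? extends Throwable> type, String expectedFragment) {
        Throwable cause =
                findCause(throwable, type)
                        .orElseThrow(
                                () -> new AssertionError("No cause of type " + type.getName(), throwable));
        String message = cause.getMessage();
        if (message == null || !message.contains(expectedFragment)) {
            throw new AssertionError("Unexpected message: " + message, throwable);
        }
    }
}
```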

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
##########
@@ -118,6 +122,93 @@ public static Builder builder() {
         return new Builder();
     }
 
+    /**
+     * Creates {@code Builder} that is initialized to similate a {@link JobManagerRunnerRegistry}
+     * implementation based on a single-{@link JobManagerRunner}. The default test implementation
+     * follows the {@code JobManagerRunnerRegistry} contract. This {@code JobManagerRunner} instance
+     * is held by the {@code AtomicReference}.
+     *
+     * @param singleRunnerReference the reference to the internally held {@code JobManagerRunner}.
+     */
+    public static Builder builderFromSingleEntry(
+            AtomicReference<JobManagerRunner> singleRunnerReference) {
+        return builder()
+                .withRegisterConsumer(
+                        jobManagerRunner -> {
+                            Preconditions.checkState(singleRunnerReference.get() == null);
+                            singleRunnerReference.set(jobManagerRunner);
+                        })
+                .withIsRegisteredFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner ->
+                                                jobManagerRunner.getJobID().equals(jobId),
+                                        () -> false))
+                .withGetFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        Function.identity(),
+                                        () -> throwNoSuchElementException(jobId)))
+                .withGetJobManagerRunnersSupplier(
+                        () -> CollectionUtil.ofNullable(singleRunnerReference.get()))
+                .withSizeSupplier(() -> singleRunnerReference.get() == null ? 0 : 1)
+                .withGetRunningJobIdsSupplier(
+                        () ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner ->
+                                                Collections.singleton(jobManagerRunner.getJobID()),
+                                        Collections::emptySet))
+                .withUnregisterFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner -> {
+                                            if (jobManagerRunner.getJobID().equals(jobId)) {
+                                                return singleRunnerReference.getAndSet(null);
+                                            }
+
+                                            return throwNoSuchElementException(jobId);
+                                        },
+                                        () -> throwNoSuchElementException(jobId)))

Review comment:
       nit: We could use the standard Java API (`Optional`) here instead, which might be a bit more readable since others are already familiar with it.
   
   ```suggestion
                   .withIsRegisteredFunction(
                           jobId ->
                                   Optional.ofNullable(singleRunnerReference.get())
                                           .map(JobManagerRunner::getJobID)
                                           .map(jobId::equals)
                                           .orElse(false))
                   .withGetFunction(
                           jobId ->
                                   Optional.ofNullable(singleRunnerReference.get())
                                           .orElseThrow(() -> newNoSuchElementException(jobId)))
                   .withGetJobManagerRunnersSupplier(
                           () -> CollectionUtil.ofNullable(singleRunnerReference.get()))
                   .withSizeSupplier(() -> singleRunnerReference.get() == null ? 0 : 1)
                   .withGetRunningJobIdsSupplier(
                           () ->
                                   Optional.ofNullable(singleRunnerReference.get())
                                           .map(JobManagerRunner::getJobID)
                                           .map(Collections::singleton)
                                           .orElseGet(Collections::emptySet))
                   .withUnregisterFunction(
                           jobId ->
                                   Optional.ofNullable(singleRunnerReference.get())
                                           .map(JobManagerRunner::getJobID)
                                           .filter(jobId::equals)
                                           .map(ignored -> singleRunnerReference.getAndSet(null))
                                           .orElseThrow(() -> newNoSuchElementException(jobId)))
   ```
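A self-contained sketch of the `Optional`-based lookups, assuming a hypothetical `Runner` interface and `String` job IDs in place of `JobManagerRunner` and `JobID`. `orElseThrow` takes a supplier that *returns* the exception, so the suggested `newNoSuchElementException` has to return a `NoSuchElementException` instead of throwing it the way the existing `throwNoSuchElementException` does. Unlike both versions above, the sketched `get` also checks that the requested job ID matches the held runner; whether that stricter check is wanted is a judgment call.

```java
import java.util.Collections;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for JobManagerRunner; String replaces JobID.
interface Runner {
    String getJobId();
}

final class SingleRunnerLookups {
    private SingleRunnerLookups() {}

    // Returns (rather than throws) the exception so it can be passed to orElseThrow.
    static NoSuchElementException newNoSuchElementException(String jobId) {
        return new NoSuchElementException(
                "JobManagerRunner with job ID " + jobId + " is not registered.");
    }

    static boolean isRegistered(AtomicReference<Runner> ref, String jobId) {
        return Optional.ofNullable(ref.get())
                .map(Runner::getJobId)
                .map(jobId::equals)
                .orElse(false);
    }

    // Stricter than the reviewed code: only returns the runner if the job ID matches.
    static Runner get(AtomicReference<Runner> ref, String jobId) {
        return Optional.ofNullable(ref.get())
                .filter(runner -> runner.getJobId().equals(jobId))
                .orElseThrow(() -> newNoSuchElementException(jobId));
    }

    static Set<String> runningJobIds(AtomicReference<Runner> ref) {
        return Optional.ofNullable(ref.get())
                .map(Runner::getJobId)
                .map(Collections::singleton)
                .orElseGet(Collections::emptySet);
    }

    // Clears the reference only if it holds the runner for the given job ID.
    static Runner unregister(AtomicReference<Runner> ref, String jobId) {
        return Optional.ofNullable(ref.get())
                .filter(runner -> runner.getJobId().equals(jobId))
                .map(ignored -> ref.getAndSet(null))
                .orElseThrow(() -> newNoSuchElementException(jobId));
    }
}
```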

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
##########
@@ -118,6 +122,93 @@ public static Builder builder() {
         return new Builder();
     }
 
+    /**
+     * Creates {@code Builder} that is initialized to similate a {@link JobManagerRunnerRegistry}
+     * implementation based on a single-{@link JobManagerRunner}. The default test implementation
+     * follows the {@code JobManagerRunnerRegistry} contract. This {@code JobManagerRunner} instance
+     * is held by the {@code AtomicReference}.
+     *
+     * @param singleRunnerReference the reference to the internally held {@code JobManagerRunner}.
+     */
+    public static Builder builderFromSingleEntry(
+            AtomicReference<JobManagerRunner> singleRunnerReference) {
+        return builder()
+                .withRegisterConsumer(
+                        jobManagerRunner -> {
+                            Preconditions.checkState(singleRunnerReference.get() == null);
+                            singleRunnerReference.set(jobManagerRunner);
+                        })
+                .withIsRegisteredFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner ->
+                                                jobManagerRunner.getJobID().equals(jobId),
+                                        () -> false))
+                .withGetFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        Function.identity(),
+                                        () -> throwNoSuchElementException(jobId)))
+                .withGetJobManagerRunnersSupplier(
+                        () -> CollectionUtil.ofNullable(singleRunnerReference.get()))
+                .withSizeSupplier(() -> singleRunnerReference.get() == null ? 0 : 1)
+                .withGetRunningJobIdsSupplier(
+                        () ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner ->
+                                                Collections.singleton(jobManagerRunner.getJobID()),
+                                        Collections::emptySet))
+                .withUnregisterFunction(
+                        jobId ->
+                                accessReference(
+                                        singleRunnerReference,
+                                        jobManagerRunner -> {
+                                            if (jobManagerRunner.getJobID().equals(jobId)) {
+                                                return singleRunnerReference.getAndSet(null);
+                                            }
+
+                                            return throwNoSuchElementException(jobId);
+                                        },
+                                        () -> throwNoSuchElementException(jobId)))
+                .withGlobalCleanupAsyncFunction(
+                        (actualJobId, ignoredExecutor) ->
+                                cleanup(singleRunnerReference, actualJobId))
+                .withLocalCleanupAsyncFunction(
+                        (actualJobId, executor) -> cleanup(singleRunnerReference, actualJobId));
+    }
+
+    private static CompletableFuture<Void> cleanup(
+            AtomicReference<JobManagerRunner> singleRunnerReference, JobID actualJobId) {
+        return accessReference(
+                singleRunnerReference,
+                jobManagerRunner -> {
+                    if (jobManagerRunner.getJobID().equals(actualJobId)) {
+                        singleRunnerReference.getAndSet(null).closeAsync();
+                    }
+
+                    return FutureUtils.completedVoidFuture();
+                },
+                FutureUtils::completedVoidFuture);
+    }
+
+    private static JobManagerRunner throwNoSuchElementException(JobID jobId) {
+        throw new NoSuchElementException(
+                "JobManagerRunner with job ID " + jobId + " is not registered.");
+    }
+
+    private static <T> T accessReference(

Review comment:
       ```suggestion
       private static <T> T applyToReferenceOrFallback(
   ```
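The body of `accessReference` is cut off in the hunk above, but its call sites suggest it takes the reference, a function applied to the held value, and a fallback supplier used when the reference is empty. Assuming that shape, a generic version under the suggested name might look like this (`ReferenceAccess` is a hypothetical holder class, and the signature is inferred rather than taken from the PR):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical holder for the helper; the signature is inferred from the call sites.
final class ReferenceAccess {
    private ReferenceAccess() {}

    // Applies `function` to the currently referenced value, or returns `fallback.get()`
    // if the reference is empty. The reference is read once, so the null check and the
    // function both see the same value even if another thread updates the reference.
    static <V, T> T applyToReferenceOrFallback(
            AtomicReference<V> reference, Function<V, T> function, Supplier<T> fallback) {
        final V value = reference.get();
        if (value == null) {
            return fallback.get();
        }
        return function.apply(value);
    }
}
```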

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java
##########
@@ -118,6 +122,93 @@ public static Builder builder() {
         return new Builder();
     }
 
+    /**
+     * Creates {@code Builder} that is initialized to similate a {@link JobManagerRunnerRegistry}
+     * implementation based on a single-{@link JobManagerRunner}. The default test implementation
+     * follows the {@code JobManagerRunnerRegistry} contract. This {@code JobManagerRunner} instance
+     * is held by the {@code AtomicReference}.

Review comment:
       ```suggestion
        * Creates a {@code Builder} that simulates the {@link JobManagerRunnerRegistry}
        * with at most one underlying {@link JobManagerRunner}. The default test implementation
        * follows the {@code JobManagerRunnerRegistry} contract. The underlying {@code JobManagerRunner} instance
        * is held by the {@code AtomicReference}, which makes it accessible to the test code.
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -280,6 +343,21 @@ private JobGraphStore createAndStartJobGraphStoreWithCleanupFailures(
             AtomicInteger actualCleanupCallCount,
             OneShotLatch successfulCleanupLatch)
             throws Exception {
+        return createAndStartJobGraphStoreWithCleanupFailures(
+                numberOfCleanupFailures,
+                throwable,
+                actualCleanupCallCount,
+                successfulCleanupLatch,
+                FutureUtils.completedVoidFuture());
+    }
+
+    private JobGraphStore createAndStartJobGraphStoreWithCleanupFailures(

Review comment:
       This seems to be used only by the method above 🤔 Please double-check that this change is needed.




-- 
This is an automated message from the Apache Git Service.