XComp commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1703145088


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableInMainThreadResource.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code LocallyCleanableInMainThreadResource} is supposed to be implemented by any class that
+ * provides artifacts for a given job that need to be cleaned up in the main thread after the job
+ * reached a local terminal state. Local cleanups that needs to be triggered for a global terminal
+ * state as well, need to be implemented using the {@link GloballyCleanableResource}.
+ *
+ * <p>The {@link DispatcherResourceCleanerFactory} provides a workaround to trigger some {@code
+ * LocallyCleanableInMainThreadResource} as globally cleanable. FLINK-26175 is created to cover a
+ * refactoring and straighten things out.
+ *
+ * @see org.apache.flink.api.common.JobStatus
+ */
+@FunctionalInterface
+public interface LocallyCleanableInMainThreadResource {

Review Comment:
   I would suggest a slightly different name: 
`LocallyCleanableResourceWithMainThread`. 
   
   All cleanable resources are cleaned in the main thread (i.e. the interface 
method is called from the main thread). The difference with this interface is 
that the main thread reference is also passed in.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java:
##########
@@ -84,9 +84,10 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
     }
 
     @Override
-    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
+    public CompletableFuture<Void> localCleanupAsync(
+            JobID jobId, Executor cleanupExecutor, Executor executor) {

Review Comment:
   ```suggestion
               JobID jobId, Executor cleanupExecutor, Executor mainThreadExecutor) {
   ```
   The parameter names should reflect the roles of the two executors. Otherwise, it might become quite confusing.
   
   I'm wondering whether we should even use `ComponentMainThreadExecutor` as the type of the main thread executor to make things clear. WDYT?
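
   To illustrate why a dedicated type helps, here is a minimal sketch. It does not use Flink classes: `MainThreadExecutor` is a hypothetical stand-in for `ComponentMainThreadExecutor`, and `ExecutorRolesSketch` and its `cleanup` method are made up for this example. The point is only that two parameters of the same `Executor` type can be swapped silently, while a distinct type makes the compiler reject the swap.

   ```java
   import java.util.concurrent.Executor;

   /** Hypothetical sketch, not Flink code: distinguishing the two executor roles by type. */
   final class ExecutorRolesSketch {

       /** Stand-in for a dedicated main-thread executor type (assumption for this example). */
       interface MainThreadExecutor extends Executor {}

       /**
        * Runs one step on each executor and returns the order in which the steps ran. With two
        * plain {@code Executor} parameters, callers could pass them in the wrong order unnoticed.
        */
       static String cleanup(Executor cleanupExecutor, MainThreadExecutor mainThreadExecutor) {
           StringBuilder log = new StringBuilder();
           cleanupExecutor.execute(() -> log.append("io;"));
           mainThreadExecutor.execute(() -> log.append("main;"));
           return log.toString();
       }
   }
   ```

   With this signature, `cleanup(mainThreadExecutor, cleanupExecutor)` still compiles (a main thread executor is an `Executor`), but passing a plain `Executor` in the second position does not, which covers the more dangerous direction.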



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java:
##########
@@ -124,10 +127,39 @@ JOB_MANAGER_METRIC_GROUP_LABEL, 
ofLocalResource(jobManagerMetricGroup))
      * state when we terminate globally.
      *
      * @param localResource Local resource that we want to clean during a 
global cleanup.
+     * @param mainThreadExecutor Main thread executor for cleanup.
      * @return Globally cleanable resource.
      */
-    private static GloballyCleanableResource ofLocalResource(
+    private static GloballyCleanableResource toGloballyCleanableResource(
+            LocallyCleanableInMainThreadResource localResource,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        return (jobId, cleanupExecutor) ->
+                localResource.localCleanupAsync(jobId, cleanupExecutor, 
mainThreadExecutor);
+    }
+
+    /**
+     * A simple wrapper for the resources that don't have any artifacts that 
can outlive the {@link
+     * org.apache.flink.runtime.dispatcher.Dispatcher}, but we still want to 
clean up their local
+     * state when we terminate globally.
+     *
+     * @param localResource Local resource that we want to clean during a 
global cleanup.
+     * @return Globally cleanable resource.
+     */
+    private static GloballyCleanableResource toGloballyCleanableResource(
             LocallyCleanableResource localResource) {
         return localResource::localCleanupAsync;
     }
+
+    /**
+     * Converts a LocallyCleanableInMainThreadResource object to a 
LocallyCleanableResource object.
+     *
+     * @param localResource LocallyCleanableInMainThreadResource that we want 
to translate.
+     * @return A LocallyCleanableResource.
+     */
+    public static LocallyCleanableResource toLocallyCleanableResource(

Review Comment:
   ```suggestion
       @VisibleForTesting
       public static LocallyCleanableResource toLocallyCleanableResource(
   ```
   If we really need this one in test code, we should mark it as 
`@VisibleForTesting`. But I'd argue that this is an implementation detail of 
the `DispatcherResourceCleanerFactory` which shouldn't be exposed. WDYT?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java:
##########
@@ -188,11 +197,26 @@ public static class Builder {
         private Supplier<Collection<JobManagerRunner>> 
getJobManagerRunnersSupplier =
                 Collections::emptyList;
         private Function<JobID, JobManagerRunner> unregisterFunction = 
ignoredJobId -> null;
-        private BiFunction<JobID, Executor, CompletableFuture<Void>> 
localCleanupAsyncFunction =
-                (ignoredJobId, ignoredExecutor) -> 
FutureUtils.completedVoidFuture();
+        private TriFunction<JobID, Executor, Executor, CompletableFuture<Void>>
+                localCleanupAsyncFunction =
+                        (ignoredJobId, ignoredExecutor, mainThreadExecutor) ->
+                                FutureUtils.completedVoidFuture();
         private BiFunction<JobID, Executor, CompletableFuture<Void>> 
globalCleanupAsyncFunction =
                 (ignoredJobId, ignoredExecutor) -> 
FutureUtils.completedVoidFuture();

Review Comment:
   ```suggestion
   ```
   I guess you could remove this one in a hotfix commit if you like. It seems to be legacy code from when the JobManagerRunnerRegistry implemented both the `LocallyCleanableResource` and the `GloballyCleanableResource` interfaces.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableInMainThreadResource.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code LocallyCleanableInMainThreadResource} is supposed to be implemented by any class that
+ * provides artifacts for a given job that need to be cleaned up in the main thread after the job
+ * reached a local terminal state. Local cleanups that needs to be triggered for a global terminal
+ * state as well, need to be implemented using the {@link GloballyCleanableResource}.
+ *
+ * <p>The {@link DispatcherResourceCleanerFactory} provides a workaround to trigger some {@code
+ * LocallyCleanableInMainThreadResource} as globally cleanable. FLINK-26175 is created to cover a
+ * refactoring and straighten things out.
+ *
+ * @see org.apache.flink.api.common.JobStatus

Review Comment:
   ```suggestion
    * {@code LocallyCleanableInMainThreadResource} is an extension of the {@link
    * LocallyCleanableResource} interface. It allows the {@link DispatcherResourceCleanerFactory} to
    * inject the main thread as part of the cleanup procedure.
    *
    * <p>See {@code LocallyCleanableResource} for further context on the proper contract of the two
    * interfaces.
   ```
   I would propose a more purpose-focused JavaDoc here. WDYT?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
@@ -157,25 +144,33 @@ void testFailingLocalCleanup() {
 
         assertThatFuture(
                         testInstance.localCleanupAsync(
-                                jobManagerRunner.getJobID(), 
Executors.directExecutor()))
+                                jobManagerRunner.getJobID(),
+                                Executors.directExecutor(),
+                                Executors.directExecutor()))
                 .isCompletedExceptionally()
                 .eventuallyFailsWith(ExecutionException.class)
                 .extracting(FlinkAssertions::chainOfCauses, 
FlinkAssertions.STREAM_THROWABLE)
                 .hasExactlyElementsOfTypes(ExecutionException.class, 
expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
-        
assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID()))
+                .isTrue()
+                .as(
+                        "Since the cleanup failed, the JobManagerRunner is 
expected to not have been unregistered.");
     }
 
     @Test
-    void testSuccessfulLocalCleanupAsync() {
+    void testSuccessfulLocalCleanupAsync() throws InterruptedException, 
ExecutionException {
         final TestingJobManagerRunner jobManagerRunner = 
registerTestingJobManagerRunner();
-
         final CompletableFuture<Void> cleanupResult =
                 testInstance.localCleanupAsync(
-                        jobManagerRunner.getJobID(), 
Executors.directExecutor());
+                        jobManagerRunner.getJobID(),
+                        Executors.directExecutor(),
+                        Executors.directExecutor());
+
+        // Wait for the unregister future to complete
+        cleanupResult.get();

Review Comment:
   ```suggestion
           FlinkAssertions.assertThatFuture(cleanupResult)
                   .as("Wait for the unregistration to complete")
                   .eventuallySucceeds();
   ```
   nit: `FlinkAssertions` provides utility methods for asserting on `CompletableFuture` instances.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java:
##########
@@ -369,7 +377,9 @@ private MiniDispatcher createMiniDispatcher(
                         // JobManagerRunnerRegistry needs to be added 
explicitly
                         // because cleaning it will trigger the closeAsync 
latch
                         // provided by TestingJobManagerRunner
-                        .withLocallyCleanableResource(jobManagerRunnerRegistry)
+                        .withLocallyCleanableResource(

Review Comment:
   Here we are inconsistent. The `TestingResourceCleanerFactory` has a 
dedicated `withLocallyCleanableInMainThreadResource` method for this purpose. 
Or am I missing something? :thinking: 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
@@ -202,23 +202,33 @@ void testFailingLocalCleanupAsync() {
     }
 
     @Test
-    void testLocalCleanupAsyncNonBlocking() {
+    void testLocalCleanupAsyncNonBlocking() throws ExecutionException, 
InterruptedException {
         final TestingJobManagerRunner jobManagerRunner =
                 
TestingJobManagerRunner.newBuilder().setBlockingTermination(true).build();
         testInstance.register(jobManagerRunner);
 
         // this call shouldn't block
         final CompletableFuture<Void> cleanupFuture =
                 testInstance.localCleanupAsync(
-                        jobManagerRunner.getJobID(), 
UnsupportedOperationExecutor.INSTANCE);
-
-        
assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+                        jobManagerRunner.getJobID(),
+                        Executors.directExecutor(),
+                        Executors.directExecutor());
+
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID()))
+                .isTrue()
+                .as(
+                        "Since the cleanup future hasn't completed yet, the 
JobManagerRunner is expected to not have been unregistered.");
         assertThat(jobManagerRunner.getTerminationFuture()).isNotCompleted();
         assertThat(cleanupFuture).isNotCompleted();
 
+        // Complete the closeAsync() future
         jobManagerRunner.getTerminationFuture().complete(null);
 
-        assertThat(cleanupFuture).isCompleted();
+        // Wait for the unregistration to complete
+        cleanupFuture.get();
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID()))
+                .isFalse()
+                .as("The unregister future is complete now.");

Review Comment:
   ```suggestion
                   .as("The unregister future is complete now.")
                   .isFalse();
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##########
@@ -83,9 +83,16 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
     }
 
     @Override
-    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) {
+    public CompletableFuture<Void> localCleanupAsync(
+            JobID jobId, Executor ignoredExecutor, Executor mainThreadExecutor) {
         if (isRegistered(jobId)) {
-            return unregister(jobId).closeAsync();
+            CompletableFuture<Void> resultFuture = this.jobManagerRunners.get(jobId).closeAsync();
+
+            return resultFuture.thenApplyAsync(
+                    result -> {
+                        mainThreadExecutor.execute(() -> unregister(jobId));
+                        return result;
+                    });

Review Comment:
   ```suggestion
               return get(jobId)
                       .closeAsync()
                       .thenRunAsync(() -> unregister(jobId), mainThreadExecutor);
   ```
   We can shorten this part of the code. WDYT?
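
   For reference, a minimal self-contained sketch of the `thenRunAsync(action, executor)` pattern. It uses only the JDK: `RegistrySketch`, its methods, and the `String` job IDs are hypothetical stand-ins, not the Flink classes. Besides being shorter, the returned future here completes only after the unregistration has actually run on the given executor, whereas scheduling it via `mainThreadExecutor.execute(...)` inside `thenApplyAsync` completes the future as soon as the task has been handed over.

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.Executor;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   /** Hypothetical stand-in for a runner registry; not Flink code. */
   final class RegistrySketch {
       // Maps a job ID to the termination future of its (stand-in) runner.
       private final Map<String, CompletableFuture<Void>> runners = new ConcurrentHashMap<>();
       // Name of the thread that performed the last unregistration.
       private volatile String unregisteringThread;

       void register(String jobId, CompletableFuture<Void> terminationFuture) {
           runners.put(jobId, terminationFuture);
       }

       boolean isRegistered(String jobId) {
           return runners.containsKey(jobId);
       }

       private void unregister(String jobId) {
           unregisteringThread = Thread.currentThread().getName();
           runners.remove(jobId);
       }

       /** Unregisters on the given executor once the runner's termination future completes. */
       CompletableFuture<Void> localCleanupAsync(String jobId, Executor mainThreadExecutor) {
           CompletableFuture<Void> closeFuture = runners.get(jobId);
           if (closeFuture == null) {
               return CompletableFuture.completedFuture(null);
           }
           return closeFuture.thenRunAsync(() -> unregister(jobId), mainThreadExecutor);
       }

       /** Runs the scenario and returns the name of the thread that did the unregistration. */
       static String demo() {
           ExecutorService mainThread =
                   Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-sketch"));
           try {
               RegistrySketch registry = new RegistrySketch();
               CompletableFuture<Void> termination = new CompletableFuture<>();
               registry.register("job-1", termination);

               CompletableFuture<Void> cleanup = registry.localCleanupAsync("job-1", mainThread);
               if (!registry.isRegistered("job-1")) {
                   throw new IllegalStateException("still registered until termination completes");
               }

               termination.complete(null); // the runner has terminated
               cleanup.join(); // returns only after unregister(...) has run

               if (registry.isRegistered("job-1")) {
                   throw new IllegalStateException("expected the job to be unregistered");
               }
               return registry.unregisteringThread;
           } finally {
               mainThread.shutdown();
           }
       }
   }
   ```

   Calling `RegistrySketch.demo()` returns the name of the single-threaded executor's thread, which shows that the unregistration ran there and not on the thread that completed the termination future.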



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java:
##########
@@ -188,11 +197,26 @@ public static class Builder {
         private Supplier<Collection<JobManagerRunner>> 
getJobManagerRunnersSupplier =
                 Collections::emptyList;
         private Function<JobID, JobManagerRunner> unregisterFunction = 
ignoredJobId -> null;
-        private BiFunction<JobID, Executor, CompletableFuture<Void>> 
localCleanupAsyncFunction =
-                (ignoredJobId, ignoredExecutor) -> 
FutureUtils.completedVoidFuture();
+        private TriFunction<JobID, Executor, Executor, CompletableFuture<Void>>
+                localCleanupAsyncFunction =
+                        (ignoredJobId, ignoredExecutor, mainThreadExecutor) ->
+                                FutureUtils.completedVoidFuture();
         private BiFunction<JobID, Executor, CompletableFuture<Void>> 
globalCleanupAsyncFunction =
                 (ignoredJobId, ignoredExecutor) -> 
FutureUtils.completedVoidFuture();
 
+        private Builder fromDefaultJobManagerRunnerRegistry(

Review Comment:
   What's the purpose of this? It looks like we're mapping each of the `DefaultJobManagerRunnerRegistry` methods to the corresponding `TestingJobManagerRunnerRegistry` callback, which effectively creates a "new" `DefaultJobManagerRunnerRegistry` instance wrapped in a `TestingJobManagerRunnerRegistry`. We could just use `DefaultJobManagerRunnerRegistry` instead, don't you think? :thinking: 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
@@ -157,25 +144,33 @@ void testFailingLocalCleanup() {
 
         assertThatFuture(
                         testInstance.localCleanupAsync(
-                                jobManagerRunner.getJobID(), 
Executors.directExecutor()))
+                                jobManagerRunner.getJobID(),
+                                Executors.directExecutor(),
+                                Executors.directExecutor()))
                 .isCompletedExceptionally()
                 .eventuallyFailsWith(ExecutionException.class)
                 .extracting(FlinkAssertions::chainOfCauses, 
FlinkAssertions.STREAM_THROWABLE)
                 .hasExactlyElementsOfTypes(ExecutionException.class, 
expectedException.getClass())
                 .last()
                 .isEqualTo(expectedException);
-        
assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID()))
+                .isTrue()
+                .as(
+                        "Since the cleanup failed, the JobManagerRunner is 
expected to not have been unregistered.");

Review Comment:
   ```suggestion
                   .as(
                           "Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered.")
                   .isTrue();
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableInMainThreadResource.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code LocallyCleanableInMainThreadResource} is supposed to be implemented by any class that
+ * provides artifacts for a given job that need to be cleaned up in the main thread after the job
+ * reached a local terminal state. Local cleanups that needs to be triggered for a global terminal
+ * state as well, need to be implemented using the {@link GloballyCleanableResource}.
+ *
+ * <p>The {@link DispatcherResourceCleanerFactory} provides a workaround to trigger some {@code
+ * LocallyCleanableInMainThreadResource} as globally cleanable. FLINK-26175 is created to cover a
+ * refactoring and straighten things out.
+ *
+ * @see org.apache.flink.api.common.JobStatus
+ */
+@FunctionalInterface
+public interface LocallyCleanableInMainThreadResource {
+
+    /**
+     * {@code localCleanupAsync} is expected to be called from the main thread and uses the passed
+     * {@code mainThreadExecutor} for cleanups. Thread-safety must be ensured.
+     *
+     * @param jobId The {@link JobID} of the job for which the local data should be cleaned up.
+     * @param cleanupExecutor The fallback executor for IO-heavy operations.
+     * @param mainThreadExecutor The main thread executor.
+     * @return The cleanup result future.
+     */
+    CompletableFuture<Void> localCleanupAsync(
+            JobID jobId, Executor cleanupExecutor, Executor mainThreadExecutor);

Review Comment:
   ```suggestion
               JobID jobId, Executor cleanupExecutor, Executor mainThreadExecutor);
   ```
   We could also move the conversion into the interface if we really need it in multiple places rather than only in the `DispatcherResourceCleanerFactory`. WDYT?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
@@ -190,8 +185,13 @@ void testFailingLocalCleanupAsync() {
 
         final CompletableFuture<Void> cleanupResult =
                 testInstance.localCleanupAsync(
-                        jobManagerRunner.getJobID(), 
Executors.directExecutor());
-        
assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+                        jobManagerRunner.getJobID(),
+                        Executors.directExecutor(),
+                        Executors.directExecutor());
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID()))
+                .isTrue()
+                .as(
+                        "Since the cleanup failed, the JobManagerRunner is 
expected to not have been unregistered.");

Review Comment:
   ```suggestion
                   .as(
                           "Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered.")
                   .isTrue();
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -26,27 +26,42 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /** {@code TestingResourceCleanerFactory} for adding custom {@link 
ResourceCleaner} creation. */
 public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
 
     private final Collection<LocallyCleanableResource> 
locallyCleanableResources;
+    private Collection<LocallyCleanableInMainThreadResource> 
locallyCleanableInMainThreadResources;
     private final Collection<GloballyCleanableResource> 
globallyCleanableResources;
 
     private final Executor cleanupExecutor;
 
     private TestingResourceCleanerFactory(
             Collection<LocallyCleanableResource> locallyCleanableResources,
+            Collection<LocallyCleanableInMainThreadResource> 
locallyCleanableInMainThreadResources,
             Collection<GloballyCleanableResource> globallyCleanableResources,
             Executor cleanupExecutor) {
         this.locallyCleanableResources = locallyCleanableResources;
+        this.locallyCleanableInMainThreadResources = 
locallyCleanableInMainThreadResources;
         this.globallyCleanableResources = globallyCleanableResources;
         this.cleanupExecutor = cleanupExecutor;
     }
 
     @Override
     public ResourceCleaner createLocalResourceCleaner(
             ComponentMainThreadExecutor mainThreadExecutor) {
+        Collection<LocallyCleanableResource> locallyCleanableResources =
+                this.locallyCleanableResources;
+
+        locallyCleanableResources.addAll(
+                locallyCleanableInMainThreadResources.stream()
+                        .map(
+                                r ->
+                                        
DispatcherResourceCleanerFactory.toLocallyCleanableResource(

Review Comment:
   `TestingResourceCleanerFactory` is a testing implementation that shouldn't 
depend on the `DispatcherResourceCleanerFactory`-internal code. We can do the 
mapping from `LocallyCleanableInMainThreadResource` to 
`LocallyCleanableResource` internally, I guess :thinking: 
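
   A minimal sketch of what doing the mapping internally could look like. It is self-contained and does not use the Flink types: `LocalResource` and `LocalResourceWithMainThread` are simplified, hypothetical stand-ins for `LocallyCleanableResource` and `LocallyCleanableInMainThreadResource`, with `String` job IDs, and `CleanerFactorySketch` is made up for this example. The adapter is just a lambda that closes over the main thread executor, so no helper from `DispatcherResourceCleanerFactory` is needed.

   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;

   /** Hypothetical sketch, not Flink code: adapting one resource interface to the other. */
   final class CleanerFactorySketch {

       /** Simplified stand-in for the two-argument local cleanup interface. */
       @FunctionalInterface
       interface LocalResource {
           CompletableFuture<Void> localCleanupAsync(String jobId, Executor cleanupExecutor);
       }

       /** Simplified stand-in for the variant that also receives the main thread executor. */
       @FunctionalInterface
       interface LocalResourceWithMainThread {
           CompletableFuture<Void> localCleanupAsync(
                   String jobId, Executor cleanupExecutor, Executor mainThreadExecutor);
       }

       private final Collection<LocalResource> localResources = new ArrayList<>();
       private final Collection<LocalResourceWithMainThread> mainThreadResources =
               new ArrayList<>();

       void addLocalResource(LocalResource resource) {
           localResources.add(resource);
       }

       void addMainThreadResource(LocalResourceWithMainThread resource) {
           mainThreadResources.add(resource);
       }

       /**
        * Returns all resources as plain local resources. A fresh list is built on every call so
        * that repeated calls do not accumulate adapters in the stored collection.
        */
       Collection<LocalResource> localResourcesFor(Executor mainThreadExecutor) {
           List<LocalResource> all = new ArrayList<>(localResources);
           for (LocalResourceWithMainThread resource : mainThreadResources) {
               // The adapter binds the main thread executor as the third argument.
               all.add(
                       (jobId, cleanupExecutor) ->
                               resource.localCleanupAsync(
                                       jobId, cleanupExecutor, mainThreadExecutor));
           }
           return all;
       }
   }
   ```

   Copying into a new list in `localResourcesFor` is a choice of this sketch; it keeps the method safe to call more than once.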



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
@@ -202,23 +202,33 @@ void testFailingLocalCleanupAsync() {
     }
 
     @Test
-    void testLocalCleanupAsyncNonBlocking() {
+    void testLocalCleanupAsyncNonBlocking() throws ExecutionException, 
InterruptedException {
         final TestingJobManagerRunner jobManagerRunner =
                 
TestingJobManagerRunner.newBuilder().setBlockingTermination(true).build();
         testInstance.register(jobManagerRunner);
 
         // this call shouldn't block
         final CompletableFuture<Void> cleanupFuture =
                 testInstance.localCleanupAsync(
-                        jobManagerRunner.getJobID(), 
UnsupportedOperationExecutor.INSTANCE);
-
-        
assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse();
+                        jobManagerRunner.getJobID(),
+                        Executors.directExecutor(),
+                        Executors.directExecutor());
+
+        assertThat(testInstance.isRegistered(jobManagerRunner.getJobID()))
+                .isTrue()
+                .as(
+                        "Since the cleanup future hasn't completed yet, the 
JobManagerRunner is expected to not have been unregistered.");

Review Comment:
   ```suggestion
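                   .as(
                           "Since the cleanup future hasn't completed yet, the JobManagerRunner is expected to not have been unregistered.")
                   .isTrue();
   ```
   The assert message has to be put in front of the assertion: AssertJ only applies a description set via `as(...)` to assertions that follow it, so a trailing `as(...)` is ignored.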


