XComp commented on code in PR #25027: URL: https://github.com/apache/flink/pull/25027#discussion_r1703145088
########## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableInMainThreadResource.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * {@code LocallyCleanableInMainThreadResource} is supposed to be implemented by any class that + * provides artifacts for a given job that need to be cleaned up in the main thread after the job + * reached a local terminal state. Local cleanups that needs to be triggered for a global terminal + * state as well, need to be implemented using the {@link GloballyCleanableResource}. + * + * <p>The {@link DispatcherResourceCleanerFactory} provides a workaround to trigger some {@code + * LocallyCleanableInMainThreadResource} as globally cleanable. FLINK-26175 is created to cover a + * refactoring and straighten things out. 
+ * + * @see org.apache.flink.api.common.JobStatus + */ +@FunctionalInterface +public interface LocallyCleanableInMainThreadResource { Review Comment: I would suggest a slightly different name: `LocallyCleanableResourceWithMainThread`. All cleanable resources are cleaned in the main thread (i.e. the interface method is called from the main thread). The difference with this interface is that the main thread reference is also passed in. ########## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java: ########## @@ -84,9 +84,10 @@ public Collection<JobManagerRunner> getJobManagerRunners() { } @Override - public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) { + public CompletableFuture<Void> localCleanupAsync( + JobID jobId, Executor cleanupExecutor, Executor executor) { Review Comment: ```suggestion JobID jobId, Executor cleanupExecutor, Executor mainThreadExecutor) { ``` we should have the role of the two executors being reflected in the parameter names. Otherwise, it might become quite confusing. I'm wondering whether we should even use `ComponentMainThreadExecutor` as a type for the main thread executor to make things clear. WDYT? ########## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java: ########## @@ -124,10 +127,39 @@ JOB_MANAGER_METRIC_GROUP_LABEL, ofLocalResource(jobManagerMetricGroup)) * state when we terminate globally. * * @param localResource Local resource that we want to clean during a global cleanup. + * @param mainThreadExecutor Main thread executor for cleanup. * @return Globally cleanable resource. 
*/ - private static GloballyCleanableResource ofLocalResource( + private static GloballyCleanableResource toGloballyCleanableResource( + LocallyCleanableInMainThreadResource localResource, + ComponentMainThreadExecutor mainThreadExecutor) { + return (jobId, cleanupExecutor) -> + localResource.localCleanupAsync(jobId, cleanupExecutor, mainThreadExecutor); + } + + /** + * A simple wrapper for the resources that don't have any artifacts that can outlive the {@link + * org.apache.flink.runtime.dispatcher.Dispatcher}, but we still want to clean up their local + * state when we terminate globally. + * + * @param localResource Local resource that we want to clean during a global cleanup. + * @return Globally cleanable resource. + */ + private static GloballyCleanableResource toGloballyCleanableResource( LocallyCleanableResource localResource) { return localResource::localCleanupAsync; } + + /** + * Converts a LocallyCleanableInMainThreadResource object to a LocallyCleanableResource object. + * + * @param localResource LocallyCleanableInMainThreadResource that we want to translate. + * @return A LocallyCleanableResource. + */ + public static LocallyCleanableResource toLocallyCleanableResource( Review Comment: ```suggestion @VisibleForTesting public static LocallyCleanableResource toLocallyCleanableResource( ``` If we really need this one in test code, we should mark it as `@VisibleForTesting`. But I'd argue that this is an implementation detail of the `DispatcherResourceCleanerFactory` which shouldn't be exposed. WDYT? 
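For reference, the conversion discussed above comes down to capturing the main thread executor in a lambda, so the rest of the cleanup code only sees the two-argument shape. Below is a minimal, self-contained sketch of that idea. `TwoArgCleanable`, `ThreeArgCleanable`, `adapt` and `demo` are hypothetical stand-ins invented for illustration (with `String` in place of `JobID`); they are not the Flink interfaces or the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class CleanupAdapterSketch {

    /** Hypothetical stand-in for a resource cleaned with a single cleanup executor. */
    @FunctionalInterface
    interface TwoArgCleanable {
        CompletableFuture<Void> cleanupAsync(String jobId, Executor cleanupExecutor);
    }

    /** Hypothetical stand-in for the variant that also receives the main thread executor. */
    @FunctionalInterface
    interface ThreeArgCleanable {
        CompletableFuture<Void> cleanupAsync(
                String jobId, Executor cleanupExecutor, Executor mainThreadExecutor);
    }

    /** Captures the main thread executor so that callers only see the two-argument shape. */
    static TwoArgCleanable adapt(ThreeArgCleanable resource, Executor mainThreadExecutor) {
        return (jobId, cleanupExecutor) ->
                resource.cleanupAsync(jobId, cleanupExecutor, mainThreadExecutor);
    }

    /** Runs the adapted resource once and returns which executor was used, in order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        // Both executors run tasks synchronously and record that they were used.
        Executor cleanup = task -> { log.add("cleanup"); task.run(); };
        Executor mainThread = task -> { log.add("main"); task.run(); };

        // The resource does its "IO" on the cleanup executor and finishes on the main thread.
        ThreeArgCleanable resource = (jobId, io, main) ->
                CompletableFuture.runAsync(() -> {}, io).thenRunAsync(() -> {}, main);

        adapt(resource, mainThread).cleanupAsync("job-1", cleanup).join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the adapter is a one-line lambda, keeping it private to the factory (or duplicating it in the testing factory) costs very little, which is the argument for not exposing it publicly.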
########## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java: ########## @@ -188,11 +197,26 @@ public static class Builder { private Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier = Collections::emptyList; private Function<JobID, JobManagerRunner> unregisterFunction = ignoredJobId -> null; - private BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction = - (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture(); + private TriFunction<JobID, Executor, Executor, CompletableFuture<Void>> + localCleanupAsyncFunction = + (ignoredJobId, ignoredExecutor, mainThreadExecutor) -> + FutureUtils.completedVoidFuture(); private BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction = (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture(); Review Comment: ```suggestion ``` I guess you could remove this one in a hotfix commit if you like. That seems to be some legacy code from when the JobManagerRunnerRegistry implemented both, the `LocallyCleanableResource` and the `GloballyCleanableResource` interfaces. ########## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableInMainThreadResource.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * {@code LocallyCleanableInMainThreadResource} is supposed to be implemented by any class that + * provides artifacts for a given job that need to be cleaned up in the main thread after the job + * reached a local terminal state. Local cleanups that needs to be triggered for a global terminal + * state as well, need to be implemented using the {@link GloballyCleanableResource}. + * + * <p>The {@link DispatcherResourceCleanerFactory} provides a workaround to trigger some {@code + * LocallyCleanableInMainThreadResource} as globally cleanable. FLINK-26175 is created to cover a + * refactoring and straighten things out. + * + * @see org.apache.flink.api.common.JobStatus Review Comment: ```suggestion * {@code LocallyCleanableInMainThreadResource} is an extension of the {@link * LocallyCleanableResource} interface. It allows the {@link DispatcherResourceCleanerFactory} to * inject the main thread as part of the cleanup procedure. * * <p>See {@code LocallyCleanableResource} for further context on the proper contract of the two * interfaces. ``` I would propose a more purpose-focused JavaDoc here. WDYT?
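To make the purpose of the injected main thread concrete: the point is that the state-mutating step (the unregistration) can be chained onto the passed executor and only runs once closing has finished. A minimal sketch under stated assumptions follows. `MainThreadUnregisterSketch` and its map-based registry are hypothetical and stand in for the real registry; only the `CompletableFuture` calls are actual JDK API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

public class MainThreadUnregisterSketch {

    /** Hypothetical registry: job id mapped to a future that completes once the runner closed. */
    private final Map<String, CompletableFuture<Void>> closeFutures = new ConcurrentHashMap<>();

    void register(String jobId, CompletableFuture<Void> closeFuture) {
        closeFutures.put(jobId, closeFuture);
    }

    boolean isRegistered(String jobId) {
        return closeFutures.containsKey(jobId);
    }

    /** Removes the entry on the passed executor, and only after closing has succeeded. */
    CompletableFuture<Void> localCleanupAsync(String jobId, Executor mainThreadExecutor) {
        CompletableFuture<Void> closeFuture = closeFutures.get(jobId);
        if (closeFuture == null) {
            // Nothing registered for this job, so there is nothing to clean up.
            return CompletableFuture.completedFuture(null);
        }
        return closeFuture.thenRunAsync(() -> closeFutures.remove(jobId), mainThreadExecutor);
    }

    /** Returns {registered while closing is pending, registered after cleanup completed}. */
    static boolean[] demo() {
        MainThreadUnregisterSketch registry = new MainThreadUnregisterSketch();
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        registry.register("job-1", closeFuture);

        // Runnable::run acts as a direct executor that runs the task in the calling thread.
        CompletableFuture<Void> cleanup = registry.localCleanupAsync("job-1", Runnable::run);
        boolean before = registry.isRegistered("job-1");

        closeFuture.complete(null);
        cleanup.join();
        boolean after = registry.isRegistered("job-1");
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println(result[0] + " " + result[1]);
    }
}
```

In this sketch the entry stays registered until the close future completes. If closing fails, `thenRunAsync` never runs the removal and the returned future completes exceptionally.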
########## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java: ########## @@ -157,25 +144,33 @@ void testFailingLocalCleanup() { assertThatFuture( testInstance.localCleanupAsync( - jobManagerRunner.getJobID(), Executors.directExecutor())) + jobManagerRunner.getJobID(), + Executors.directExecutor(), + Executors.directExecutor())) .isCompletedExceptionally() .eventuallyFailsWith(ExecutionException.class) .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE) .hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass()) .last() .isEqualTo(expectedException); - assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())) + .isTrue() + .as( + "Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered."); } @Test - void testSuccessfulLocalCleanupAsync() { + void testSuccessfulLocalCleanupAsync() throws InterruptedException, ExecutionException { final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner(); - final CompletableFuture<Void> cleanupResult = testInstance.localCleanupAsync( - jobManagerRunner.getJobID(), Executors.directExecutor()); + jobManagerRunner.getJobID(), + Executors.directExecutor(), + Executors.directExecutor()); + + // Wait for the unregister future to complete + cleanupResult.get(); Review Comment: ```suggestion FlinkAssertions.assertThatFuture(cleanupFuture) .as("Wait for the unregistration to complete") .eventuallySucceeds(); ``` nit: there are utility methods for asserting `CompletableFuture` instances ########## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java: ########## @@ -369,7 +377,9 @@ private MiniDispatcher createMiniDispatcher( // JobManagerRunnerRegistry needs to be added explicitly // because cleaning it will trigger the closeAsync latch // provided by 
TestingJobManagerRunner - .withLocallyCleanableResource(jobManagerRunnerRegistry) + .withLocallyCleanableResource( Review Comment: Here we are inconsistent. The `TestingResourceCleanerFactory` has a dedicated `withLocallyCleanableInMainThreadResource` method for this purpose. Or am I missing something? :thinking: ########## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java: ########## @@ -202,23 +202,33 @@ void testFailingLocalCleanupAsync() { } @Test - void testLocalCleanupAsyncNonBlocking() { + void testLocalCleanupAsyncNonBlocking() throws ExecutionException, InterruptedException { final TestingJobManagerRunner jobManagerRunner = TestingJobManagerRunner.newBuilder().setBlockingTermination(true).build(); testInstance.register(jobManagerRunner); // this call shouldn't block final CompletableFuture<Void> cleanupFuture = testInstance.localCleanupAsync( - jobManagerRunner.getJobID(), UnsupportedOperationExecutor.INSTANCE); - - assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + jobManagerRunner.getJobID(), + Executors.directExecutor(), + Executors.directExecutor()); + + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())) + .isTrue() + .as( + "Since the cleanup future hasn't completed yet, the JobManagerRunner is expected to not have been unregistered."); assertThat(jobManagerRunner.getTerminationFuture()).isNotCompleted(); assertThat(cleanupFuture).isNotCompleted(); + // Complete the closeAsync() future jobManagerRunner.getTerminationFuture().complete(null); - assertThat(cleanupFuture).isCompleted(); + // Wait for the unregistration to complete + cleanupFuture.get(); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())) + .isFalse() + .as("The unregister future is complete now."); Review Comment: ```suggestion .as("The unregister future is complete now.") .isFalse(); ``` ########## 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java: ########## @@ -83,9 +83,16 @@ public Collection<JobManagerRunner> getJobManagerRunners() { } @Override - public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) { + public CompletableFuture<Void> localCleanupAsync( + JobID jobId, Executor ignoredExecutor, Executor mainThreadExecutor) { if (isRegistered(jobId)) { - return unregister(jobId).closeAsync(); + CompletableFuture<Void> resultFuture = this.jobManagerRunners.get(jobId).closeAsync(); + + return resultFuture.thenApplyAsync( + result -> { + mainThreadExecutor.execute(() -> unregister(jobId)); + return result; + }); Review Comment: ```suggestion return get(jobId) .closeAsync() .thenRunAsync(() -> unregister(jobId), mainThreadExecutor); ``` We can shorten this part of the code. WDYT? ########## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java: ########## @@ -188,11 +197,26 @@ public static class Builder { private Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier = Collections::emptyList; private Function<JobID, JobManagerRunner> unregisterFunction = ignoredJobId -> null; - private BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction = - (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture(); + private TriFunction<JobID, Executor, Executor, CompletableFuture<Void>> + localCleanupAsyncFunction = + (ignoredJobId, ignoredExecutor, mainThreadExecutor) -> + FutureUtils.completedVoidFuture(); private BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction = (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture(); + private Builder fromDefaultJobManagerRunnerRegistry( Review Comment: What's the purpose of this? 
It looks like we're mapping each of the `DefaultJobManagerRunnerRegistry` methods to a `TestingJobManagerRunnerRegistry` callback, which effectively creates a "new" `DefaultJobManagerRunnerRegistry` instance wrapped in a `TestingJobManagerRunnerRegistry`. We could just use `DefaultJobManagerRunnerRegistry` instead. Don't you think? :thinking: ########## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java: ########## @@ -157,25 +144,33 @@ void testFailingLocalCleanup() { assertThatFuture( testInstance.localCleanupAsync( - jobManagerRunner.getJobID(), Executors.directExecutor())) + jobManagerRunner.getJobID(), + Executors.directExecutor(), + Executors.directExecutor())) .isCompletedExceptionally() .eventuallyFailsWith(ExecutionException.class) .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE) .hasExactlyElementsOfTypes(ExecutionException.class, expectedException.getClass()) .last() .isEqualTo(expectedException); - assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())) + .isTrue() + .as( + "Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered."); Review Comment: ```suggestion .as( "Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered.") .isTrue(); ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableInMainThreadResource.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * {@code LocallyCleanableInMainThreadResource} is supposed to be implemented by any class that + * provides artifacts for a given job that need to be cleaned up in the main thread after the job + * reached a local terminal state. Local cleanups that needs to be triggered for a global terminal + * state as well, need to be implemented using the {@link GloballyCleanableResource}. + * + * <p>The {@link DispatcherResourceCleanerFactory} provides a workaround to trigger some {@code + * LocallyCleanableInMainThreadResource} as globally cleanable. FLINK-26175 is created to cover a + * refactoring and straighten things out. + * + * @see org.apache.flink.api.common.JobStatus + */ +@FunctionalInterface +public interface LocallyCleanableInMainThreadResource { + + /** + * {@code localCleanupAsync} is expected to be called from the main thread and uses the passed + * {@code mainThreadExecutor} for cleanups. Thread-safety must be ensured. + * + * @param jobId The {@link JobID} of the job for which the local data should be cleaned up. + * @param cleanupExecutor The fallback executor for IO-heavy operations. + * @param mainThreadExecutor The main thread executor. + * @return The cleanup result future. 
+ */ + CompletableFuture<Void> localCleanupAsync( + JobID jobId, Executor cleanupExecutor, Executor mainThreadExecutor); Review Comment: ```suggestion JobID jobId, Executor cleanupExecutor, Executor mainThreadExecutor); ``` We could also move the conversion into the interface if we really need it in multiple places rather than only in the `DispatcherResourceCleanerFactory`. WDYT? ########## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java: ########## @@ -190,8 +185,13 @@ void testFailingLocalCleanupAsync() { final CompletableFuture<Void> cleanupResult = testInstance.localCleanupAsync( - jobManagerRunner.getJobID(), Executors.directExecutor()); - assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + jobManagerRunner.getJobID(), + Executors.directExecutor(), + Executors.directExecutor()); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())) + .isTrue() + .as( + "Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered."); Review Comment: ```suggestion .as( "Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered.") .isTrue(); ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java: ########## @@ -26,27 +26,42 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** {@code TestingResourceCleanerFactory} for adding custom {@link ResourceCleaner} creation. 
*/ public class TestingResourceCleanerFactory implements ResourceCleanerFactory { private final Collection<LocallyCleanableResource> locallyCleanableResources; + private Collection<LocallyCleanableInMainThreadResource> locallyCleanableInMainThreadResources; private final Collection<GloballyCleanableResource> globallyCleanableResources; private final Executor cleanupExecutor; private TestingResourceCleanerFactory( Collection<LocallyCleanableResource> locallyCleanableResources, + Collection<LocallyCleanableInMainThreadResource> locallyCleanableInMainThreadResources, Collection<GloballyCleanableResource> globallyCleanableResources, Executor cleanupExecutor) { this.locallyCleanableResources = locallyCleanableResources; + this.locallyCleanableInMainThreadResources = locallyCleanableInMainThreadResources; this.globallyCleanableResources = globallyCleanableResources; this.cleanupExecutor = cleanupExecutor; } @Override public ResourceCleaner createLocalResourceCleaner( ComponentMainThreadExecutor mainThreadExecutor) { + Collection<LocallyCleanableResource> locallyCleanableResources = + this.locallyCleanableResources; + + locallyCleanableResources.addAll( + locallyCleanableInMainThreadResources.stream() + .map( + r -> + DispatcherResourceCleanerFactory.toLocallyCleanableResource( Review Comment: `TestingResourceCleanerFactory` is a testing implementation that shouldn't depend on the `DispatcherResourceCleanerFactory`-internal code. 
We can do the mapping from `LocallyCleanableInMainThreadResource` to `LocallyCleanableResource` internally, I guess :thinking: ########## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java: ########## @@ -202,23 +202,33 @@ void testFailingLocalCleanupAsync() { } @Test - void testLocalCleanupAsyncNonBlocking() { + void testLocalCleanupAsyncNonBlocking() throws ExecutionException, InterruptedException { final TestingJobManagerRunner jobManagerRunner = TestingJobManagerRunner.newBuilder().setBlockingTermination(true).build(); testInstance.register(jobManagerRunner); // this call shouldn't block final CompletableFuture<Void> cleanupFuture = testInstance.localCleanupAsync( - jobManagerRunner.getJobID(), UnsupportedOperationExecutor.INSTANCE); - - assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + jobManagerRunner.getJobID(), + Executors.directExecutor(), + Executors.directExecutor()); + + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())) + .isTrue() + .as( + "Since the cleanup future hasn't completed yet, the JobManagerRunner is expected to not have been unregistered."); Review Comment: ```suggestion .as( "Since the cleanup failed, the JobManagerRunner is expected to not have been unregistered.") .isTrue(); ``` The assert message has to be put in front of the assertion.
