XComp commented on code in PR #25027:
URL: https://github.com/apache/flink/pull/25027#discussion_r1687470490
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java:
##########
@@ -83,9 +83,15 @@ public Collection<JobManagerRunner> getJobManagerRunners() {
}
@Override
- public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor
unusedExecutor) {
+ public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor
mainThreadExecutor) {
if (isRegistered(jobId)) {
- return unregister(jobId).closeAsync();
+ CompletableFuture<Void> resultFuture =
this.jobManagerRunners.get(jobId).closeAsync();
+
+ return resultFuture.thenApply(
Review Comment:
```suggestion
return resultFuture.thenApplyAsync(
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java:
##########
@@ -30,16 +30,17 @@
/** {@code TestingResourceCleanerFactory} for adding custom {@link
ResourceCleaner} creation. */
public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
- private final Collection<LocallyCleanableResource>
locallyCleanableResources;
+ private final Collection<LocallyCleanableInMainThreadResource>
+ locallyCleanableInMainThreadResource;
private final Collection<GloballyCleanableResource>
globallyCleanableResources;
private final Executor cleanupExecutor;
private TestingResourceCleanerFactory(
- Collection<LocallyCleanableResource> locallyCleanableResources,
+ Collection<LocallyCleanableInMainThreadResource>
locallyCleanableInMainThreadResource,
Review Comment:
`LocallyCleanableInMainThreadResource` is an implementation detail of the
`DispatcherResourceCleanerFactory`. We don't have to use it in
`TestingResourceCleanerFactory`.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java:
##########
Review Comment:
I'm wondering whether you could come up with a test where we verify that the
close is called in another thread and the unregister is then executed in the
main thread.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/LocallyCleanableInMainThreadResource.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * {@code LocallyCleanableInMainThreadResource} is supposed to be implemented
by any class that
+ * provides artifacts for a given job that need to be cleaned up in the main
thread after the job
+ * reached a local terminal state. Local cleanups that needs to be triggered
for a global terminal
+ * state as well, need to be implemented using the {@link
GloballyCleanableResource}.
+ *
+ * <p>The {@link DispatcherResourceCleanerFactory} provides a workaround to
trigger some {@code
+ * LocallyCleanableInMainThreadResource} as globally cleanable. FLINK-26175 is
created to cover a
+ * refactoring and straighten things out.
+ *
+ * @see org.apache.flink.api.common.JobStatus
+ */
+@FunctionalInterface
+public interface LocallyCleanableInMainThreadResource {
+
+ /**
+ * {@code localCleanupAsync} is expected to be called from the main thread
and uses the passed
+ * {@code mainThreadExecutor} for cleanups. Thread-safety must be ensured.
+ *
+ * @param jobId The {@link JobID} of the job for which the local data
should be cleaned up.
+ * @param mainThreadExecutor The main thread executor.
+ * @return The cleanup result future.
+ */
+ CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor
mainThreadExecutor);
Review Comment:
We shouldn't remove the `ioExecutor` from the signature because we still
need to translate `LocallyCleanableInMainThreadResource` into
`LocallyCleanableResource` (see the proposal in [my
comment](https://github.com/apache/flink/pull/25027#discussion_r1683903323)).
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java:
##########
@@ -126,8 +129,24 @@ JOB_MANAGER_METRIC_GROUP_LABEL,
ofLocalResource(jobManagerMetricGroup))
* @param localResource Local resource that we want to clean during a
global cleanup.
* @return Globally cleanable resource.
*/
- private static GloballyCleanableResource ofLocalResource(
+ private static GloballyCleanableResource toGloballyCleanableResource(
+ LocallyCleanableInMainThreadResource localResource) {
+ return localResource::localCleanupAsync;
+ }
+
+ private static GloballyCleanableResource toGloballyCleanableResource(
LocallyCleanableResource localResource) {
return localResource::localCleanupAsync;
}
+
+ /**
+ * Converts a LocallyCleanableInMainThreadResource object to a
LocallyCleanableResource object.
+ *
+ * @param localResource LocallyCleanableInMainThreadResource that we want
to translate.
+ * @return A LocallyCleanableResource.
+ */
+ private static LocallyCleanableResource toLocallyCleanableResource(
+ LocallyCleanableInMainThreadResource localResource) {
+ return localResource::localCleanupAsync;
Review Comment:
These conversion methods don't have any effect, meaning they wouldn't pass
in the main thread in any way. You're just replacing one method with another
method having the same signature (but different method and parameter names).
The input stays the same (meaning that the `ioExecutor` would be passed in)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]