eemario commented on code in PR #27741: URL: https://github.com/apache/flink/pull/27741#discussion_r2973216177
########## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultApplicationResourceCleaner.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.concurrent.RetryStrategy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * {@code DefaultApplicationResourceCleaner} is the default implementation of {@link + * ApplicationResourceCleaner}. It will try to clean up any resource that was added. Failure will + * result in an individual retry of the cleanup. The overall cleanup result succeeds after all + * subtasks succeeded. + */ +public class DefaultApplicationResourceCleaner<T> implements ApplicationResourceCleaner { + + private static final Logger LOG = + LoggerFactory.getLogger(DefaultApplicationResourceCleaner.class); + + private final ComponentMainThreadExecutor mainThreadExecutor; + private final Executor cleanupExecutor; + private final CleanupFn<T> cleanupFn; + + private final Collection<CleanupWithLabel<T>> regularCleanup; + + private final RetryStrategy retryStrategy; + + public static Builder<GloballyCleanableApplicationResource> forGloballyCleanableResources( + ComponentMainThreadExecutor mainThreadExecutor, + Executor cleanupExecutor, + RetryStrategy retryStrategy) { + return forCleanableResources( + mainThreadExecutor, + cleanupExecutor, + GloballyCleanableApplicationResource::globalCleanupAsync, + retryStrategy); + } + + @VisibleForTesting + static <T> Builder<T> forCleanableResources( + ComponentMainThreadExecutor mainThreadExecutor, + Executor cleanupExecutor, + CleanupFn<T> cleanupFunction, + RetryStrategy retryStrategy) { + return new Builder<>(mainThreadExecutor, cleanupExecutor, cleanupFunction, retryStrategy); + } + + @VisibleForTesting + @FunctionalInterface + interface CleanupFn<T> { + CompletableFuture<Void> cleanupAsync( + T resource, ApplicationID applicationId, Executor cleanupExecutor); + } + + /** + * {@code Builder} for creating {@code DefaultApplicationResourceCleaner} instances. + * + * @param <T> The functional interface that's being translated into the internally used {@link + * CleanupFn}. + */ + public static class Builder<T> { + + private final ComponentMainThreadExecutor mainThreadExecutor; + private final Executor cleanupExecutor; + private final CleanupFn<T> cleanupFn; + + private final RetryStrategy retryStrategy; + + private final Collection<CleanupWithLabel<T>> prioritizedCleanup = new ArrayList<>(); + private final Collection<CleanupWithLabel<T>> regularCleanup = new ArrayList<>(); + + private Builder( + ComponentMainThreadExecutor mainThreadExecutor, + Executor cleanupExecutor, + CleanupFn<T> cleanupFn, + RetryStrategy retryStrategy) { + this.mainThreadExecutor = mainThreadExecutor; + this.cleanupExecutor = cleanupExecutor; + this.cleanupFn = cleanupFn; + this.retryStrategy = retryStrategy; + } + + /** + * Prioritized cleanups run before their regular counterparts. This method enables the + * caller to model dependencies between cleanup tasks. The order in which cleanable + * resources are added matters, i.e. if two cleanable resources are added as prioritized + * cleanup tasks, the resource being added first will block the cleanup of the second + * resource. All prioritized cleanup resources will run and finish before any resource that + * is added using {@link #withRegularCleanup(String, Object)} is started. + * + * @param label The label being used when logging errors in the given cleanup. + * @param prioritizedCleanup The cleanup callback that is going to be prioritized. + */ + public Builder<T> withPrioritizedCleanup(String label, T prioritizedCleanup) { + this.prioritizedCleanup.add(new CleanupWithLabel<>(prioritizedCleanup, label)); + return this; + } + + /** + * Regular cleanups are resources for which the cleanup is triggered after all prioritized + * cleanups succeeded. All added regular cleanups will run concurrently to each other. + * + * @param label The label being used when logging errors in the given cleanup. + * @param regularCleanup The cleanup callback that is going to run after all prioritized + * cleanups are finished. + * @see #withPrioritizedCleanup(String, Object) + */ + public Builder<T> withRegularCleanup(String label, T regularCleanup) { + this.regularCleanup.add(new CleanupWithLabel<>(regularCleanup, label)); + return this; + } + + public DefaultApplicationResourceCleaner build() { + return new DefaultApplicationResourceCleaner<>( + mainThreadExecutor, + cleanupExecutor, + cleanupFn, + prioritizedCleanup, + regularCleanup, + retryStrategy); + } + } + + private DefaultApplicationResourceCleaner( + ComponentMainThreadExecutor mainThreadExecutor, + Executor cleanupExecutor, + CleanupFn<T> cleanupFn, + Collection<CleanupWithLabel<T>> prioritizedCleanup, + Collection<CleanupWithLabel<T>> regularCleanup, + RetryStrategy retryStrategy) { + this.mainThreadExecutor = mainThreadExecutor; + this.cleanupExecutor = cleanupExecutor; + this.cleanupFn = cleanupFn; + this.regularCleanup = regularCleanup; + this.retryStrategy = retryStrategy; Review Comment: Updated: removed prioritizedCleanup since it is unnecessary for application resources. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
