dmvk commented on a change in pull request #18536: URL: https://github.com/apache/flink/pull/18536#discussion_r798308914
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -169,100 +169,35 @@ public Dispatcher( DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { - this( - rpcService, - fencingToken, - recoveredJobs, - recoveredDirtyJobs, - dispatcherBootstrapFactory, - dispatcherServices, - new JobManagerRunnerRegistry(16)); - } - - private Dispatcher( Review comment: nit: Why did we revert this commit instead of removing it? It adds noise to the review. (no need to address this) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ########## @@ -873,69 +878,96 @@ public boolean deletePermanent(JobID jobId, PermanentBlobKey key) { * doesn't touch the job's entry in the {@link BlobStore} to enable recovering. * * @param jobId The {@code JobID} of the job that is subject to cleanup. - * @throws IOException if the cleanup failed. */ @Override - public void localCleanup(JobID jobId) throws IOException { + public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor) { checkNotNull(jobId); + return CompletableFuture.runAsync( + () -> { + readWriteLock.writeLock().lock(); + try { + internalLocalCleanup(jobId); + } catch (IOException e) { + throw new CompletionException(e); + } finally { + readWriteLock.writeLock().unlock(); + } + }, + cleanupExecutor); + } + + @GuardedBy("readWriteLock") + private void internalLocalCleanup(JobID jobId) throws IOException { final File jobDir = new File( BlobUtils.getStorageLocationPath( storageDir.deref().getAbsolutePath(), jobId)); - FileUtils.deleteDirectory(jobDir); - // NOTE on why blobExpiryTimes are not cleaned up: - // Instead of going through blobExpiryTimes, keep lingering entries - they - // will be cleaned up by the timer task which tolerates non-existing files - // If inserted again with the same IDs (via put()), the TTL will be updated - // again. 
+ // NOTE on why blobExpiryTimes are not cleaned up: Instead of going through + // blobExpiryTimes, keep lingering entries. They will be cleaned up by the timer + // task which tolerate non-existing files. If inserted again with the same IDs + // (via put()), the TTL will be updated again. } /** * Removes all BLOBs from local and HA store belonging to the given {@link JobID}. * * @param jobId ID of the job this blob belongs to - * @throws Exception if the cleanup fails. */ @Override - public void globalCleanup(JobID jobId) throws Exception { + public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) { checkNotNull(jobId); - readWriteLock.writeLock().lock(); + return CompletableFuture.runAsync( + () -> { + readWriteLock.writeLock().lock(); - try { - Exception exception = null; + try { + IOException exception = null; - try { - localCleanup(jobId); - } catch (IOException e) { - exception = e; - } + try { + internalLocalCleanup(jobId); + } catch (IOException e) { + exception = e; + } - if (!blobStore.deleteAll(jobId)) { - exception = - ExceptionUtils.firstOrSuppressed( - new FlinkException( - "Error while cleaning up the BlobStore for job " + jobId), - exception); - } + if (!blobStore.deleteAll(jobId)) { + exception = + ExceptionUtils.firstOrSuppressed( + new IOException( + "Error while cleaning up the BlobStore for job " + + jobId), + exception); + } - ExceptionUtils.tryRethrowException(exception); - } finally { - readWriteLock.writeLock().unlock(); - } + if (exception != null) { + throw new CompletionException(exception); + } + } finally { + readWriteLock.writeLock().unlock(); + } + }, + executor); } - public void retainJobs(Collection<JobID> jobsToRetain) throws Exception { + public void retainJobs(Collection<JobID> jobsToRetain, Executor ioExecutor) throws IOException { if (storageDir.deref().exists()) { final Set<JobID> jobsToRemove = BlobUtils.listExistingJobs(storageDir.deref().toPath()); jobsToRemove.removeAll(jobsToRetain); + final 
Collection<CompletableFuture<Void>> cleanupResultFutures = + new ArrayList<>(jobsToRemove.size()); for (JobID jobToRemove : jobsToRemove) { - globalCleanup(jobToRemove); + cleanupResultFutures.add(globalCleanupAsync(jobToRemove, ioExecutor)); + } + + try { + FutureUtils.completeAll(cleanupResultFutures).get(); + } catch (InterruptedException | ExecutionException e) { + ExceptionUtils.rethrowIOException(e); Review comment: This is never going to re-throw ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ########## @@ -873,69 +878,96 @@ public boolean deletePermanent(JobID jobId, PermanentBlobKey key) { * doesn't touch the job's entry in the {@link BlobStore} to enable recovering. * * @param jobId The {@code JobID} of the job that is subject to cleanup. - * @throws IOException if the cleanup failed. */ @Override - public void localCleanup(JobID jobId) throws IOException { + public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor) { checkNotNull(jobId); + return CompletableFuture.runAsync( Review comment: ```java private CompletableFuture<Void> writeAsync( ThrowingRunnable<IOException> runnable, Executor executor) { return CompletableFuture.runAsync( () -> { readWriteLock.writeLock().lock(); try { runnable.run(); } catch (IOException e) { throw new CompletionException(e); } finally { readWriteLock.writeLock().unlock(); } }, executor); } ``` Introducing a simple helper method would make the code more readable here for both local and global cleanup ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -501,45 +441,59 @@ private boolean isPartialResourceConfigured(JobGraph jobGraph) { private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) { Review comment: Can be simplified ```java private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) { log.info("Submitting job '{}' ({}).", jobGraph.getName(), 
jobGraph.getJobID()); return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob) .handle((ignored, throwable) -> handleTermination(jobGraph.getJobID(), throwable)) .thenCompose(Function.identity()); } private CompletableFuture<Acknowledge> handleTermination( JobID jobId, @Nullable Throwable terminationThrowable) { if (terminationThrowable != null) { return globalResourceCleaner .cleanupAsync(jobId) .handleAsync( (ignored, cleanupThrowable) -> { if (cleanupThrowable != null) { log.warn( "Cleanup didn't succeed after job submission failed for job {}.", jobId, cleanupThrowable); terminationThrowable.addSuppressed(cleanupThrowable); } ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError( terminationThrowable); final Throwable strippedThrowable = ExceptionUtils.stripCompletionException( terminationThrowable); log.error("Failed to submit job {}.", jobId, strippedThrowable); throw new CompletionException( new JobSubmissionException( jobId, "Failed to submit job.", strippedThrowable)); }, getMainThreadExecutor()); } return CompletableFuture.completedFuture(Acknowledge.get()); } ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -1169,6 +1136,7 @@ private void jobMasterFailed(JobID jobId, Throwable cause) { } CompletableFuture<Void> getJobTerminationFuture(JobID jobId) { + getMainThreadExecutor().assertRunningInMainThread(); Review comment: remove ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java ########## @@ -238,42 +241,77 @@ public void putJobGraph(JobGraph jobGraph) throws Exception { } @Override - public void globalCleanup(JobID jobId) throws Exception { + public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) { checkNotNull(jobId, "Job ID"); - String name = jobGraphStoreUtil.jobIDToName(jobId); - LOG.debug("Removing job graph {} from {}.", jobId, jobGraphStateHandleStore); + 
return CompletableFuture.runAsync( Review comment: Similar to `BlobServer` & `writeAsync` comment, we can simplify this a bit ########## File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/RunningInMainThreadAssert.java ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +/** + * {@code RunningInMainThreadAssert} provides a method for verifying that the current thread is the + * main thread. + */ +@FunctionalInterface +public interface RunningInMainThreadAssert { Review comment: I miss the point of disentangling this from the `ComponentMainThreadExecutor`. It doesn't feel like a simplification (we already have all necessary test tools for `ComponentMainThreadExecutor` in place) Why do we need to do that? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -937,16 +893,27 @@ private void registerJobManagerRunnerTerminationFuture( private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) { switch (cleanupJobState) { case LOCAL: - return localResourceCleaner.cleanupAsync(jobId); + return localCleanup(jobId); case GLOBAL: - return globalResourceCleaner - .cleanupAsync(jobId) - .thenRun(() -> markJobAsClean(jobId)); + return globalCleanup(jobId).thenRun(() -> markJobAsClean(jobId)); default: throw new IllegalStateException("Invalid cleanup state: " + cleanupJobState); } } + private CompletableFuture<Void> globalCleanup(JobID jobId) { + getMainThreadExecutor().assertRunningInMainThread(); Review comment: no need to check this (for both local & global), we already have a safeguard in resource cleaner ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -571,9 +525,11 @@ private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Excep getMainThreadExecutor()); final CompletableFuture<Void> jobTerminationFuture = - cleanupJobStateFuture - .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState)) - .thenCompose(Function.identity()); + cleanupJobStateFuture.thenCompose( + cleanupJobState -> { + getMainThreadExecutor().assertRunningInMainThread(); Review comment: we don't need this check here, this was for debugging ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java ########## @@ -32,29 +34,37 @@ public class DefaultResourceCleaner implements ResourceCleaner { Review comment: This class seems a bit confusing. A few things: - 👍 It's immutable - 👎 Working with bi-functions instead of resources - 👎 Confusing how to construct this class. Builder pattern would make things simpler What about something along these lines? 
```java package org.apache.flink.runtime.dispatcher.cleanup; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.util.concurrent.FutureUtils; import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.stream.Collectors; /** {@code DefaultResourceCleaner} is the default implementation of {@link ResourceCleaner}. */ public class DefaultResourceCleaner<T> implements ResourceCleaner { private final ComponentMainThreadExecutor mainThreadExecutor; private final Executor cleanupExecutor; private final CleanupFn<T> cleanupFn; private final Collection<T> prioritizedCleanup; private final Collection<T> regularCleanup; public static Builder<LocallyCleanableResource> ofLocalResource( ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor) { return new Builder<>( mainThreadExecutor, cleanupExecutor, LocallyCleanableResource::localCleanupAsync); } public static Builder<GloballyCleanableResource> ofGlobalResource( ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor) { return new Builder<>( mainThreadExecutor, cleanupExecutor, GloballyCleanableResource::globalCleanupAsync); } @FunctionalInterface private interface CleanupFn<T> { CompletableFuture<Void> cleanupAsync(T resource, JobID jobId, Executor cleanupExecutor); } public static class Builder<T> { private final ComponentMainThreadExecutor mainThreadExecutor; private final Executor cleanupExecutor; private final CleanupFn<T> cleanupFn; private Collection<T> prioritizedCleanup; private Collection<T> regularCleanup; private Builder( ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor, CleanupFn<T> cleanupFn) { this.mainThreadExecutor = mainThreadExecutor; this.cleanupExecutor = cleanupExecutor; this.cleanupFn = cleanupFn; } public void withPrioritizedCleanup(Collection<T> prioritizedCleanup) { this.prioritizedCleanup 
= prioritizedCleanup; } public void withRegularCleanup(Collection<T> regularCleanup) { this.regularCleanup = regularCleanup; } public DefaultResourceCleaner<T> build() { return new DefaultResourceCleaner<>( mainThreadExecutor, cleanupExecutor, cleanupFn, prioritizedCleanup, regularCleanup); } } private DefaultResourceCleaner( ComponentMainThreadExecutor mainThreadExecutor, Executor cleanupExecutor, CleanupFn<T> cleanupFn, Collection<T> prioritizedCleanup, Collection<T> regularCleanup) { this.mainThreadExecutor = mainThreadExecutor; this.cleanupExecutor = cleanupExecutor; this.cleanupFn = cleanupFn; this.prioritizedCleanup = prioritizedCleanup; this.regularCleanup = regularCleanup; } @Override public CompletableFuture<Void> cleanupAsync(JobID jobId) { mainThreadExecutor.assertRunningInMainThread(); CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture(); for (T cleanup : prioritizedCleanup) { cleanupFuture = cleanupFuture.thenCompose( ignoredValue -> cleanupFn.cleanupAsync(cleanup, jobId, cleanupExecutor)); } return cleanupFuture.thenCompose( ignoredValue -> FutureUtils.completeAll( regularCleanup.stream() .map( cleanup -> cleanupFn.cleanupAsync( cleanup, jobId, cleanupExecutor)) .collect(Collectors.toList()))); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org