dmvk commented on a change in pull request #18536:
URL: https://github.com/apache/flink/pull/18536#discussion_r798308914



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -169,100 +169,35 @@ public Dispatcher(
             DispatcherBootstrapFactory dispatcherBootstrapFactory,
             DispatcherServices dispatcherServices)
             throws Exception {
-        this(
-                rpcService,
-                fencingToken,
-                recoveredJobs,
-                recoveredDirtyJobs,
-                dispatcherBootstrapFactory,
-                dispatcherServices,
-                new JobManagerRunnerRegistry(16));
-    }
-
-    private Dispatcher(

Review comment:
       nit: Why did we revert this commit instead of removing it? It adds noise 
to the review. (no need to address this)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
##########
@@ -873,69 +878,96 @@ public boolean deletePermanent(JobID jobId, 
PermanentBlobKey key) {
      * doesn't touch the job's entry in the {@link BlobStore} to enable 
recovering.
      *
      * @param jobId The {@code JobID} of the job that is subject to cleanup.
-     * @throws IOException if the cleanup failed.
      */
     @Override
-    public void localCleanup(JobID jobId) throws IOException {
+    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor 
cleanupExecutor) {
         checkNotNull(jobId);
 
+        return CompletableFuture.runAsync(
+                () -> {
+                    readWriteLock.writeLock().lock();
+                    try {
+                        internalLocalCleanup(jobId);
+                    } catch (IOException e) {
+                        throw new CompletionException(e);
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                },
+                cleanupExecutor);
+    }
+
+    @GuardedBy("readWriteLock")
+    private void internalLocalCleanup(JobID jobId) throws IOException {
         final File jobDir =
                 new File(
                         BlobUtils.getStorageLocationPath(
                                 storageDir.deref().getAbsolutePath(), jobId));
-
         FileUtils.deleteDirectory(jobDir);
 
-        // NOTE on why blobExpiryTimes are not cleaned up:
-        //       Instead of going through blobExpiryTimes, keep lingering 
entries - they
-        //       will be cleaned up by the timer task which tolerates 
non-existing files
-        //       If inserted again with the same IDs (via put()), the TTL will 
be updated
-        //       again.
+        // NOTE on why blobExpiryTimes are not cleaned up: Instead of going 
through
+        // blobExpiryTimes, keep lingering entries. They will be cleaned up by 
the timer
+        // task which tolerate non-existing files. If inserted again with the 
same IDs
+        // (via put()), the TTL will be updated again.
     }
 
     /**
      * Removes all BLOBs from local and HA store belonging to the given {@link 
JobID}.
      *
      * @param jobId ID of the job this blob belongs to
-     * @throws Exception if the cleanup fails.
      */
     @Override
-    public void globalCleanup(JobID jobId) throws Exception {
+    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor 
executor) {
         checkNotNull(jobId);
 
-        readWriteLock.writeLock().lock();
+        return CompletableFuture.runAsync(
+                () -> {
+                    readWriteLock.writeLock().lock();
 
-        try {
-            Exception exception = null;
+                    try {
+                        IOException exception = null;
 
-            try {
-                localCleanup(jobId);
-            } catch (IOException e) {
-                exception = e;
-            }
+                        try {
+                            internalLocalCleanup(jobId);
+                        } catch (IOException e) {
+                            exception = e;
+                        }
 
-            if (!blobStore.deleteAll(jobId)) {
-                exception =
-                        ExceptionUtils.firstOrSuppressed(
-                                new FlinkException(
-                                        "Error while cleaning up the BlobStore 
for job " + jobId),
-                                exception);
-            }
+                        if (!blobStore.deleteAll(jobId)) {
+                            exception =
+                                    ExceptionUtils.firstOrSuppressed(
+                                            new IOException(
+                                                    "Error while cleaning up 
the BlobStore for job "
+                                                            + jobId),
+                                            exception);
+                        }
 
-            ExceptionUtils.tryRethrowException(exception);
-        } finally {
-            readWriteLock.writeLock().unlock();
-        }
+                        if (exception != null) {
+                            throw new CompletionException(exception);
+                        }
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                },
+                executor);
     }
 
-    public void retainJobs(Collection<JobID> jobsToRetain) throws Exception {
+    public void retainJobs(Collection<JobID> jobsToRetain, Executor 
ioExecutor) throws IOException {
         if (storageDir.deref().exists()) {
             final Set<JobID> jobsToRemove = 
BlobUtils.listExistingJobs(storageDir.deref().toPath());
 
             jobsToRemove.removeAll(jobsToRetain);
 
+            final Collection<CompletableFuture<Void>> cleanupResultFutures =
+                    new ArrayList<>(jobsToRemove.size());
             for (JobID jobToRemove : jobsToRemove) {
-                globalCleanup(jobToRemove);
+                cleanupResultFutures.add(globalCleanupAsync(jobToRemove, 
ioExecutor));
+            }
+
+            try {
+                FutureUtils.completeAll(cleanupResultFutures).get();
+            } catch (InterruptedException | ExecutionException e) {
+                ExceptionUtils.rethrowIOException(e);

Review comment:
       This is never going to re-throw

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
##########
@@ -873,69 +878,96 @@ public boolean deletePermanent(JobID jobId, 
PermanentBlobKey key) {
      * doesn't touch the job's entry in the {@link BlobStore} to enable 
recovering.
      *
      * @param jobId The {@code JobID} of the job that is subject to cleanup.
-     * @throws IOException if the cleanup failed.
      */
     @Override
-    public void localCleanup(JobID jobId) throws IOException {
+    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor 
cleanupExecutor) {
         checkNotNull(jobId);
 
+        return CompletableFuture.runAsync(

Review comment:
       ```java
       private CompletableFuture<Void> writeAsync(
               ThrowingRunnable<IOException> runnable, Executor executor) {
           return CompletableFuture.runAsync(
                   () -> {
                       readWriteLock.writeLock().lock();
                       try {
                           runnable.run();
                       } catch (IOException e) {
                           throw new CompletionException(e);
                       } finally {
                           readWriteLock.writeLock().unlock();
                       }
                   },
                   executor);
       }
   ```
   
   Introducing a simple helper method would make the code more readable here 
for both local and global cleanup

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -501,45 +441,59 @@ private boolean isPartialResourceConfigured(JobGraph 
jobGraph) {
     private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph 
jobGraph) {

Review comment:
       Can be simplified
   
   ```java
   private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
       log.info("Submitting job '{}' ({}).", jobGraph.getName(), 
jobGraph.getJobID());
       return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, 
this::persistAndRunJob)
               .handle((ignored, throwable) -> 
handleTermination(jobGraph.getJobID(), throwable))
               .thenCompose(Function.identity());
   }
   
   private CompletableFuture<Acknowledge> handleTermination(
           JobID jobId, @Nullable Throwable terminationThrowable) {
       if (terminationThrowable != null) {
           return globalResourceCleaner
                   .cleanupAsync(jobId)
                   .handleAsync(
                           (ignored, cleanupThrowable) -> {
                               if (cleanupThrowable != null) {
                                   log.warn(
                                           "Cleanup didn't succeed after job 
submission failed for job {}.",
                                           jobId,
                                           cleanupThrowable);
                                   
terminationThrowable.addSuppressed(cleanupThrowable);
                               }
                               
ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(
                                       terminationThrowable);
                               final Throwable strippedThrowable =
                                       ExceptionUtils.stripCompletionException(
                                               terminationThrowable);
                               log.error("Failed to submit job {}.", jobId, 
strippedThrowable);
                               throw new CompletionException(
                                       new JobSubmissionException(
                                               jobId, "Failed to submit job.", 
strippedThrowable));
                           },
                           getMainThreadExecutor());
       }
       return CompletableFuture.completedFuture(Acknowledge.get());
   }
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -1169,6 +1136,7 @@ private void jobMasterFailed(JobID jobId, Throwable 
cause) {
     }
 
     CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+        getMainThreadExecutor().assertRunningInMainThread();

Review comment:
       remove

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
##########
@@ -238,42 +241,77 @@ public void putJobGraph(JobGraph jobGraph) throws 
Exception {
     }
 
     @Override
-    public void globalCleanup(JobID jobId) throws Exception {
+    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor 
executor) {
         checkNotNull(jobId, "Job ID");
-        String name = jobGraphStoreUtil.jobIDToName(jobId);
 
-        LOG.debug("Removing job graph {} from {}.", jobId, 
jobGraphStateHandleStore);
+        return CompletableFuture.runAsync(

Review comment:
       Similar to `BlobServer` & `writeAsync` comment, we can simplify this a 
bit

##########
File path: 
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/RunningInMainThreadAssert.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+/**
+ * {@code RunningInMainThreadAssert} provides a method for verifying that the 
current thread is the
+ * main thread.
+ */
+@FunctionalInterface
+public interface RunningInMainThreadAssert {

Review comment:
       I don't see the point of disentangling this from the 
`ComponentMainThreadExecutor`. It doesn't feel like a simplification (we 
already have all the necessary test tools for `ComponentMainThreadExecutor` in 
place).
   
   Why do we need to do that?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -937,16 +893,27 @@ private void registerJobManagerRunnerTerminationFuture(
     private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState 
cleanupJobState) {
         switch (cleanupJobState) {
             case LOCAL:
-                return localResourceCleaner.cleanupAsync(jobId);
+                return localCleanup(jobId);
             case GLOBAL:
-                return globalResourceCleaner
-                        .cleanupAsync(jobId)
-                        .thenRun(() -> markJobAsClean(jobId));
+                return globalCleanup(jobId).thenRun(() -> 
markJobAsClean(jobId));
             default:
                 throw new IllegalStateException("Invalid cleanup state: " + 
cleanupJobState);
         }
     }
 
+    private CompletableFuture<Void> globalCleanup(JobID jobId) {
+        getMainThreadExecutor().assertRunningInMainThread();

Review comment:
       no need to check this (for both local & global), we already have a 
safeguard in resource cleaner

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -571,9 +525,11 @@ private void runJob(JobGraph jobGraph, ExecutionType 
executionType) throws Excep
                                 getMainThreadExecutor());
 
         final CompletableFuture<Void> jobTerminationFuture =
-                cleanupJobStateFuture
-                        .thenApply(cleanupJobState -> removeJob(jobId, 
cleanupJobState))
-                        .thenCompose(Function.identity());
+                cleanupJobStateFuture.thenCompose(
+                        cleanupJobState -> {
+                            
getMainThreadExecutor().assertRunningInMainThread();

Review comment:
       we don't need this check here, this was for debugging

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
##########
@@ -32,29 +34,37 @@
 public class DefaultResourceCleaner implements ResourceCleaner {

Review comment:
       This class seems a bit confusing. A few things:
   - 👍 It's immutable
   - 👎 Working with bi-functions instead of resources
   - 👎 It is confusing how to construct this class. A builder pattern would make things 
simpler
   
   What about something along these lines?
   
   ```java
   package org.apache.flink.runtime.dispatcher.cleanup;
   
   import org.apache.flink.api.common.JobID;
   import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
   import org.apache.flink.util.concurrent.FutureUtils;
   
   import java.util.Collection;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;
   import java.util.stream.Collectors;
   
   /** {@code DefaultResourceCleaner} is the default implementation of {@link 
ResourceCleaner}. */
   public class DefaultResourceCleaner<T> implements ResourceCleaner {
   
       private final ComponentMainThreadExecutor mainThreadExecutor;
       private final Executor cleanupExecutor;
       private final CleanupFn<T> cleanupFn;
   
       private final Collection<T> prioritizedCleanup;
       private final Collection<T> regularCleanup;
   
       public static Builder<LocallyCleanableResource> ofLocalResource(
               ComponentMainThreadExecutor mainThreadExecutor, Executor 
cleanupExecutor) {
           return new Builder<>(
                   mainThreadExecutor, cleanupExecutor, 
LocallyCleanableResource::localCleanupAsync);
       }
   
       public static Builder<GloballyCleanableResource> ofGlobalResource(
               ComponentMainThreadExecutor mainThreadExecutor, Executor 
cleanupExecutor) {
           return new Builder<>(
                   mainThreadExecutor, cleanupExecutor, 
GloballyCleanableResource::globalCleanupAsync);
       }
   
       @FunctionalInterface
       private interface CleanupFn<T> {
   
           CompletableFuture<Void> cleanupAsync(T resource, JobID jobId, 
Executor cleanupExecutor);
       }
   
       public static class Builder<T> {
   
           private final ComponentMainThreadExecutor mainThreadExecutor;
           private final Executor cleanupExecutor;
           private final CleanupFn<T> cleanupFn;
   
           private Collection<T> prioritizedCleanup;
           private Collection<T> regularCleanup;
   
           private Builder(
                   ComponentMainThreadExecutor mainThreadExecutor,
                   Executor cleanupExecutor,
                   CleanupFn<T> cleanupFn) {
               this.mainThreadExecutor = mainThreadExecutor;
               this.cleanupExecutor = cleanupExecutor;
               this.cleanupFn = cleanupFn;
           }
   
           public void withPrioritizedCleanup(Collection<T> prioritizedCleanup) 
{
               this.prioritizedCleanup = prioritizedCleanup;
           }
   
           public void withRegularCleanup(Collection<T> regularCleanup) {
               this.regularCleanup = regularCleanup;
           }
   
           public DefaultResourceCleaner<T> build() {
               return new DefaultResourceCleaner<>(
                       mainThreadExecutor,
                       cleanupExecutor,
                       cleanupFn,
                       prioritizedCleanup,
                       regularCleanup);
           }
       }
   
       private DefaultResourceCleaner(
               ComponentMainThreadExecutor mainThreadExecutor,
               Executor cleanupExecutor,
               CleanupFn<T> cleanupFn,
               Collection<T> prioritizedCleanup,
               Collection<T> regularCleanup) {
           this.mainThreadExecutor = mainThreadExecutor;
           this.cleanupExecutor = cleanupExecutor;
           this.cleanupFn = cleanupFn;
           this.prioritizedCleanup = prioritizedCleanup;
           this.regularCleanup = regularCleanup;
       }
   
       @Override
       public CompletableFuture<Void> cleanupAsync(JobID jobId) {
           mainThreadExecutor.assertRunningInMainThread();
           CompletableFuture<Void> cleanupFuture = 
FutureUtils.completedVoidFuture();
           for (T cleanup : prioritizedCleanup) {
               cleanupFuture =
                       cleanupFuture.thenCompose(
                               ignoredValue ->
                                       cleanupFn.cleanupAsync(cleanup, jobId, 
cleanupExecutor));
           }
           return cleanupFuture.thenCompose(
                   ignoredValue ->
                           FutureUtils.completeAll(
                                   regularCleanup.stream()
                                           .map(
                                                   cleanup ->
                                                           
cleanupFn.cleanupAsync(
                                                                   cleanup, 
jobId, cleanupExecutor))
                                           .collect(Collectors.toList())));
       }
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to