dmvk commented on a change in pull request #18536:
URL: https://github.com/apache/flink/pull/18536#discussion_r798483128



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
##########
@@ -873,69 +878,96 @@ public boolean deletePermanent(JobID jobId, PermanentBlobKey key) {
     * doesn't touch the job's entry in the {@link BlobStore} to enable recovering.
      *
      * @param jobId The {@code JobID} of the job that is subject to cleanup.
-     * @throws IOException if the cleanup failed.
      */
     @Override
-    public void localCleanup(JobID jobId) throws IOException {
+    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor) {
         checkNotNull(jobId);
 
+        return CompletableFuture.runAsync(
+                () -> {
+                    readWriteLock.writeLock().lock();
+                    try {
+                        internalLocalCleanup(jobId);
+                    } catch (IOException e) {
+                        throw new CompletionException(e);
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                },
+                cleanupExecutor);
+    }
+
+    @GuardedBy("readWriteLock")
+    private void internalLocalCleanup(JobID jobId) throws IOException {
         final File jobDir =
                 new File(
                         BlobUtils.getStorageLocationPath(
                                 storageDir.deref().getAbsolutePath(), jobId));
-
         FileUtils.deleteDirectory(jobDir);
 
-        // NOTE on why blobExpiryTimes are not cleaned up:
-        //       Instead of going through blobExpiryTimes, keep lingering entries - they
-        //       will be cleaned up by the timer task which tolerates non-existing files
-        //       If inserted again with the same IDs (via put()), the TTL will be updated
-        //       again.
+        // NOTE on why blobExpiryTimes are not cleaned up: Instead of going through
+        // blobExpiryTimes, keep lingering entries. They will be cleaned up by the timer
+        // task which tolerates non-existing files. If inserted again with the same IDs
+        // (via put()), the TTL will be updated again.
     }
 
     /**
     * Removes all BLOBs from local and HA store belonging to the given {@link JobID}.
      *
      * @param jobId ID of the job this blob belongs to
-     * @throws Exception if the cleanup fails.
      */
     @Override
-    public void globalCleanup(JobID jobId) throws Exception {
+    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
         checkNotNull(jobId);
 
-        readWriteLock.writeLock().lock();
+        return CompletableFuture.runAsync(
+                () -> {
+                    readWriteLock.writeLock().lock();
 
-        try {
-            Exception exception = null;
+                    try {
+                        IOException exception = null;
 
-            try {
-                localCleanup(jobId);
-            } catch (IOException e) {
-                exception = e;
-            }
+                        try {
+                            internalLocalCleanup(jobId);
+                        } catch (IOException e) {
+                            exception = e;
+                        }
 
-            if (!blobStore.deleteAll(jobId)) {
-                exception =
-                        ExceptionUtils.firstOrSuppressed(
-                                new FlinkException(
-                                        "Error while cleaning up the BlobStore for job " + jobId),
-                                exception);
-            }
+                        if (!blobStore.deleteAll(jobId)) {
+                            exception =
+                                    ExceptionUtils.firstOrSuppressed(
+                                            new IOException(
+                                                    "Error while cleaning up the BlobStore for job "
+                                                            + jobId),
+                                            exception);
+                        }
 
-            ExceptionUtils.tryRethrowException(exception);
-        } finally {
-            readWriteLock.writeLock().unlock();
-        }
+                        if (exception != null) {
+                            throw new CompletionException(exception);
+                        }
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                },
+                executor);
     }
 
-    public void retainJobs(Collection<JobID> jobsToRetain) throws Exception {
+    public void retainJobs(Collection<JobID> jobsToRetain, Executor ioExecutor) throws IOException {
         if (storageDir.deref().exists()) {
            final Set<JobID> jobsToRemove = BlobUtils.listExistingJobs(storageDir.deref().toPath());
 
             jobsToRemove.removeAll(jobsToRetain);
 
+            final Collection<CompletableFuture<Void>> cleanupResultFutures =
+                    new ArrayList<>(jobsToRemove.size());
             for (JobID jobToRemove : jobsToRemove) {
-                globalCleanup(jobToRemove);
+                cleanupResultFutures.add(globalCleanupAsync(jobToRemove, ioExecutor));
+            }
+
+            try {
+                FutureUtils.completeAll(cleanupResultFutures).get();
+            } catch (InterruptedException | ExecutionException e) {
+                ExceptionUtils.rethrowIOException(e);

Review comment:
   hmm, I've missed the last branch, where we wrap the exception in an IOException anyway :/ None of the other branches will match, so this basically has the same semantics as:
   
   ```suggestion
                   throw new IOException(e);
   ```
   
   which is more explicit 🤔 
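   
   For context, the dispatch the comment refers to looks roughly like this (a paraphrased sketch of `org.apache.flink.util.ExceptionUtils#rethrowIOException` from memory, not a verbatim copy). Since the caught exception here is always an `InterruptedException` or an `ExecutionException`, none of the `instanceof` branches can match, and the call always falls through to the wrapping branch:
   
   ```java
   import java.io.IOException;
   
   // Paraphrased sketch of the branch structure in ExceptionUtils#rethrowIOException
   // (flink-core); the name matches the real utility, the body is from memory.
   final class RethrowSketch {
       static void rethrowIOException(Throwable t) throws IOException {
           if (t instanceof IOException) {
               throw (IOException) t; // never taken here: e is Interrupted-/ExecutionException
           } else if (t instanceof RuntimeException) {
               throw (RuntimeException) t; // never taken here
           } else if (t instanceof Error) {
               throw (Error) t; // never taken here
           } else {
               // InterruptedException and ExecutionException always land here,
               // which behaves like `throw new IOException(e)` at the call site.
               throw new IOException(t.getMessage(), t);
           }
       }
   }
   ```
   
   Under that reading, the only difference from an explicit `throw new IOException(e)` is how the message and cause get attached, so the suggestion trades a generic helper for an equivalent, more readable throw.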



