This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f96b699d70a27dfad69a3e12afc3b708cc708700
Author: Matthias Pohl <[email protected]>
AuthorDate: Wed Dec 15 12:37:05 2021 +0100

    [FLINK-25432][runtime] Makes BlobServer implement LocallyCleanableResource and GloballyCleanableResource
---
 .../org/apache/flink/runtime/blob/BlobServer.java  | 120 +++++++++++++++------
 .../flink/runtime/dispatcher/Dispatcher.java       |  22 +++-
 .../flink/runtime/blob/BlobCacheCleanupTest.java   |  16 +--
 .../flink/runtime/blob/BlobServerCleanupTest.java  |   7 +-
 .../flink/runtime/blob/BlobServerDeleteTest.java   |  15 ++-
 .../flink/runtime/blob/BlobServerRecoveryTest.java |   7 +-
 .../dispatcher/DispatcherResourceCleanupTest.java  |  79 ++++++++++----
 .../BlobLibraryCacheRecoveryITCase.java            |   9 +-
 8 files changed, 198 insertions(+), 77 deletions(-)
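
For orientation, here is a minimal sketch of the two cleanup interfaces as implied
by the @Override methods in the diff below. Only the method names and signatures
are taken from BlobServer's new methods, and the package from the added imports;
everything else (javadoc, additional members) is an assumption, not the committed
code:

    // Hypothetical sketch of org.apache.flink.runtime.dispatcher.cleanup,
    // reconstructed from the overrides in this commit (one file per public
    // interface in practice). JobID, CompletableFuture and Executor are the
    // types imported in the diff below.
    public interface LocallyCleanableResource {
        // Cleans up locally stored artifacts of the given job on the given executor.
        CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor);
    }

    public interface GloballyCleanableResource {
        // Cleans up local artifacts and the job's entries in the HA blob store.
        CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor cleanupExecutor);
    }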

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 944031e..7b239ce0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -25,18 +25,23 @@ import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
+import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Reference;
 import org.apache.flink.util.ShutdownHookUtil;
+import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import javax.net.ServerSocketFactory;
 
 import java.io.File;
@@ -55,8 +60,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.Timer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -74,7 +83,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * the directory structure to store the BLOBs or temporarily cache them.
  */
 public class BlobServer extends Thread
-        implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {
+        implements BlobService,
+                BlobWriter,
+                PermanentBlobService,
+                TransientBlobService,
+                LocallyCleanableResource,
+                GloballyCleanableResource {
 
     /** The log object used for debugging. */
     private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
@@ -861,61 +875,99 @@ public class BlobServer extends Thread
     }
 
     /**
-     * Removes all BLOBs from local and HA store belonging to the given job ID.
+     * Deletes locally stored artifacts for the job represented by the given {@link JobID}. This
+     * doesn't touch the job's entry in the {@link BlobStore} to enable recovery.
      *
-     * @param jobId ID of the job this blob belongs to
-     * @param cleanupBlobStoreFiles True if the corresponding blob store files shall be cleaned up
-     *     as well. Otherwise false.
-     * @return <tt>true</tt> if the job directory is successfully deleted or non-existing;
-     *     <tt>false</tt> otherwise
+     * @param jobId The {@code JobID} of the job that is subject to cleanup.
      */
-    public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
+    @Override
+    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor) {
         checkNotNull(jobId);
 
+        return runAsyncWithWriteLock(() -> internalLocalCleanup(jobId), cleanupExecutor);
+    }
+
+    @GuardedBy("readWriteLock")
+    private void internalLocalCleanup(JobID jobId) throws IOException {
         final File jobDir =
                 new File(
                         BlobUtils.getStorageLocationPath(
                                 storageDir.deref().getAbsolutePath(), jobId));
+        FileUtils.deleteDirectory(jobDir);
 
-        readWriteLock.writeLock().lock();
+        // NOTE on why blobExpiryTimes are not cleaned up: Instead of going through
+        // blobExpiryTimes, keep lingering entries. They will be cleaned up by the timer
+        // task which tolerates non-existing files. If inserted again with the same IDs
+        // (via put()), the TTL will be updated again.
+    }
 
-        try {
-            // delete locally
-            boolean deletedLocally = false;
-            try {
-                FileUtils.deleteDirectory(jobDir);
+    /**
+     * Removes all BLOBs from local and HA store belonging to the given {@link JobID}.
+     *
+     * @param jobId ID of the job this blob belongs to
+     */
+    @Override
+    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
+        checkNotNull(jobId);
 
-                // NOTE on why blobExpiryTimes are not cleaned up:
-                //       Instead of going through blobExpiryTimes, keep lingering entries - they
-                //       will be cleaned up by the timer task which tolerates non-existing files
-                //       If inserted again with the same IDs (via put()), the TTL will be updated
-                //       again.
+        return runAsyncWithWriteLock(
+                () -> {
+                    IOException exception = null;
 
-                deletedLocally = true;
-            } catch (IOException e) {
-                LOG.warn(
-                        "Failed to locally delete BLOB storage directory at "
-                                + jobDir.getAbsolutePath(),
-                        e);
-            }
+                    try {
+                        internalLocalCleanup(jobId);
+                    } catch (IOException e) {
+                        exception = e;
+                    }
 
-            // delete in HA blob store files
-            final boolean deletedHA = !cleanupBlobStoreFiles || blobStore.deleteAll(jobId);
+                    if (!blobStore.deleteAll(jobId)) {
+                        exception =
+                                ExceptionUtils.firstOrSuppressed(
+                                        new IOException(
+                                                "Error while cleaning up the BlobStore for job "
+                                                        + jobId),
+                                        exception);
+                    }
 
-            return deletedLocally && deletedHA;
-        } finally {
-            readWriteLock.writeLock().unlock();
-        }
+                    if (exception != null) {
+                        throw new IOException(exception);
+                    }
+                },
+                executor);
+    }
+
+    private CompletableFuture<Void> runAsyncWithWriteLock(
+            ThrowingRunnable<IOException> runnable, Executor executor) {
+        return CompletableFuture.runAsync(
+                () -> {
+                    readWriteLock.writeLock().lock();
+                    try {
+                        runnable.run();
+                    } catch (IOException e) {
+                        throw new CompletionException(e);
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                },
+                executor);
     }
 
-    public void retainJobs(Collection<JobID> jobsToRetain) throws IOException {
+    public void retainJobs(Collection<JobID> jobsToRetain, Executor ioExecutor) throws IOException {
         if (storageDir.deref().exists()) {
             final Set<JobID> jobsToRemove = BlobUtils.listExistingJobs(storageDir.deref().toPath());
 
             jobsToRemove.removeAll(jobsToRetain);
 
+            final Collection<CompletableFuture<Void>> cleanupResultFutures =
+                    new ArrayList<>(jobsToRemove.size());
             for (JobID jobToRemove : jobsToRemove) {
-                cleanupJob(jobToRemove, true);
+                cleanupResultFutures.add(globalCleanupAsync(jobToRemove, ioExecutor));
+            }
+
+            try {
+                FutureUtils.completeAll(cleanupResultFutures).get();
+            } catch (InterruptedException | ExecutionException e) {
+                ExceptionUtils.rethrowIOException(e);
             }
         }
     }
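
Usage-wise, callers migrate from the removed synchronous cleanupJob to the new
asynchronous methods. A minimal sketch, assuming the names from the hunk above;
the .join() mirrors how the updated tests further below preserve their previously
blocking semantics:

    // Before: synchronous, flag-controlled, boolean-returning.
    // server.cleanupJob(jobId, /* cleanupBlobStoreFiles = */ true);

    // After: future-returning; local vs. global cleanup is chosen explicitly.
    ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
    server.globalCleanupAsync(jobId, ioExecutor).join(); // local files + HA blob store
    server.localCleanupAsync(jobId, ioExecutor).join();  // local files only, HA entries kept
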
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e6b7e8e..fecba4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -201,7 +201,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
         this.recoveredJobs = new HashSet<>(recoveredJobs);
 
         this.blobServer.retainJobs(
-                recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toSet()));
+                recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toSet()),
+                ioExecutor);
 
         this.dispatcherCachedOperationsHandler =
                 new DispatcherCachedOperationsHandler(
@@ -909,8 +910,25 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                 log.warn(
                         "Could not properly clean data for job {} stored by ha services", jobId, e);
             }
+
+            try {
+                blobServer.globalCleanup(jobId);
+            } catch (Exception e) {
+                log.warn(
+                        "Could not properly clean global data for job {} stored in the BlobServer.",
+                        jobId,
+                        e);
+            }
+        } else {
+            try {
+                blobServer.localCleanup(jobId);
+            } catch (IOException e) {
+                log.warn(
+                        "Could not properly clean local data for job {} stored in the BlobServer.",
+                        jobId,
+                        e);
+            }
         }
-        blobServer.cleanupJob(jobId, jobGraphRemoved);
     }
 
     private void markJobAsClean(JobID jobId) {
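
Note that the Dispatcher hunk above calls blocking variants, blobServer.globalCleanup(jobId)
and blobServer.localCleanup(jobId), which are not defined in this diff. A hedged guess at how
such convenience methods could bridge to the async API, for example as interface default
methods (an assumption, not code from this commit):

    // Hypothetical default method on GloballyCleanableResource:
    default void globalCleanup(JobID jobId) throws Exception {
        // Runnable::run serves as a direct, same-thread Executor.
        globalCleanupAsync(jobId, Runnable::run).get();
    }
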
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
index 42fa235..6940ed8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 
+import org.hamcrest.collection.IsEmptyCollection;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,7 +45,6 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -317,14 +317,12 @@ public class BlobCacheCleanupTest extends TestLogger {
     }
 
     @Test
-    public void testTransientBlobNoJobCleanup()
-            throws IOException, InterruptedException, ExecutionException {
+    public void testTransientBlobNoJobCleanup() throws Exception {
         testTransientBlobCleanup(null);
     }
 
     @Test
-    public void testTransientBlobForJobCleanup()
-            throws IOException, InterruptedException, ExecutionException {
+    public void testTransientBlobForJobCleanup() throws Exception {
         testTransientBlobCleanup(new JobID());
     }
 
@@ -332,8 +330,7 @@ public class BlobCacheCleanupTest extends TestLogger {
      * Tests that {@link TransientBlobCache} cleans up after a default TTL and keeps files which are
      * constantly accessed.
      */
-    private void testTransientBlobCleanup(@Nullable final JobID jobId)
-            throws IOException, InterruptedException, ExecutionException {
+    private void testTransientBlobCleanup(@Nullable final JobID jobId) throws Exception {
 
         // 1s should be a safe-enough buffer to still check for existence after a BLOB's last access
         long cleanupInterval = 1L; // in seconds
@@ -351,6 +348,7 @@ public class BlobCacheCleanupTest extends TestLogger {
 
         long cleanupLowerBound;
 
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
         try (BlobServer server =
                         new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore());
                 final BlobCacheService cache =
@@ -390,7 +388,7 @@ public class BlobCacheCleanupTest extends TestLogger {
             // files are cached now for the given TTL - remove from server so that they are not
             // re-downloaded
             if (jobId != null) {
-                server.cleanupJob(jobId, true);
+                server.globalCleanupAsync(jobId, executorService).join();
             } else {
                 server.deleteFromCache(key1);
                 server.deleteFromCache(key2);
@@ -430,6 +428,8 @@ public class BlobCacheCleanupTest extends TestLogger {
             filesFuture.get();
 
             verifyDeletedEventually(server, jobId, key1, key2);
+        } finally {
+            assertThat(executorService.shutdownNow(), IsEmptyCollection.empty());
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
index f4beefd..f4fa398 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
@@ -220,7 +220,7 @@ public class BlobServerCleanupTest extends TestLogger {
     }
 
     @Test
-    public void testBlobServerRetainsJobs() throws IOException {
+    public void testBlobServerRetainsJobs() throws Exception {
         final File storageDirectory = temporaryFolder.newFolder();
 
         final JobID jobId1 = new JobID();
@@ -232,13 +232,16 @@ public class BlobServerCleanupTest extends TestLogger {
         final PermanentBlobKey blobKey2 =
                 TestingBlobUtils.writePermanentBlob(storageDirectory.toPath(), jobId2, fileContent);
 
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
         try (final BlobServer blobServer =
                 new BlobServer(new Configuration(), storageDirectory, new VoidBlobStore())) {
-            blobServer.retainJobs(Collections.singleton(jobId1));
+            blobServer.retainJobs(Collections.singleton(jobId1), executorService);
 
             assertThat(blobServer.getFile(jobId1, blobKey1)).hasBinaryContent(fileContent);
             assertThatThrownBy(() -> blobServer.getFile(jobId2, blobKey2))
                     .isInstanceOf(NoSuchFileException.class);
+        } finally {
+            assertThat(executorService.shutdownNow()).isEmpty();
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index a87a2b1..91ad3ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 
+import org.hamcrest.collection.IsEmptyCollection;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -41,6 +42,7 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -51,6 +53,7 @@ import static org.apache.flink.runtime.blob.BlobServerCleanupTest.checkFileCount
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -276,7 +279,8 @@ public class BlobServerDeleteTest extends TestLogger {
     }
 
     /**
-     * Tests that {@link BlobServer} cleans up after calling {@link BlobServer#cleanupJob}.
+     * Tests that {@link BlobServer} cleans up after calling {@link
+     * BlobServer#globalCleanupAsync(JobID, Executor)}.
      *
      * @param blobType whether the BLOB should become permanent or transient
      */
@@ -286,6 +290,7 @@ public class BlobServerDeleteTest extends TestLogger {
 
         Configuration config = new Configuration();
 
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
         try (BlobServer server =
                 new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore())) {
 
@@ -308,7 +313,7 @@ public class BlobServerDeleteTest extends TestLogger {
             verifyContents(server, jobId2, key2, data);
             checkFileCountForJob(1, jobId2, server);
 
-            server.cleanupJob(jobId1, true);
+            server.globalCleanupAsync(jobId1, executorService).join();
 
             verifyDeleted(server, jobId1, key1a);
             verifyDeleted(server, jobId1, key1b);
@@ -316,14 +321,16 @@ public class BlobServerDeleteTest extends TestLogger {
             verifyContents(server, jobId2, key2, data);
             checkFileCountForJob(1, jobId2, server);
 
-            server.cleanupJob(jobId2, true);
+            server.globalCleanupAsync(jobId2, executorService).join();
 
             checkFileCountForJob(0, jobId1, server);
             verifyDeleted(server, jobId2, key2);
             checkFileCountForJob(0, jobId2, server);
 
             // calling a second time should not fail
-            server.cleanupJob(jobId2, true);
+            server.globalCleanupAsync(jobId2, executorService).join();
+        } finally {
+            assertThat(executorService.shutdownNow(), IsEmptyCollection.empty());
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
index dfa2799..5f3e6c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -89,7 +90,7 @@ public class BlobServerRecoveryTest extends TestLogger {
      */
     public static void testBlobServerRecovery(
             final Configuration config, final BlobStore blobStore, final File blobStorage)
-            throws IOException {
+            throws Exception {
         final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
         String storagePath =
                 config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
@@ -141,8 +142,8 @@ public class BlobServerRecoveryTest extends TestLogger {
             verifyDeleted(cache1, jobId[0], nonHAKey);
 
             // Remove again
-            server1.cleanupJob(jobId[0], true);
-            server1.cleanupJob(jobId[1], true);
+            server1.globalCleanupAsync(jobId[0], Executors.directExecutor()).join();
+            server1.globalCleanupAsync(jobId[1], Executors.directExecutor()).join();
 
             // Verify everything is clean
             assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath)));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 4992c76..7e057f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -87,9 +87,11 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -136,7 +138,8 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
     private CompletableFuture<BlobKey> storedHABlobFuture;
     private CompletableFuture<JobID> deleteAllHABlobsFuture;
-    private CompletableFuture<JobID> cleanupJobFuture;
+    private CompletableFuture<JobID> localCleanupFuture;
+    private CompletableFuture<JobID> globalCleanupFuture;
     private CompletableFuture<JobID> cleanupJobHADataFuture;
     private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
 
@@ -169,14 +172,22 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                         .setDeleteAllFunction(deleteAllHABlobsFuture::complete)
                         .createTestingBlobStore();
 
-        cleanupJobFuture = new CompletableFuture<>();
+        globalCleanupFuture = new CompletableFuture<>();
+        localCleanupFuture = new CompletableFuture<>();
 
         blobServer =
                 new TestingBlobServer(
                         configuration,
                         temporaryFolder.newFolder(),
                         testingBlobStore,
-                        cleanupJobFuture);
+                        (jobId, ignoredExecutor) -> {
+                            globalCleanupFuture.complete(jobId);
+                            return FutureUtils.completedVoidFuture();
+                        },
+                        (jobId, ignoredExecutor) -> {
+                            localCleanupFuture.complete(jobId);
+                            return FutureUtils.completedVoidFuture();
+                        });
     }
 
     private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob() throws Exception {
@@ -254,8 +265,8 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     }
 
     private void assertThatHABlobsHaveBeenRemoved()
-            throws InterruptedException, ExecutionException {
-        assertThat(cleanupJobFuture.get(), equalTo(jobId));
+            throws InterruptedException, ExecutionException, TimeoutException {
+        assertGlobalCleanupTriggered(jobId);
 
         // verify that we also cleared the BlobStore
         assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
@@ -294,8 +305,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
         suspendJob(testingJobManagerRunner);
 
-        assertThat(cleanupJobFuture.get(), equalTo(jobId));
-
+        assertLocalCleanupTriggered(jobId);
         assertThat(blobFile.exists(), is(false));
 
         // verify that we did not clear the BlobStore
@@ -331,8 +341,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
         dispatcher.closeAsync().get();
 
-        assertThat(cleanupJobFuture.get(), equalTo(jobId));
-
+        assertLocalCleanupTriggered(jobId);
         assertThat(blobFile.exists(), is(false));
 
         // verify that we did not clear the BlobStore
@@ -375,7 +384,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         // check that no exceptions have been thrown
         dispatcherTerminationFuture.get();
 
-        assertThat(cleanupJobFuture.get(), is(jobId));
+        assertGlobalCleanupTriggered(jobId);
         assertThat(deleteAllHABlobsFuture.get(), is(jobId));
     }
 
@@ -468,7 +477,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                         is(true));
             }
 
-            assertThatHABlobsHaveNotBeenRemoved();
+            assertThatNoCleanupWasTriggered();
         } finally {
             finishJob(testingJobManagerRunnerFactoryNG.takeCreatedJobManagerRunner());
         }
@@ -519,8 +528,9 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                                 .build()));
     }
 
-    private void assertThatHABlobsHaveNotBeenRemoved() {
-        assertThat(cleanupJobFuture.isDone(), is(false));
+    private void assertThatNoCleanupWasTriggered() {
+        assertThat(globalCleanupFuture.isDone(), is(false));
+        assertThat(localCleanupFuture.isDone(), is(false));
         assertThat(deleteAllHABlobsFuture.isDone(), is(false));
         assertThat(blobFile.exists(), is(true));
     }
@@ -640,7 +650,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
         testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
 
-        assertThat(cleanupJobFuture.get(), equalTo(jobId));
+        assertLocalCleanupTriggered(jobId);
         assertThat(deleteAllHABlobsFuture.isDone(), is(false));
     }
 
@@ -659,10 +669,22 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
         testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
 
-        assertThat(cleanupJobFuture.get(), equalTo(jobId));
+        assertGlobalCleanupTriggered(jobId);
         assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
     }
 
+    private void assertLocalCleanupTriggered(JobID jobId)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        assertThat(localCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId));
+        assertThat(globalCleanupFuture.isDone(), is(false));
+    }
+
+    private void assertGlobalCleanupTriggered(JobID jobId)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        assertThat(localCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId));
+        assertThat(globalCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId));
+    }
+
     @Test
     public void testFatalErrorIfJobCannotBeMarkedDirtyInJobResultStore() throws Exception {
         jobResultStore =
@@ -738,14 +760,18 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
     private static final class TestingBlobServer extends BlobServer {
 
-        private final CompletableFuture<JobID> cleanupJobFuture;
+        private final BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction;
+        private final BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction;
 
         /**
          * Instantiates a new BLOB server and binds it to a free network port.
          *
          * @param config Configuration to be used to instantiate the BlobServer
          * @param blobStore BlobStore to store blobs persistently
-         * @param cleanupJobFuture
+         * @param globalCleanupFunction The function invoked alongside the actual {@link
+         *     #globalCleanupAsync(JobID, Executor)} call.
+         * @param localCleanupFunction The function invoked alongside the actual {@link
+         *     #localCleanupAsync(JobID, Executor)} call.
          * @throws IOException thrown if the BLOB server cannot bind to a free network port or if
          *     the (local or distributed) file storage cannot be created or is not usable
          */
@@ -753,17 +779,24 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                 Configuration config,
                 File storageDirectory,
                 BlobStore blobStore,
-                CompletableFuture<JobID> cleanupJobFuture)
+                BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction,
+                BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction)
                 throws IOException {
             super(config, storageDirectory, blobStore);
-            this.cleanupJobFuture = cleanupJobFuture;
+            this.globalCleanupFunction = globalCleanupFunction;
+            this.localCleanupFunction = localCleanupFunction;
+        }
+
+        @Override
+        public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
+            return super.globalCleanupAsync(jobId, executor)
+                    .thenCompose(ignored -> globalCleanupFunction.apply(jobId, executor));
         }
 
         @Override
-        public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
-            final boolean result = super.cleanupJob(jobId, cleanupBlobStoreFiles);
-            cleanupJobFuture.complete(jobId);
-            return result;
+        public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
+            return super.localCleanupAsync(jobId, executor)
+                    .thenCompose(ignored -> localCleanupFunction.apply(jobId, executor));
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index b2e1b3d..efb43c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.collection.IsEmptyCollection;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -41,7 +42,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
@@ -70,6 +74,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
                 temporaryFolder.newFolder().getAbsolutePath());
         config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
 
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
         try {
             blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
@@ -160,7 +165,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
             }
 
             // Remove blobs again
-            server[1].cleanupJob(jobId, true);
+            server[1].globalCleanupAsync(jobId, executorService).join();
 
             // Verify everything is clean below recoveryDir/<cluster_id>
             final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
@@ -173,6 +178,8 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
                     0,
                     recoveryFiles.length);
         } finally {
+            assertThat(executorService.shutdownNow(), IsEmptyCollection.empty());
+
             for (BlobLibraryCacheManager s : libServer) {
                 if (s != null) {
                     s.shutdown();
