This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f96b699d70a27dfad69a3e12afc3b708cc708700
Author: Matthias Pohl <[email protected]>
AuthorDate: Wed Dec 15 12:37:05 2021 +0100

    [FLINK-25432][runtime] Makes BlobServer implement LocallyCleanableResource and GloballyCleanableResource
---
 .../org/apache/flink/runtime/blob/BlobServer.java  | 120 +++++++++++++++------
 .../flink/runtime/dispatcher/Dispatcher.java       |  22 +++-
 .../flink/runtime/blob/BlobCacheCleanupTest.java   |  16 +--
 .../flink/runtime/blob/BlobServerCleanupTest.java  |   7 +-
 .../flink/runtime/blob/BlobServerDeleteTest.java   |  15 ++-
 .../flink/runtime/blob/BlobServerRecoveryTest.java |   7 +-
 .../dispatcher/DispatcherResourceCleanupTest.java  |  79 ++++++++++----
 .../BlobLibraryCacheRecoveryITCase.java            |   9 +-
 8 files changed, 198 insertions(+), 77 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 944031e..7b239ce0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -25,18 +25,23 @@ import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
+import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Reference;
 import org.apache.flink.util.ShutdownHookUtil;
+import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import javax.net.ServerSocketFactory;
 
 import java.io.File;
@@ -55,8 +60,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.Timer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -74,7 +83,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * the directory structure to store the BLOBs or temporarily cache them.
  */
 public class BlobServer extends Thread
-        implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {
+        implements BlobService,
+                BlobWriter,
+                PermanentBlobService,
+                TransientBlobService,
+                LocallyCleanableResource,
+                GloballyCleanableResource {
 
     /** The log object used for debugging. */
     private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
@@ -861,61 +875,99 @@ public class BlobServer extends Thread
     }
 
     /**
-     * Removes all BLOBs from local and HA store belonging to the given job ID.
+     * Deletes locally stored artifacts for the job represented by the given {@link JobID}. This
+     * doesn't touch the job's entry in the {@link BlobStore} to enable recovering.
      *
-     * @param jobId ID of the job this blob belongs to
-     * @param cleanupBlobStoreFiles True if the corresponding blob store files shall be cleaned up
-     *     as well. Otherwise false.
-     * @return <tt>true</tt> if the job directory is successfully deleted or non-existing;
-     *     <tt>false</tt> otherwise
+     * @param jobId The {@code JobID} of the job that is subject to cleanup.
      */
-    public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
+    @Override
+    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor) {
         checkNotNull(jobId);
 
+        return runAsyncWithWriteLock(() -> internalLocalCleanup(jobId), cleanupExecutor);
+    }
+
+    @GuardedBy("readWriteLock")
+    private void internalLocalCleanup(JobID jobId) throws IOException {
         final File jobDir =
                 new File(
                         BlobUtils.getStorageLocationPath(
                                 storageDir.deref().getAbsolutePath(), jobId));
+        FileUtils.deleteDirectory(jobDir);
 
-        readWriteLock.writeLock().lock();
+        // NOTE on why blobExpiryTimes are not cleaned up: Instead of going through
+        // blobExpiryTimes, keep lingering entries. They will be cleaned up by the timer
+        // task which tolerate non-existing files. If inserted again with the same IDs
+        // (via put()), the TTL will be updated again.
+    }
 
-        try {
-            // delete locally
-            boolean deletedLocally = false;
-            try {
-                FileUtils.deleteDirectory(jobDir);
+    /**
+     * Removes all BLOBs from local and HA store belonging to the given {@link JobID}.
+     *
+     * @param jobId ID of the job this blob belongs to
+     */
+    @Override
+    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
+        checkNotNull(jobId);
 
-                // NOTE on why blobExpiryTimes are not cleaned up:
-                //       Instead of going through blobExpiryTimes, keep lingering entries - they
-                //       will be cleaned up by the timer task which tolerates non-existing files
-                //       If inserted again with the same IDs (via put()), the TTL will be updated
-                //       again.
+        return runAsyncWithWriteLock(
+                () -> {
+                    IOException exception = null;
 
-                deletedLocally = true;
-            } catch (IOException e) {
-                LOG.warn(
-                        "Failed to locally delete BLOB storage directory at "
-                                + jobDir.getAbsolutePath(),
-                        e);
-            }
+                    try {
+                        internalLocalCleanup(jobId);
+                    } catch (IOException e) {
+                        exception = e;
+                    }
 
-            // delete in HA blob store files
-            final boolean deletedHA = !cleanupBlobStoreFiles || blobStore.deleteAll(jobId);
+                    if (!blobStore.deleteAll(jobId)) {
+                        exception =
+                                ExceptionUtils.firstOrSuppressed(
+                                        new IOException(
+                                                "Error while cleaning up the BlobStore for job "
+                                                        + jobId),
+                                        exception);
+                    }
 
-            return deletedLocally && deletedHA;
-        } finally {
-            readWriteLock.writeLock().unlock();
-        }
+                    if (exception != null) {
+                        throw new IOException(exception);
+                    }
+                },
+                executor);
+    }
+
+    private CompletableFuture<Void> runAsyncWithWriteLock(
+            ThrowingRunnable<IOException> runnable, Executor executor) {
+        return CompletableFuture.runAsync(
+                () -> {
+                    readWriteLock.writeLock().lock();
+                    try {
+                        runnable.run();
+                    } catch (IOException e) {
+                        throw new CompletionException(e);
+                    } finally {
+                        readWriteLock.writeLock().unlock();
+                    }
+                },
+                executor);
     }
 
-    public void retainJobs(Collection<JobID> jobsToRetain) throws IOException {
+    public void retainJobs(Collection<JobID> jobsToRetain, Executor ioExecutor) throws IOException {
         if (storageDir.deref().exists()) {
             final Set<JobID> jobsToRemove = BlobUtils.listExistingJobs(storageDir.deref().toPath());
 
             jobsToRemove.removeAll(jobsToRetain);
 
+            final Collection<CompletableFuture<Void>> cleanupResultFutures =
+                    new ArrayList<>(jobsToRemove.size());
             for (JobID jobToRemove : jobsToRemove) {
-                cleanupJob(jobToRemove, true);
+                cleanupResultFutures.add(globalCleanupAsync(jobToRemove, ioExecutor));
+            }
+
+            try {
+                FutureUtils.completeAll(cleanupResultFutures).get();
+            } catch (InterruptedException | ExecutionException e) {
+                ExceptionUtils.rethrowIOException(e);
             }
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e6b7e8e..fecba4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -201,7 +201,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
         this.recoveredJobs = new HashSet<>(recoveredJobs);
 
         this.blobServer.retainJobs(
-                recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toSet()));
+                recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toSet()),
+                ioExecutor);
 
         this.dispatcherCachedOperationsHandler =
                 new DispatcherCachedOperationsHandler(
@@ -909,8 +910,25 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                 log.warn(
                         "Could not properly clean data for job {} stored by ha services", jobId, e);
             }
+
+            try {
+                blobServer.globalCleanup(jobId);
+            } catch (Exception e) {
+                log.warn(
+                        "Could not properly global clean data for job {} stored in the BlobServer.",
+                        jobId,
+                        e);
+            }
+        } else {
+            try {
+                blobServer.localCleanup(jobId);
+            } catch (IOException e) {
+                log.warn(
+                        "Could not properly clean local data for job {} stored in the BlobServer.",
+                        jobId,
+                        e);
+            }
         }
-        blobServer.cleanupJob(jobId, jobGraphRemoved);
     }
 
     private void markJobAsClean(JobID jobId) {
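Note: the two interfaces named in the commit title are not part of this diff, so their exact definitions are only inferred here from the @Override signatures above. A sketch of the assumed shape (the synchronous blobServer.globalCleanup(jobId) / blobServer.localCleanup(jobId) calls in Dispatcher suggest there are additional blocking variants, presumably default methods; the real definitions live in org.apache.flink.runtime.dispatcher.cleanup and may differ in detail):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;

    import org.apache.flink.api.common.JobID;

    // Inferred from BlobServer's overrides in this commit; not the actual source.
    interface LocallyCleanableResource {
        // Cleans up local artifacts only; the job's HA data stays recoverable.
        CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor cleanupExecutor);
    }

    interface GloballyCleanableResource {
        // Cleans up local artifacts and the job's entries in the HA store.
        CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor cleanupExecutor);
    }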
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
index 42fa235..6940ed8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 
+import org.hamcrest.collection.IsEmptyCollection;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,7 +45,6 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -317,14 +317,12 @@ public class BlobCacheCleanupTest extends TestLogger {
     }
 
     @Test
-    public void testTransientBlobNoJobCleanup()
-            throws IOException, InterruptedException, ExecutionException {
+    public void testTransientBlobNoJobCleanup() throws Exception {
         testTransientBlobCleanup(null);
     }
 
     @Test
-    public void testTransientBlobForJobCleanup()
-            throws IOException, InterruptedException, ExecutionException {
+    public void testTransientBlobForJobCleanup() throws Exception {
         testTransientBlobCleanup(new JobID());
     }
 
@@ -332,8 +330,7 @@ public class BlobCacheCleanupTest extends TestLogger {
      * Tests that {@link TransientBlobCache} cleans up after a default TTL and keeps files which are
      * constantly accessed.
      */
-    private void testTransientBlobCleanup(@Nullable final JobID jobId)
-            throws IOException, InterruptedException, ExecutionException {
+    private void testTransientBlobCleanup(@Nullable final JobID jobId) throws Exception {
 
         // 1s should be a safe-enough buffer to still check for existence after a BLOB's last access
         long cleanupInterval = 1L; // in seconds
@@ -351,6 +348,7 @@ public class BlobCacheCleanupTest extends TestLogger {
 
         long cleanupLowerBound;
 
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
         try (BlobServer server =
                         new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore());
                 final BlobCacheService cache =
@@ -390,7 +388,7 @@ public class BlobCacheCleanupTest extends TestLogger {
             // files are cached now for the given TTL - remove from server so that they are not
             // re-downloaded
             if (jobId != null) {
-                server.cleanupJob(jobId, true);
+                server.globalCleanupAsync(jobId, executorService).join();
             } else {
                 server.deleteFromCache(key1);
                 server.deleteFromCache(key2);
@@ -430,6 +428,8 @@ public class BlobCacheCleanupTest extends TestLogger {
 
             filesFuture.get();
 
             verifyDeletedEventually(server, jobId, key1, key2);
+        } finally {
+            assertThat(executorService.shutdownNow(), IsEmptyCollection.empty());
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
index f4beefd..f4fa398 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
@@ -220,7 +220,7 @@ public class BlobServerCleanupTest extends TestLogger {
     }
 
     @Test
-    public void testBlobServerRetainsJobs() throws IOException {
+    public void testBlobServerRetainsJobs() throws Exception {
         final File storageDirectory = temporaryFolder.newFolder();
 
         final JobID jobId1 = new JobID();
@@ -232,13 +232,16 @@ public class BlobServerCleanupTest extends TestLogger {
         final PermanentBlobKey blobKey2 =
                 TestingBlobUtils.writePermanentBlob(storageDirectory.toPath(), jobId2, fileContent);
 
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
         try (final BlobServer blobServer =
                 new BlobServer(new Configuration(), storageDirectory, new VoidBlobStore())) {
-            blobServer.retainJobs(Collections.singleton(jobId1));
+            blobServer.retainJobs(Collections.singleton(jobId1), executorService);
 
             assertThat(blobServer.getFile(jobId1, blobKey1)).hasBinaryContent(fileContent);
             assertThatThrownBy(() -> blobServer.getFile(jobId2, blobKey2))
                     .isInstanceOf(NoSuchFileException.class);
+        } finally {
+            assertThat(executorService.shutdownNow()).isEmpty();
        }
    }
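The test changes above and below all follow the same executor-handling pattern: each test drives the asynchronous cleanup through a dedicated single-thread executor and asserts in finally that shutdownNow() returns no leftover tasks, i.e. that every submitted cleanup actually ran. A condensed sketch of that pattern (the class and the callback interface are illustrative, not part of the commit):

    import static org.hamcrest.MatcherAssert.assertThat;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.hamcrest.collection.IsEmptyCollection;

    final class DedicatedExecutorPattern {

        interface ThrowingTestBody {
            void run(ExecutorService executorService) throws Exception;
        }

        static void runWithDedicatedExecutor(ThrowingTestBody testBody) throws Exception {
            final ExecutorService executorService = Executors.newSingleThreadExecutor();
            try {
                // e.g. server.globalCleanupAsync(jobId, executorService).join();
                testBody.run(executorService);
            } finally {
                // shutdownNow() returns the tasks that never started; an empty
                // list proves no cleanup was left queued behind the test.
                assertThat(executorService.shutdownNow(), IsEmptyCollection.empty());
            }
        }
    }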
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index a87a2b1..91ad3ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 
+import org.hamcrest.collection.IsEmptyCollection;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -41,6 +42,7 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -51,6 +53,7 @@ import static org.apache.flink.runtime.blob.BlobServerCleanupTest.checkFileCount
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -276,7 +279,8 @@ public class BlobServerDeleteTest extends TestLogger {
     }
 
     /**
-     * Tests that {@link BlobServer} cleans up after calling {@link BlobServer#cleanupJob}.
+     * Tests that {@link BlobServer} cleans up after calling {@link
+     * BlobServer#globalCleanupAsync(JobID, Executor)}.
      *
     * @param blobType whether the BLOB should become permanent or transient
     */
@@ -286,6 +290,7 @@ public class BlobServerDeleteTest extends TestLogger {
 
         Configuration config = new Configuration();
 
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
         try (BlobServer server =
                 new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore())) {
 
@@ -308,7 +313,7 @@ public class BlobServerDeleteTest extends TestLogger {
             verifyContents(server, jobId2, key2, data);
             checkFileCountForJob(1, jobId2, server);
 
-            server.cleanupJob(jobId1, true);
+            server.globalCleanupAsync(jobId1, executorService).join();
 
             verifyDeleted(server, jobId1, key1a);
             verifyDeleted(server, jobId1, key1b);
@@ -316,14 +321,16 @@ public class BlobServerDeleteTest extends TestLogger {
             verifyContents(server, jobId2, key2, data);
             checkFileCountForJob(1, jobId2, server);
 
-            server.cleanupJob(jobId2, true);
+            server.globalCleanupAsync(jobId2, executorService).join();
 
             checkFileCountForJob(0, jobId1, server);
             verifyDeleted(server, jobId2, key2);
             checkFileCountForJob(0, jobId2, server);
 
             // calling a second time should not fail
-            server.cleanupJob(jobId2, true);
+            server.globalCleanupAsync(jobId2, executorService).join();
+        } finally {
+            assertThat(executorService.shutdownNow(), IsEmptyCollection.empty());
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
index dfa2799..5f3e6c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -89,7 +90,7 @@ public class BlobServerRecoveryTest extends TestLogger {
      */
     public static void testBlobServerRecovery(
             final Configuration config, final BlobStore blobStore, final File blobStorage)
-            throws IOException {
+            throws Exception {
         final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
         String storagePath =
                 config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
@@ -141,8 +142,8 @@ public class BlobServerRecoveryTest extends TestLogger {
             verifyDeleted(cache1, jobId[0], nonHAKey);
 
             // Remove again
-            server1.cleanupJob(jobId[0], true);
-            server1.cleanupJob(jobId[1], true);
+            server1.globalCleanupAsync(jobId[0], Executors.directExecutor()).join();
+            server1.globalCleanupAsync(jobId[1], Executors.directExecutor()).join();
 
             // Verify everything is clean
             assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath)));
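One consequence of runAsyncWithWriteLock wrapping the checked IOException in a CompletionException: callers that block on the returned future (as the tests above do via join()) see the CompletionException and have to unwrap it to recover the original cause. A caller-side sketch, assumed rather than taken from the commit:

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;

    final class CleanupFutureHandling {

        // Blocks on an async cleanup and rethrows the underlying IOException, if any.
        static void joinCleanup(CompletableFuture<Void> cleanupFuture) throws IOException {
            try {
                cleanupFuture.join();
            } catch (CompletionException e) {
                if (e.getCause() instanceof IOException) {
                    throw (IOException) e.getCause();
                }
                throw e;
            }
        }
    }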
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 4992c76..7e057f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -87,9 +87,11 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -136,7 +138,8 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     private CompletableFuture<BlobKey> storedHABlobFuture;
     private CompletableFuture<JobID> deleteAllHABlobsFuture;
-    private CompletableFuture<JobID> cleanupJobFuture;
+    private CompletableFuture<JobID> localCleanupFuture;
+    private CompletableFuture<JobID> globalCleanupFuture;
     private CompletableFuture<JobID> cleanupJobHADataFuture;
     private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
@@ -169,14 +172,22 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                         .setDeleteAllFunction(deleteAllHABlobsFuture::complete)
                         .createTestingBlobStore();
 
-        cleanupJobFuture = new CompletableFuture<>();
+        globalCleanupFuture = new CompletableFuture<>();
+        localCleanupFuture = new CompletableFuture<>();
         blobServer =
                 new TestingBlobServer(
                         configuration,
                         temporaryFolder.newFolder(),
                         testingBlobStore,
-                        cleanupJobFuture);
+                        (jobId, ignoredExecutor) -> {
+                            globalCleanupFuture.complete(jobId);
+                            return FutureUtils.completedVoidFuture();
+                        },
+                        (jobId, ignoredExecutor) -> {
+                            localCleanupFuture.complete(jobId);
+                            return FutureUtils.completedVoidFuture();
+                        });
     }
 
     private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob() throws Exception {
@@ -254,8 +265,8 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     }
 
     private void assertThatHABlobsHaveBeenRemoved()
-            throws InterruptedException, ExecutionException {
-        assertThat(cleanupJobFuture.get(), equalTo(jobId));
+            throws InterruptedException, ExecutionException, TimeoutException {
+        assertGlobalCleanupTriggered(jobId);
 
         // verify that we also cleared the BlobStore
         assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
@@ -294,8 +305,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
         suspendJob(testingJobManagerRunner);
 
-        assertThat(cleanupJobFuture.get(), equalTo(jobId));
-
+        assertLocalCleanupTriggered(jobId);
         assertThat(blobFile.exists(), is(false));
 
         // verify that we did not clear the BlobStore
@@ -331,8 +341,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
         dispatcher.closeAsync().get();
 
-        assertThat(cleanupJobFuture.get(), equalTo(jobId));
-
+        assertLocalCleanupTriggered(jobId);
         assertThat(blobFile.exists(), is(false));
 
         // verify that we did not clear the BlobStore
@@ -375,7 +384,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         // check that no exceptions have been thrown
         dispatcherTerminationFuture.get();
 
-        assertThat(cleanupJobFuture.get(), is(jobId));
+        assertGlobalCleanupTriggered(jobId);
         assertThat(deleteAllHABlobsFuture.get(), is(jobId));
     }
@@ -468,7 +477,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                     is(true));
             }
 
-            assertThatHABlobsHaveNotBeenRemoved();
+            assertThatNoCleanupWasTriggered();
         } finally {
             finishJob(testingJobManagerRunnerFactoryNG.takeCreatedJobManagerRunner());
         }
@@ -519,8 +528,9 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                                 .build()));
     }
 
-    private void assertThatHABlobsHaveNotBeenRemoved() {
-        assertThat(cleanupJobFuture.isDone(), is(false));
+    private void assertThatNoCleanupWasTriggered() {
+        assertThat(globalCleanupFuture.isDone(), is(false));
+        assertThat(localCleanupFuture.isDone(), is(false));
         assertThat(deleteAllHABlobsFuture.isDone(), is(false));
         assertThat(blobFile.exists(), is(true));
     }
@@ -640,7 +650,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
         testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
 
-        assertThat(cleanupJobFuture.get(), equalTo(jobId));
+        assertLocalCleanupTriggered(jobId);
         assertThat(deleteAllHABlobsFuture.isDone(), is(false));
     }
@@ -659,10 +669,22 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                 jobManagerRunnerFactory.takeCreatedJobManagerRunner();
         testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
 
-        assertThat(cleanupJobFuture.get(), equalTo(jobId));
+        assertGlobalCleanupTriggered(jobId);
         assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId));
     }
 
+    private void assertLocalCleanupTriggered(JobID jobId)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        assertThat(localCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId));
+        assertThat(globalCleanupFuture.isDone(), is(false));
+    }
+
+    private void assertGlobalCleanupTriggered(JobID jobId)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        assertThat(localCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId));
+        assertThat(globalCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId));
+    }
+
     @Test
     public void testFatalErrorIfJobCannotBeMarkedDirtyInJobResultStore() throws Exception {
         jobResultStore =
@@ -738,14 +760,18 @@ public class DispatcherResourceCleanupTest extends TestLogger {
 
     private static final class TestingBlobServer extends BlobServer {
 
-        private final CompletableFuture<JobID> cleanupJobFuture;
+        private final BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction;
+        private final BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction;
 
         /**
          * Instantiates a new BLOB server and binds it to a free network port.
         *
         * @param config Configuration to be used to instantiate the BlobServer
         * @param blobStore BlobStore to store blobs persistently
-         * @param cleanupJobFuture
+         * @param globalCleanupFunction The function called along the actual {@link
+         *     #globalCleanupAsync(JobID, Executor)} call.
+         * @param localCleanupFunction The function called along the actual {@link
+         *     #localCleanupAsync(JobID, Executor)} call.
         * @throws IOException thrown if the BLOB server cannot bind to a free network port or if
         *     the (local or distributed) file storage cannot be created or is not usable
         */
@@ -753,17 +779,24 @@ public class DispatcherResourceCleanupTest extends TestLogger {
                 Configuration config,
                 File storageDirectory,
                 BlobStore blobStore,
-                CompletableFuture<JobID> cleanupJobFuture)
+                BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction,
+                BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction)
                 throws IOException {
             super(config, storageDirectory, blobStore);
-            this.cleanupJobFuture = cleanupJobFuture;
+            this.globalCleanupFunction = globalCleanupFunction;
+            this.localCleanupFunction = localCleanupFunction;
+        }
+
+        @Override
+        public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
+            return super.globalCleanupAsync(jobId, executor)
+                    .thenCompose(ignored -> globalCleanupFunction.apply(jobId, executor));
         }
 
         @Override
-        public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
-            final boolean result = super.cleanupJob(jobId, cleanupBlobStoreFiles);
-            cleanupJobFuture.complete(jobId);
-            return result;
+        public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
+            return super.localCleanupAsync(jobId, executor)
+                    .thenCompose(ignored -> localCleanupFunction.apply(jobId, executor));
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index b2e1b3d..efb43c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.collection.IsEmptyCollection;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -41,7 +42,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
@@ -70,6 +74,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
                 temporaryFolder.newFolder().getAbsolutePath());
         config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
 
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
         try {
             blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
@@ -160,7 +165,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
             }
 
             // Remove blobs again
-            server[1].cleanupJob(jobId, true);
+            server[1].globalCleanupAsync(jobId, executorService).join();
 
             // Verify everything is clean below recoveryDir/<cluster_id>
             final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
@@ -173,6 +178,8 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
                     0,
                     recoveryFiles.length);
         } finally {
+            assertThat(executorService.shutdownNow(), IsEmptyCollection.empty());
+
             for (BlobLibraryCacheManager s : libServer) {
                 if (s != null) {
                     s.shutdown();
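Taken together, the migration for existing callers of the removed cleanupJob(JobID, boolean) looks roughly like this. A sketch only: blobServer and jobId are placeholders, and the executor choice follows the tests above, which use either a dedicated single-thread executor or Flink's Executors.directExecutor():

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.blob.BlobServer;
    import org.apache.flink.util.concurrent.Executors;

    final class CleanupMigrationExample {

        // Old: blobServer.cleanupJob(jobId, false);
        // Deletes local artifacts but keeps the HA BlobStore entry, so the
        // job stays recoverable.
        static void localOnly(BlobServer blobServer, JobID jobId) {
            blobServer.localCleanupAsync(jobId, Executors.directExecutor()).join();
        }

        // Old: blobServer.cleanupJob(jobId, true);
        // Removes local artifacts and the job's HA BlobStore entries.
        static void localAndGlobal(BlobServer blobServer, JobID jobId) {
            blobServer.globalCleanupAsync(jobId, Executors.directExecutor()).join();
        }
    }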
