This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9cac58227d2c28e7fb5262cc76b3f5b15515a73c Author: Matthias Pohl <[email protected]> AuthorDate: Wed Feb 2 00:20:55 2022 +0100 [FLINK-25954][runtime] Adds cleanup testcases to BlobServerCleanupTest --- .../flink/runtime/blob/BlobServerCleanupTest.java | 181 ++++++++++++++++++++- 1 file changed, 173 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java index 5b484b4..15cc61e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.TriConsumerWithException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -41,12 +42,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -58,6 +62,7 @@ import static org.apache.flink.runtime.blob.BlobServerPutTest.put; import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; /** A few tests for the cleanup of transient BLOBs at the {@link BlobServer}. */ public class BlobServerCleanupTest extends TestLogger { @@ -73,6 +78,21 @@ public class BlobServerCleanupTest extends TestLogger { return randomData; } + private static BlobServer createTestInstance(String storageDirectoryPath, long cleanupInterval) + throws IOException { + return createTestInstance(storageDirectoryPath, cleanupInterval, new VoidBlobStore()); + } + + private static BlobServer createTestInstance( + String storageDirectoryPath, long cleanupInterval, BlobStore blobStore) + throws IOException { + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, storageDirectoryPath); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval); + + return new BlobServer(config, new File(storageDirectoryPath), blobStore); + } + @Test public void testTransientBlobNoJobCleanup() throws IOException, InterruptedException, ExecutionException { @@ -102,13 +122,10 @@ public class BlobServerCleanupTest extends TestLogger { byte[] data = createRandomData(); byte[] data2 = createRandomData(); - Configuration config = new Configuration(); - config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval); - long cleanupLowerBound; try (BlobServer server = - new BlobServer(config, temporaryFolder, new VoidBlobStore())) { + createTestInstance(temporaryFolder.getAbsolutePath(), cleanupInterval)) { ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> transientBlobExpiryTimes = server.getBlobExpiryTimes(); @@ -195,6 +212,154 @@ public class BlobServerCleanupTest extends TestLogger { } @Test + public void testLocalCleanup() throws Exception { + final TestingBlobStore blobStore = + createTestingBlobStoreBuilder() + .setDeleteAllFunction( + jobDataToDelete -> + fail( + "No deleteAll call is expected to be triggered but was for %s.", + jobDataToDelete)) + .createTestingBlobStore(); + testSuccessfulCleanup( + new JobID(), + (testInstance, jobId, executor) -> + testInstance.localCleanupAsync(jobId, executor).join(), + blobStore); + } + + @Test + public void testGlobalCleanup() throws Exception { + final Set<JobID> actuallyDeletedJobData = new HashSet<>(); + final JobID jobId = new JobID(); + final TestingBlobStore blobStore = + createTestingBlobStoreBuilder() + .setDeleteAllFunction( + jobDataToDelete -> { + actuallyDeletedJobData.add(jobDataToDelete); + return true; + }) + .createTestingBlobStore(); + testSuccessfulCleanup( + jobId, + (testInstance, jobIdForCleanup, executor) -> + testInstance.globalCleanupAsync(jobIdForCleanup, executor).join(), + blobStore); + + assertThat(actuallyDeletedJobData).containsExactlyInAnyOrder(jobId); + } + + @Test + public void testGlobalCleanupUnsuccessfulInBlobStore() throws Exception { + final TestingBlobStore blobStore = + createTestingBlobStoreBuilder() + .setDeleteAllFunction(jobDataToDelete -> false) + .createTestingBlobStore(); + + testFailedCleanup( + new JobID(), + (testInstance, jobId, executor) -> + assertThat(testInstance.globalCleanupAsync(new JobID(), executor)) + .failsWithin(Duration.ofMillis(100)) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(IOException.class), + blobStore); + } + + @Test + public void testGlobalCleanupFailureInBlobStore() throws Exception { + final RuntimeException actualException = new RuntimeException("Expected RuntimeException"); + final TestingBlobStore blobStore = + createTestingBlobStoreBuilder() + .setDeleteAllFunction( + jobDataToDelete -> { + throw actualException; + }) + .createTestingBlobStore(); + + testFailedCleanup( + new JobID(), + (testInstance, jobId, executor) -> + assertThat(testInstance.globalCleanupAsync(new JobID(), executor)) + .failsWithin(Duration.ofMillis(100)) + .withThrowableOfType(ExecutionException.class) + .withCause(actualException), + blobStore); + } + + private TestingBlobStoreBuilder createTestingBlobStoreBuilder() { + return new TestingBlobStoreBuilder() + .setDeleteFunction( + (jobId, blobKey) -> { + throw new UnsupportedOperationException( + "Deletion of individual blobs is not supported."); + }); + } + + private void testFailedCleanup( + JobID jobId, + TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback, + BlobStore blobStore) + throws Exception { + testCleanup(jobId, callback, blobStore, 2); + } + + private void testSuccessfulCleanup( + JobID jobId, + TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback, + BlobStore blobStore) + throws Exception { + testCleanup(jobId, callback, blobStore, 0); + } + + private void testCleanup( + JobID jobId, + TriConsumerWithException<BlobServer, JobID, Executor, ? extends Exception> callback, + BlobStore blobStore, + int expectedFileCountAfterCleanup) + throws Exception { + final JobID otherJobId = new JobID(); + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + try (BlobServer testInstance = + createTestInstance( + temporaryFolder.getAbsolutePath(), Integer.MAX_VALUE, blobStore)) { + testInstance.start(); + + final BlobKey transientDataBlobKey = + put(testInstance, jobId, createRandomData(), TRANSIENT_BLOB); + final BlobKey otherTransientDataBlobKey = + put(testInstance, otherJobId, createRandomData(), TRANSIENT_BLOB); + + final BlobKey permanentDataBlobKey = + put(testInstance, jobId, createRandomData(), PERMANENT_BLOB); + final BlobKey otherPermanentDataBlobKey = + put(testInstance, otherJobId, createRandomData(), PERMANENT_BLOB); + + checkFilesExist( + jobId, + Arrays.asList(transientDataBlobKey, permanentDataBlobKey), + testInstance, + true); + checkFilesExist( + otherJobId, + Arrays.asList(otherTransientDataBlobKey, otherPermanentDataBlobKey), + testInstance, + true); + + callback.accept(testInstance, jobId, executorService); + + checkFileCountForJob(expectedFileCountAfterCleanup, jobId, testInstance); + checkFilesExist( + otherJobId, + Arrays.asList(otherTransientDataBlobKey, otherPermanentDataBlobKey), + testInstance, + true); + } finally { + assertThat(executorService.shutdownNow()).isEmpty(); + } + } + + @Test public void testBlobServerExpiresRecoveredTransientJobBlob() throws Exception { runBlobServerExpiresRecoveredTransientBlob(new JobID()); } @@ -207,8 +372,6 @@ public class BlobServerCleanupTest extends TestLogger { private void runBlobServerExpiresRecoveredTransientBlob(@Nullable JobID jobId) throws Exception { final long cleanupInterval = 1L; - final Configuration configuration = new Configuration(); - configuration.set(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval); final TransientBlobKey transientBlobKey = TestingBlobUtils.writeTransientBlob( @@ -216,7 +379,7 @@ public class BlobServerCleanupTest extends TestLogger { final File blob = BlobUtils.getStorageLocation(temporaryFolder, jobId, transientBlobKey); try (final BlobServer blobServer = - new BlobServer(configuration, temporaryFolder, new VoidBlobStore())) { + createTestInstance(temporaryFolder.getAbsolutePath(), cleanupInterval)) { CommonTestUtils.waitUntilCondition( () -> !blob.exists(), Deadline.fromNow(Duration.ofSeconds(cleanupInterval * 5L)), @@ -237,7 +400,9 @@ public class BlobServerCleanupTest extends TestLogger { final ExecutorService executorService = Executors.newSingleThreadExecutor(); try (final BlobServer blobServer = - new BlobServer(new Configuration(), temporaryFolder, new VoidBlobStore())) { + createTestInstance( + temporaryFolder.getAbsolutePath(), + BlobServerOptions.CLEANUP_INTERVAL.defaultValue())) { blobServer.retainJobs(Collections.singleton(jobId1), executorService); assertThat(blobServer.getFile(jobId1, blobKey1)).hasBinaryContent(fileContent);
