This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 13779ab8e4f5539ca311d9f233d031d818af6450 Author: Zhanghao Chen <[email protected]> AuthorDate: Sat Jan 14 22:06:37 2023 +0800 [FLINK-30513][runtime] Cleanup HA storage path on cluster termination This closes #21673 --- .../highavailability/AbstractHaServices.java | 33 +++++++++++++++++----- .../highavailability/AbstractHaServicesTest.java | 30 +++++++++++++------- .../ZooKeeperLeaderRetrievalTest.java | 5 ++++ 3 files changed, 51 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java index feb8c2f0e87..cbc48a268f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobStore; import org.apache.flink.runtime.blob.BlobStoreService; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; @@ -189,16 +191,26 @@ public abstract class AbstractHaServices implements HighAvailabilityServices { exception = ExceptionUtils.firstOrSuppressed(t, exception); } - try { - if (deletedHAData) { + if (deletedHAData) { + try { blobStoreService.closeAndCleanupAllData(); - } else { - logger.info( - "Cannot delete HA blobs because we failed to delete the pointers in the HA store."); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + try { + cleanupClusterHaStoragePath(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } else { + logger.info( + "Cannot delete HA blobs 
because we failed to delete the pointers in the HA store."); + try { blobStoreService.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); } - } catch (Throwable t) { - exception = ExceptionUtils.firstOrSuppressed(t, exception); } if (exception != null) { @@ -225,6 +237,13 @@ public abstract class AbstractHaServices implements HighAvailabilityServices { executor); } + protected void cleanupClusterHaStoragePath() throws Exception { + final Path clusterHaStoragePath = + HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(configuration); + final FileSystem fileSystem = clusterHaStoragePath.getFileSystem(); + fileSystem.delete(clusterHaStoragePath, true); + } + private LeaderElectionService createLeaderElectionService(String leaderName) { return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(leaderName)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java index 90a642f325d..c5dbf1bb0d1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java @@ -48,11 +48,12 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; class AbstractHaServicesTest { /** - * Tests that we first delete all pointers from the HA services before deleting the blobs. See - * FLINK-22014 for more details. + * Tests that we first delete all pointers from the HA services before deleting the blobs and HA + * storage path. See FLINK-22014 for more details. 
*/ @Test - void testCloseAndCleanupAllDataDeletesBlobsAfterCleaningUpHAData() throws Exception { + void testCloseAndCleanupAllDataDeletesBlobsAndHaStoragePathAfterCleaningUpHAData() + throws Exception { final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3); final TestingBlobStoreService testingBlobStoreService = @@ -69,18 +70,20 @@ class AbstractHaServicesTest { haServices.closeAndCleanupAllData(); - assertThat(closeOperations).contains( - CloseOperations.HA_CLEANUP, - CloseOperations.HA_CLOSE, - CloseOperations.BLOB_CLEANUP_AND_CLOSE); + assertThat(closeOperations) + .contains( + CloseOperations.HA_CLEANUP, + CloseOperations.HA_CLOSE, + CloseOperations.BLOB_CLEANUP_AND_CLOSE, + CloseOperations.HA_STORAGE_PATH_CLEANUP); } /** - * Tests that we don't delete the HA blobs if we could not clean up the pointers from the HA - * services. See FLINK-22014 for more details. + * Tests that we don't delete the HA blobs and HA storage path if we could not clean up the + * pointers from the HA services. See FLINK-22014 for more details.
*/ @Test - void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails() { + void testCloseAndCleanupAllDataDoesNotDeleteBlobsAndHaStoragePathIfCleaningUpHADataFails() { final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3); final TestingBlobStoreService testingBlobStoreService = @@ -99,6 +102,7 @@ class AbstractHaServicesTest { assertThatThrownBy(haServices::closeAndCleanupAllData).isInstanceOf(FlinkException.class); assertThat(closeOperations).contains(CloseOperations.HA_CLOSE, CloseOperations.BLOB_CLOSE); + assertThat(closeOperations).doesNotContain(CloseOperations.HA_STORAGE_PATH_CLEANUP); } @Test @@ -129,6 +133,7 @@ class AbstractHaServicesTest { HA_CLOSE, BLOB_CLEANUP_AND_CLOSE, BLOB_CLOSE, + HA_STORAGE_PATH_CLEANUP, } private static final class TestingBlobStoreService implements BlobStoreService { @@ -234,6 +239,11 @@ class AbstractHaServicesTest { internalJobCleanupConsumer.accept(jobID); } + @Override + protected void cleanupClusterHaStoragePath() { + closeOperations.offer(CloseOperations.HA_STORAGE_PATH_CLEANUP); + } + @Override protected String getLeaderPathForResourceManager() { throw new UnsupportedOperationException("Not supported by this test implementation."); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java index 78f159cb364..a3014c08d8c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java @@ -40,6 +40,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.net.InetAddress; @@ -48,6
+49,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.net.UnknownHostException; +import java.nio.file.Path; import java.time.Duration; import java.util.concurrent.ScheduledExecutorService; @@ -66,6 +68,8 @@ class ZooKeeperLeaderRetrievalTest { private final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource = new TestingFatalErrorHandlerExtension(); + @TempDir private Path tempDir; + private TestingServer testingServer; private Configuration config; @@ -80,6 +84,7 @@ class ZooKeeperLeaderRetrievalTest { config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); config.setString( HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, tempDir.toString()); highAvailabilityServices = new ZooKeeperMultipleComponentLeaderElectionHaServices(