This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 13779ab8e4f5539ca311d9f233d031d818af6450
Author: Zhanghao Chen <[email protected]>
AuthorDate: Sat Jan 14 22:06:37 2023 +0800

    [FLINK-30513][runtime] Cleanup HA storage path on cluster termination
    
    This closes #21673
---
 .../highavailability/AbstractHaServices.java       | 33 +++++++++++++++++-----
 .../highavailability/AbstractHaServicesTest.java   | 30 +++++++++++++-------
 .../ZooKeeperLeaderRetrievalTest.java              |  5 ++++
 3 files changed, 51 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index feb8c2f0e87..cbc48a268f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -189,16 +191,26 @@ public abstract class AbstractHaServices implements HighAvailabilityServices {
             exception = ExceptionUtils.firstOrSuppressed(t, exception);
         }
 
-        try {
-            if (deletedHAData) {
+        if (deletedHAData) {
+            try {
                 blobStoreService.closeAndCleanupAllData();
-            } else {
-                logger.info(
-                        "Cannot delete HA blobs because we failed to delete the pointers in the HA store.");
+            } catch (Throwable t) {
+                exception = ExceptionUtils.firstOrSuppressed(t, exception);
+            }
+
+            try {
+                cleanupClusterHaStoragePath();
+            } catch (Throwable t) {
+                exception = ExceptionUtils.firstOrSuppressed(t, exception);
+            }
+        } else {
+            logger.info(
+                    "Cannot delete HA blobs because we failed to delete the pointers in the HA store.");
+            try {
                 blobStoreService.close();
+            } catch (Throwable t) {
+                exception = ExceptionUtils.firstOrSuppressed(t, exception);
             }
-        } catch (Throwable t) {
-            exception = ExceptionUtils.firstOrSuppressed(t, exception);
         }
 
         if (exception != null) {
@@ -225,6 +237,13 @@ public abstract class AbstractHaServices implements HighAvailabilityServices {
                 executor);
     }
 
+    protected void cleanupClusterHaStoragePath() throws Exception {
+        final Path clusterHaStoragePath =
+                HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(configuration);
+        final FileSystem fileSystem = clusterHaStoragePath.getFileSystem();
+        fileSystem.delete(clusterHaStoragePath, true);
+    }
+
     private LeaderElectionService createLeaderElectionService(String leaderName) {
         return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(leaderName));
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
index 90a642f325d..c5dbf1bb0d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -48,11 +48,12 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 class AbstractHaServicesTest {
 
     /**
-     * Tests that we first delete all pointers from the HA services before deleting the blobs. See
-     * FLINK-22014 for more details.
+     * Tests that we first delete all pointers from the HA services before deleting the blobs and HA
+     * storage path. See FLINK-22014 for more details.
      */
     @Test
-    void testCloseAndCleanupAllDataDeletesBlobsAfterCleaningUpHAData() throws Exception {
+    void testCloseAndCleanupAllDataDeletesBlobsAndHaStoragePathAfterCleaningUpHAData()
+            throws Exception {
         final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
 
         final TestingBlobStoreService testingBlobStoreService =
@@ -69,18 +70,20 @@ class AbstractHaServicesTest {
 
         haServices.closeAndCleanupAllData();
 
-        assertThat(closeOperations).contains(
-                CloseOperations.HA_CLEANUP,
-                CloseOperations.HA_CLOSE,
-                CloseOperations.BLOB_CLEANUP_AND_CLOSE);
+        assertThat(closeOperations)
+                .contains(
+                        CloseOperations.HA_CLEANUP,
+                        CloseOperations.HA_CLOSE,
+                        CloseOperations.BLOB_CLEANUP_AND_CLOSE,
+                        CloseOperations.HA_STORAGE_PATH_CLEANUP);
     }
 
     /**
-     * Tests that we don't delete the HA blobs if we could not clean up the pointers from the HA
-     * services. See FLINK-22014 for more details.
+     * Tests that we don't delete the HA blobs and HA storage path if we could not clean up the
+     * pointers from the HA services. See FLINK-22014 for more details.
      */
     @Test
-    void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails() {
+    void testCloseAndCleanupAllDataDoesNotDeleteBlobsAndHaStoragePathIfCleaningUpHADataFails() {
         final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
 
         final TestingBlobStoreService testingBlobStoreService =
@@ -99,6 +102,7 @@ class AbstractHaServicesTest {
 
         
assertThatThrownBy(haServices::closeAndCleanupAllData).isInstanceOf(FlinkException.class);
         assertThat(closeOperations).contains(CloseOperations.HA_CLOSE, 
CloseOperations.BLOB_CLOSE);
+        
assertThat(closeOperations).doesNotContain(CloseOperations.HA_STORAGE_PATH_CLEANUP);
     }
 
     @Test
@@ -129,6 +133,7 @@ class AbstractHaServicesTest {
         HA_CLOSE,
         BLOB_CLEANUP_AND_CLOSE,
         BLOB_CLOSE,
+        HA_STORAGE_PATH_CLEANUP,
     }
 
     private static final class TestingBlobStoreService implements BlobStoreService {
@@ -234,6 +239,11 @@ class AbstractHaServicesTest {
             internalJobCleanupConsumer.accept(jobID);
         }
 
+        @Override
+        protected void cleanupClusterHaStoragePath() {
+            closeOperations.offer(CloseOperations.HA_STORAGE_PATH_CLEANUP);
+        }
+
         @Override
         protected String getLeaderPathForResourceManager() {
             throw new UnsupportedOperationException("Not supported by this test implementation.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
index 78f159cb364..a3014c08d8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -48,6 +49,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -66,6 +68,8 @@ class ZooKeeperLeaderRetrievalTest {
     private final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource =
             new TestingFatalErrorHandlerExtension();
 
+    @TempDir private Path tempDir;
+
     private TestingServer testingServer;
 
     private Configuration config;
@@ -80,6 +84,7 @@ class ZooKeeperLeaderRetrievalTest {
         config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
         config.setString(
                 HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, tempDir.toString());
 
         highAvailabilityServices =
                 new ZooKeeperMultipleComponentLeaderElectionHaServices(

Reply via email to