This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 593cc139ab3 Revert "[FLINK-30513][runtime] Cleanup HA storage path on cluster termination"
593cc139ab3 is described below

commit 593cc139ab30bb81ab38d94b7b697d00eaaecada
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Mar 10 02:13:36 2023 +0800

    Revert "[FLINK-30513][runtime] Cleanup HA storage path on cluster termination"
    
    This reverts commit 13779ab8e4f5539ca311d9f233d031d818af6450.
---
 .../highavailability/AbstractHaServices.java       | 33 +++++-----------------
 .../highavailability/AbstractHaServicesTest.java   | 30 +++++++-------------
 .../ZooKeeperLeaderRetrievalTest.java              |  5 ----
 3 files changed, 17 insertions(+), 51 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index cbc48a268f7..feb8c2f0e87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -191,26 +189,16 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
             exception = ExceptionUtils.firstOrSuppressed(t, exception);
         }
 
-        if (deletedHAData) {
-            try {
+        try {
+            if (deletedHAData) {
                 blobStoreService.closeAndCleanupAllData();
-            } catch (Throwable t) {
-                exception = ExceptionUtils.firstOrSuppressed(t, exception);
-            }
-
-            try {
-                cleanupClusterHaStoragePath();
-            } catch (Throwable t) {
-                exception = ExceptionUtils.firstOrSuppressed(t, exception);
-            }
-        } else {
-            logger.info(
-                    "Cannot delete HA blobs because we failed to delete the 
pointers in the HA store.");
-            try {
+            } else {
+                logger.info(
+                        "Cannot delete HA blobs because we failed to delete 
the pointers in the HA store.");
                 blobStoreService.close();
-            } catch (Throwable t) {
-                exception = ExceptionUtils.firstOrSuppressed(t, exception);
             }
+        } catch (Throwable t) {
+            exception = ExceptionUtils.firstOrSuppressed(t, exception);
         }
 
         if (exception != null) {
@@ -237,13 +225,6 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
                 executor);
     }
 
-    protected void cleanupClusterHaStoragePath() throws Exception {
-        final Path clusterHaStoragePath =
-                
HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(configuration);
-        final FileSystem fileSystem = clusterHaStoragePath.getFileSystem();
-        fileSystem.delete(clusterHaStoragePath, true);
-    }
-
     private LeaderElectionService createLeaderElectionService(String 
leaderName) {
         return new 
DefaultLeaderElectionService(createLeaderElectionDriverFactory(leaderName));
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
index c5dbf1bb0d1..90a642f325d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -48,12 +48,11 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 class AbstractHaServicesTest {
 
     /**
-     * Tests that we first delete all pointers from the HA services before 
deleting the blobs and HA
-     * storage path. See FLINK-22014 for more details.
+     * Tests that we first delete all pointers from the HA services before 
deleting the blobs. See
+     * FLINK-22014 for more details.
      */
     @Test
-    void 
testCloseAndCleanupAllDataDeletesBlobsAndHaStoragePathAfterCleaningUpHAData()
-            throws Exception {
+    void testCloseAndCleanupAllDataDeletesBlobsAfterCleaningUpHAData() throws 
Exception {
         final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
 
         final TestingBlobStoreService testingBlobStoreService =
@@ -70,20 +69,18 @@ class AbstractHaServicesTest {
 
         haServices.closeAndCleanupAllData();
 
-        assertThat(closeOperations)
-                .contains(
-                        CloseOperations.HA_CLEANUP,
-                        CloseOperations.HA_CLOSE,
-                        CloseOperations.BLOB_CLEANUP_AND_CLOSE,
-                        CloseOperations.HA_STORAGE_PATH_CLEANUP);
+        assertThat(closeOperations).contains(
+                CloseOperations.HA_CLEANUP,
+                CloseOperations.HA_CLOSE,
+                CloseOperations.BLOB_CLEANUP_AND_CLOSE);
     }
 
     /**
-     * Tests that we don't delete the HA blobs and HA storage path if we could 
not clean up the
-     * pointers from the HA services. See FLINK-22014 for more details.
+     * Tests that we don't delete the HA blobs if we could not clean up the 
pointers from the HA
+     * services. See FLINK-22014 for more details.
      */
     @Test
-    void 
testCloseAndCleanupAllDataDoesNotDeleteBlobsAndHaStoragePathIfCleaningUpHADataFails()
 {
+    void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails() 
{
         final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
 
         final TestingBlobStoreService testingBlobStoreService =
@@ -102,7 +99,6 @@ class AbstractHaServicesTest {
 
         
assertThatThrownBy(haServices::closeAndCleanupAllData).isInstanceOf(FlinkException.class);
         assertThat(closeOperations).contains(CloseOperations.HA_CLOSE, 
CloseOperations.BLOB_CLOSE);
-        
assertThat(closeOperations).doesNotContain(CloseOperations.HA_STORAGE_PATH_CLEANUP);
     }
 
     @Test
@@ -133,7 +129,6 @@ class AbstractHaServicesTest {
         HA_CLOSE,
         BLOB_CLEANUP_AND_CLOSE,
         BLOB_CLOSE,
-        HA_STORAGE_PATH_CLEANUP,
     }
 
     private static final class TestingBlobStoreService implements 
BlobStoreService {
@@ -239,11 +234,6 @@ class AbstractHaServicesTest {
             internalJobCleanupConsumer.accept(jobID);
         }
 
-        @Override
-        protected void cleanupClusterHaStoragePath() {
-            closeOperations.offer(CloseOperations.HA_STORAGE_PATH_CLEANUP);
-        }
-
         @Override
         protected String getLeaderPathForResourceManager() {
             throw new UnsupportedOperationException("Not supported by this 
test implementation.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
index a3014c08d8c..78f159cb364 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
@@ -40,7 +40,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -49,7 +48,6 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
-import java.nio.file.Path;
 import java.time.Duration;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -68,8 +66,6 @@ class ZooKeeperLeaderRetrievalTest {
     private final TestingFatalErrorHandlerExtension 
testingFatalErrorHandlerResource =
             new TestingFatalErrorHandlerExtension();
 
-    @TempDir private Path tempDir;
-
     private TestingServer testingServer;
 
     private Configuration config;
@@ -84,7 +80,6 @@ class ZooKeeperLeaderRetrievalTest {
         config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
         config.setString(
                 HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
-        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
tempDir.toString());
 
         highAvailabilityServices =
                 new ZooKeeperMultipleComponentLeaderElectionHaServices(

Reply via email to