This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cd95b560d0c [FLINK-32445][runtime] Refactors the closeAndCleanupAllData methods in BlobStoreService and HighAvailabilityServices (#23424)
cd95b560d0c is described below

commit cd95b560d0c11a64b42bf6b98107314d32a4de86
Author: Jiabao Sun <[email protected]>
AuthorDate: Wed Oct 4 23:16:38 2023 -0500

    [FLINK-32445][runtime] Refactors the closeAndCleanupAllData methods in BlobStoreService and HighAvailabilityServices (#23424)
---
 .../java/org/apache/flink/hdfstests/HDFSTest.java  | 12 ++++---
 .../flink/runtime/blob/BlobStoreService.java       |  4 +--
 .../flink/runtime/blob/FileSystemBlobStore.java    |  2 +-
 .../apache/flink/runtime/blob/VoidBlobStore.java   |  2 +-
 .../runtime/entrypoint/ClusterEntrypoint.java      |  6 +---
 .../highavailability/AbstractHaServices.java       | 40 +++++-----------------
 .../highavailability/HighAvailabilityServices.java | 36 +++++++++++++++----
 .../nonha/AbstractNonHaServices.java               |  5 ++-
 .../flink/runtime/minicluster/MiniCluster.java     | 10 +-----
 .../runtime/blob/BlobCacheCorruptionTest.java      |  3 +-
 .../flink/runtime/blob/BlobCacheRecoveryTest.java  |  3 +-
 .../flink/runtime/blob/BlobCacheRetriesTest.java   |  6 ++--
 .../flink/runtime/blob/BlobCacheSuccessTest.java   |  3 +-
 .../runtime/blob/BlobServerCorruptionTest.java     |  3 +-
 .../flink/runtime/blob/BlobServerGetTest.java      |  6 ++--
 .../flink/runtime/blob/BlobServerRecoveryTest.java |  3 +-
 .../runtime/dispatcher/AbstractDispatcherTest.java |  2 +-
 .../runtime/entrypoint/ClusterEntrypointTest.java  | 12 +++----
 .../BlobLibraryCacheRecoveryITCase.java            |  3 +-
 .../highavailability/AbstractHaServicesTest.java   | 13 +++----
 .../TestingHighAvailabilityServices.java           | 11 +++---
 .../TestingHighAvailabilityServicesBuilder.java    | 11 +++---
 .../nonha/embedded/EmbeddedHaServicesTest.java     |  2 +-
 .../nonha/standalone/StandaloneHaServicesTest.java |  2 +-
 .../ZooKeeperLeaderRetrievalTest.java              |  3 +-
 .../resourcemanager/ResourceManagerTest.java       |  2 +-
 .../TaskManagerRunnerConfigurationTest.java        |  8 ++---
 .../taskexecutor/TaskManagerRunnerStartupTest.java |  2 +-
 .../JobManagerHAProcessFailureRecoveryITCase.java  |  2 +-
 .../recovery/ProcessFailureCancelingITCase.java    |  2 +-
 30 files changed, 110 insertions(+), 109 deletions(-)

diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index ffd0d37297d..04e1196b5ca 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -215,7 +215,8 @@ public class HDFSTest {
             TestingBlobHelpers.testBlobServerRecovery(
                     config, blobStoreService, temporaryFolder.newFolder());
         } finally {
-            blobStoreService.closeAndCleanupAllData();
+            blobStoreService.cleanupAllData();
+            blobStoreService.close();
         }
     }
 
@@ -238,7 +239,8 @@ public class HDFSTest {
             TestingBlobHelpers.testGetFailsFromCorruptFile(
                     config, blobStoreService, temporaryFolder.newFolder());
         } finally {
-            blobStoreService.closeAndCleanupAllData();
+            blobStoreService.cleanupAllData();
+            blobStoreService.close();
         }
     }
 
@@ -261,7 +263,8 @@ public class HDFSTest {
             TestingBlobHelpers.testBlobCacheRecovery(
                     config, blobStoreService, temporaryFolder.newFolder());
         } finally {
-            blobStoreService.closeAndCleanupAllData();
+            blobStoreService.cleanupAllData();
+            blobStoreService.close();
         }
     }
 
@@ -284,7 +287,8 @@ public class HDFSTest {
             TestingBlobHelpers.testGetFailsFromCorruptFile(
                     new JobID(), config, blobStoreService, 
temporaryFolder.newFolder());
         } finally {
-            blobStoreService.closeAndCleanupAllData();
+            blobStoreService.cleanupAllData();
+            blobStoreService.close();
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
index 805d863c3cc..23b203892a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
@@ -23,6 +23,6 @@ import java.io.Closeable;
 /** Service interface for the BlobStore which allows to close and clean up its 
data. */
 public interface BlobStoreService extends BlobStore, Closeable {
 
-    /** Closes and cleans up the store. This entails the deletion of all 
blobs. */
-    void closeAndCleanupAllData();
+    /** Cleans up the store. This entails the deletion of all blobs. */
+    void cleanupAllData();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 141f270e2f8..318309c1ad6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -186,7 +186,7 @@ public class FileSystemBlobStore implements 
BlobStoreService {
     }
 
     @Override
-    public void closeAndCleanupAllData() {
+    public void cleanupAllData() {
         try {
             LOG.debug("Cleaning up {}.", basePath);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index ffeeffea9e4..c9822cd8dbb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -47,7 +47,7 @@ public class VoidBlobStore implements BlobStoreService {
     }
 
     @Override
-    public void closeAndCleanupAllData() {}
+    public void cleanupAllData() {}
 
     @Override
     public void close() throws IOException {}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 8bbe80877e8..df91a323141 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -499,11 +499,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
 
             if (haServices != null) {
                 try {
-                    if (cleanupHaData) {
-                        haServices.closeAndCleanupAllData();
-                    } else {
-                        haServices.close();
-                    }
+                    haServices.closeWithOptionalClean(cleanupHaData);
                 } catch (Throwable t) {
                     exception = ExceptionUtils.firstOrSuppressed(t, exception);
                 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index bcca7f3e866..aecd38ed29a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -47,8 +47,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * #getLeaderPathForRestServer}. The returned leader name is the ConfigMap 
name in Kubernetes and
  * child path in Zookeeper.
  *
- * <p>{@link #close()} and {@link #closeAndCleanupAllData()} should be 
implemented to destroy the
- * resources.
+ * <p>{@link #close()} and {@link #cleanupAllData()} should be implemented to 
destroy the resources.
  *
  * <p>The abstract class is also responsible for determining which component 
service should be
  * reused. For example, {@link #jobResultStore} is created once and could be 
reused many times.
@@ -182,8 +181,8 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
     }
 
     @Override
-    public void closeAndCleanupAllData() throws Exception {
-        logger.info("Close and clean up all data for {}.", 
getClass().getSimpleName());
+    public void cleanupAllData() throws Exception {
+        logger.info("Clean up all data for {}.", getClass().getSimpleName());
 
         Throwable exception = null;
 
@@ -192,40 +191,20 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
         try {
             internalCleanup();
             deletedHAData = true;
+            blobStoreService.cleanupAllData();
         } catch (Exception t) {
             exception = t;
         }
 
-        try {
-            if (leaderElectionService != null) {
-                leaderElectionService.close();
-            }
-        } catch (Throwable t) {
-            exception = ExceptionUtils.firstOrSuppressed(t, exception);
-        }
-
-        try {
-            internalClose();
-        } catch (Throwable t) {
-            exception = ExceptionUtils.firstOrSuppressed(t, exception);
-        }
-
-        try {
-            if (deletedHAData) {
-                blobStoreService.closeAndCleanupAllData();
-            } else {
-                logger.info(
-                        "Cannot delete HA blobs because we failed to delete 
the pointers in the HA store.");
-                blobStoreService.close();
-            }
-        } catch (Throwable t) {
-            exception = ExceptionUtils.firstOrSuppressed(t, exception);
+        if (!deletedHAData) {
+            logger.info(
+                    "Cannot delete HA blobs because we failed to delete the 
pointers in the HA store.");
         }
 
         if (exception != null) {
             ExceptionUtils.rethrowException(
                     exception,
-                    "Could not properly close and clean up all data of high 
availability service.");
+                    "Could not properly clean up all data of high availability 
service.");
         }
         logger.info("Finished cleaning up the high availability data.");
     }
@@ -281,8 +260,7 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
      * Clean up the meta data in the distributed system(e.g. Zookeeper, 
Kubernetes ConfigMap).
      *
      * <p>If an exception occurs during internal cleanup, we will continue the 
cleanup in {@link
-     * #closeAndCleanupAllData} and report exceptions only after all cleanup 
steps have been
-     * attempted.
+     * #cleanupAllData} and report exceptions only after all cleanup steps 
have been attempted.
      *
      * @throws Exception when do the cleanup operation on external storage.
      */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 97ef9a4b1fe..1092bd20663 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
 import org.apache.flink.runtime.jobmanager.JobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElection;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.io.IOException;
@@ -212,19 +213,42 @@ public interface HighAvailabilityServices
     void close() throws Exception;
 
     /**
-     * Closes the high availability services (releasing all resources) and 
deletes all data stored
-     * by these services in external stores.
+     * Deletes all data stored by high availability services in external 
stores.
      *
-     * <p>After this method was called, the any job or session that was 
managed by these high
+     * <p>After this method was called, any job or session that was managed by 
these high
      * availability services will be unrecoverable.
      *
      * <p>If an exception occurs during cleanup, this method will attempt to 
continue the cleanup
      * and report exceptions only after all cleanup steps have been attempted.
      *
-     * @throws Exception Thrown, if an exception occurred while closing these 
services or cleaning
-     *     up data stored by them.
+     * @throws Exception if an error occurred while cleaning up data stored by 
them.
      */
-    void closeAndCleanupAllData() throws Exception;
+    void cleanupAllData() throws Exception;
+
+    /**
+     * Calls {@link #cleanupAllData()} (if {@code true} is passed as a 
parameter) before calling
+     * {@link #close()} on this instance. Any error that appeared during the 
cleanup will be
+     * propagated after calling {@code close()}.
+     */
+    default void closeWithOptionalClean(boolean cleanupData) throws Exception {
+        Throwable exception = null;
+        if (cleanupData) {
+            try {
+                cleanupAllData();
+            } catch (Throwable t) {
+                exception = ExceptionUtils.firstOrSuppressed(t, exception);
+            }
+        }
+        try {
+            close();
+        } catch (Throwable t) {
+            exception = ExceptionUtils.firstOrSuppressed(t, exception);
+        }
+
+        if (exception != null) {
+            ExceptionUtils.rethrowException(exception);
+        }
+    }
 
     @Override
     default CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor 
executor) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index ddb62b368e5..2ecd3985bba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -106,9 +106,8 @@ public abstract class AbstractNonHaServices implements 
HighAvailabilityServices
     }
 
     @Override
-    public void closeAndCleanupAllData() throws Exception {
-        // this stores no data, so this method is the same as 'close()'
-        close();
+    public void cleanupAllData() throws Exception {
+        // this stores no data, do nothing here
     }
 
     // ----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 6de16027cc4..9a75211e795 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -1276,15 +1276,7 @@ public class MiniCluster implements AutoCloseableAsync {
 
             // shut down high-availability services
             if (haServices != null) {
-                try {
-                    if (cleanupHaData) {
-                        haServices.closeAndCleanupAllData();
-                    } else {
-                        haServices.close();
-                    }
-                } catch (Exception e) {
-                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
-                }
+                haServices.closeWithOptionalClean(cleanupHaData);
                 haServices = null;
             }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
index 237e134f989..3ee984ff0b9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
@@ -94,7 +94,8 @@ class BlobCacheCorruptionTest {
                     TempDirUtils.newFolder(tempDir));
         } finally {
             if (blobStoreService != null) {
-                blobStoreService.closeAndCleanupAllData();
+                blobStoreService.cleanupAllData();
+                blobStoreService.close();
             }
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
index 23012cc7d57..798b7d48ee9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
@@ -51,7 +51,8 @@ public class BlobCacheRecoveryTest {
                     config, blobStoreService, TempDirUtils.newFolder(tempDir));
         } finally {
             if (blobStoreService != null) {
-                blobStoreService.closeAndCleanupAllData();
+                blobStoreService.cleanupAllData();
+                blobStoreService.close();
             }
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 94e6d5bdc9b..791875f03b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -80,7 +80,8 @@ class BlobCacheRetriesTest {
             testBlobFetchRetries(blobStoreService, new JobID(), 
PERMANENT_BLOB);
         } finally {
             if (blobStoreService != null) {
-                blobStoreService.closeAndCleanupAllData();
+                blobStoreService.cleanupAllData();
+                blobStoreService.close();
             }
         }
     }
@@ -148,7 +149,8 @@ class BlobCacheRetriesTest {
             testBlobFetchWithTooManyFailures(blobStoreService, new JobID(), 
PERMANENT_BLOB);
         } finally {
             if (blobStoreService != null) {
-                blobStoreService.closeAndCleanupAllData();
+                blobStoreService.cleanupAllData();
+                blobStoreService.close();
             }
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 984720cc221..aa7c443a7ad 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -192,7 +192,8 @@ class BlobCacheSuccessTest {
             }
         } finally {
             if (blobStoreService != null) {
-                blobStoreService.closeAndCleanupAllData();
+                blobStoreService.cleanupAllData();
+                blobStoreService.close();
             }
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
index 424a8b02725..e43347f8e71 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
@@ -62,7 +62,8 @@ class BlobServerCorruptionTest {
                     config, blobStoreService, TempDirUtils.newFolder(tempDir));
         } finally {
             if (blobStoreService != null) {
-                blobStoreService.closeAndCleanupAllData();
+                blobStoreService.cleanupAllData();
+                blobStoreService.close();
             }
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 5f3c4d1987c..8516315412d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -207,7 +207,8 @@ class BlobServerGetTest {
             }
         } finally {
             if (blobStore != null) {
-                blobStore.closeAndCleanupAllData();
+                blobStore.cleanupAllData();
+                blobStore.close();
             }
         }
     }
@@ -269,7 +270,8 @@ class BlobServerGetTest {
             }
         } finally {
             if (blobStore != null) {
-                blobStore.closeAndCleanupAllData();
+                blobStore.cleanupAllData();
+                blobStore.close();
             }
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
index 31de327709a..eca49278a3e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
@@ -51,7 +51,8 @@ class BlobServerRecoveryTest {
                     config, blobStoreService, TempDirUtils.newFolder(tempDir));
         } finally {
             if (blobStoreService != null) {
-                blobStoreService.closeAndCleanupAllData();
+                blobStoreService.cleanupAllData();
+                blobStoreService.close();
             }
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
index 66fc553512c..7550372c353 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
@@ -118,7 +118,7 @@ public class AbstractDispatcherTest extends TestLogger {
     @After
     public void tearDown() throws Exception {
         if (haServices != null) {
-            haServices.closeAndCleanupAllData();
+            haServices.closeWithOptionalClean(true);
         }
         if (blobServer != null) {
             blobServer.close();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index ac2d7a3197a..8b2424e8694 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -134,11 +134,11 @@ public class ClusterEntrypointTest extends TestLogger {
     @Test
     public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final CompletableFuture<Void> cleanupAllDataFuture = new 
CompletableFuture<>();
         final HighAvailabilityServices testingHaService =
                 new TestingHighAvailabilityServicesBuilder()
                         .setCloseFuture(closeFuture)
-                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .setCleanupAllDataFuture(cleanupAllDataFuture)
                         .build();
         final TestingEntryPoint testingEntryPoint =
                 new TestingEntryPoint.Builder()
@@ -154,7 +154,7 @@ public class ClusterEntrypointTest extends TestLogger {
                 appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
                 is(ApplicationStatus.UNKNOWN));
         assertThat(closeFuture.isDone(), is(true));
-        assertThat(closeAndCleanupAllDataFuture.isDone(), is(false));
+        assertThat(cleanupAllDataFuture.isDone(), is(false));
     }
 
     @Test
@@ -184,13 +184,13 @@ public class ClusterEntrypointTest extends TestLogger {
     @Test
     public void 
testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() throws 
Exception {
         final CompletableFuture<Void> deregisterFuture = new 
CompletableFuture<>();
-        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final CompletableFuture<Void> cleanupAllDataFuture = new 
CompletableFuture<>();
         final CompletableFuture<ApplicationStatus> dispatcherShutDownFuture =
                 new CompletableFuture<>();
 
         final HighAvailabilityServices testingHaService =
                 new TestingHighAvailabilityServicesBuilder()
-                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .setCleanupAllDataFuture(cleanupAllDataFuture)
                         .build();
         final TestingResourceManagerFactory testingResourceManagerFactory =
                 new TestingResourceManagerFactory.Builder()
@@ -221,7 +221,7 @@ public class ClusterEntrypointTest extends TestLogger {
                 appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
                 is(ApplicationStatus.SUCCEEDED));
         assertThat(deregisterFuture.isDone(), is(true));
-        assertThat(closeAndCleanupAllDataFuture.isDone(), is(true));
+        assertThat(cleanupAllDataFuture.isDone(), is(true));
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 95eb16ab4ff..1c4484166e5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -210,7 +210,8 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
             }
 
             if (blobStoreService != null) {
-                blobStoreService.closeAndCleanupAllData();
+                blobStoreService.cleanupAllData();
+                blobStoreService.close();
             }
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
index 949f54e5a89..c70860cdb37 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -66,13 +66,13 @@ class AbstractHaServicesTest {
                         () -> 
closeOperations.offer(CloseOperations.HA_CLEANUP),
                         ignored -> {});
 
-        haServices.closeAndCleanupAllData();
+        haServices.closeWithOptionalClean(true);
 
         assertThat(closeOperations)
                 .contains(
                         CloseOperations.HA_CLEANUP,
                         CloseOperations.HA_CLOSE,
-                        CloseOperations.BLOB_CLEANUP_AND_CLOSE);
+                        CloseOperations.BLOB_CLEANUP);
     }
 
     /**
@@ -97,7 +97,8 @@ class AbstractHaServicesTest {
                         },
                         ignored -> {});
 
-        
assertThatThrownBy(haServices::closeAndCleanupAllData).isInstanceOf(FlinkException.class);
+        assertThatThrownBy(() -> haServices.closeWithOptionalClean(true))
+                .isInstanceOf(FlinkException.class);
         assertThat(closeOperations).contains(CloseOperations.HA_CLOSE, 
CloseOperations.BLOB_CLOSE);
     }
 
@@ -127,7 +128,7 @@ class AbstractHaServicesTest {
     private enum CloseOperations {
         HA_CLEANUP,
         HA_CLOSE,
-        BLOB_CLEANUP_AND_CLOSE,
+        BLOB_CLEANUP,
         BLOB_CLOSE,
     }
 
@@ -140,8 +141,8 @@ class AbstractHaServicesTest {
         }
 
         @Override
-        public void closeAndCleanupAllData() {
-            closeOperations.offer(CloseOperations.BLOB_CLEANUP_AND_CLOSE);
+        public void cleanupAllData() {
+            closeOperations.offer(CloseOperations.BLOB_CLEANUP);
         }
 
         @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index e8e744d0dc4..d68a46db797 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -72,7 +72,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
 
     private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
-    private CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+    private CompletableFuture<Void> cleanupAllDataFuture = new 
CompletableFuture<>();
 
     private volatile CompletableFuture<JobID> globalCleanupFuture;
 
@@ -142,9 +142,8 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
         this.closeFuture = closeFuture;
     }
 
-    public void setCloseAndCleanupAllDataFuture(
-            CompletableFuture<Void> closeAndCleanupAllDataFuture) {
-        this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+    public void setCleanupAllDataFuture(CompletableFuture<Void> 
cleanupAllDataFuture) {
+        this.cleanupAllDataFuture = cleanupAllDataFuture;
     }
 
     public void setGlobalCleanupFuture(CompletableFuture<JobID> 
globalCleanupFuture) {
@@ -280,8 +279,8 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
     }
 
     @Override
-    public void closeAndCleanupAllData() throws Exception {
-        closeAndCleanupAllDataFuture.complete(null);
+    public void cleanupAllData() throws Exception {
+        cleanupAllDataFuture.complete(null);
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
index 7b3cab432cf..66fb485d40d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
@@ -74,7 +74,7 @@ public class TestingHighAvailabilityServicesBuilder {
 
     private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
-    private CompletableFuture<Void> closeAndCleanupAllDataFuture = new CompletableFuture<>();
+    private CompletableFuture<Void> cleanupAllDataFuture = new CompletableFuture<>();
 
     public TestingHighAvailabilityServices build() {
         final TestingHighAvailabilityServices testingHighAvailabilityServices =
@@ -102,8 +102,7 @@ public class TestingHighAvailabilityServicesBuilder {
         testingHighAvailabilityServices.setJobResultStore(jobResultStore);
 
         testingHighAvailabilityServices.setCloseFuture(closeFuture);
-        testingHighAvailabilityServices.setCloseAndCleanupAllDataFuture(
-                closeAndCleanupAllDataFuture);
+        testingHighAvailabilityServices.setCleanupAllDataFuture(cleanupAllDataFuture);
 
         return testingHighAvailabilityServices;
     }
@@ -178,9 +177,9 @@ public class TestingHighAvailabilityServicesBuilder {
         return this;
     }
 
-    public TestingHighAvailabilityServicesBuilder setCloseAndCleanupAllDataFuture(
-            CompletableFuture<Void> closeAndCleanupAllDataFuture) {
-        this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+    public TestingHighAvailabilityServicesBuilder setCleanupAllDataFuture(
+            CompletableFuture<Void> cleanupAllDataFuture) {
+        this.cleanupAllDataFuture = cleanupAllDataFuture;
         return this;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
index 8b6da28cd5f..61220aa2b0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
@@ -57,7 +57,7 @@ public class EmbeddedHaServicesTest extends TestLogger {
     @After
     public void teardownTest() throws Exception {
         if (embeddedHaServices != null) {
-            embeddedHaServices.closeAndCleanupAllData();
+            embeddedHaServices.closeWithOptionalClean(true);
             embeddedHaServices = null;
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
index ee4aacd24eb..f9de159d016 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
@@ -54,7 +54,7 @@ public class StandaloneHaServicesTest extends TestLogger {
     @After
     public void teardownTest() throws Exception {
         if (standaloneHaServices != null) {
-            standaloneHaServices.closeAndCleanupAllData();
+            standaloneHaServices.closeWithOptionalClean(true);
             standaloneHaServices = null;
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
index ba7975c7b78..14b6c35c193 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
@@ -100,8 +100,7 @@ class ZooKeeperLeaderRetrievalTest {
     @AfterEach
     void after() throws Exception {
         if (highAvailabilityServices != null) {
-            highAvailabilityServices.closeAndCleanupAllData();
-
+            highAvailabilityServices.closeWithOptionalClean(true);
             highAvailabilityServices = null;
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index ea65581e84d..e8e689deebb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -147,7 +147,7 @@ class ResourceManagerTest {
         }
 
         if (highAvailabilityServices != null) {
-            highAvailabilityServices.closeAndCleanupAllData();
+            highAvailabilityServices.closeWithOptionalClean(true);
         }
 
         if (testingFatalErrorHandler.hasExceptionOccurred()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
index b1eeeb9438a..39ff43860ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
@@ -97,7 +97,7 @@ class TaskManagerRunnerConfigurationTest {
             assertThat(taskManagerRpcService.getAddress()).isEqualTo(taskmanagerHost);
         } finally {
             maybeCloseRpcService(taskManagerRpcService);
-            highAvailabilityServices.closeAndCleanupAllData();
+            highAvailabilityServices.closeWithOptionalClean(true);
         }
     }
 
@@ -115,7 +115,7 @@ class TaskManagerRunnerConfigurationTest {
             assertThat(taskManagerRpcService.getAddress()).isNotNull().isNotEmpty();
         } finally {
             maybeCloseRpcService(taskManagerRpcService);
-            highAvailabilityServices.closeAndCleanupAllData();
+            highAvailabilityServices.closeWithOptionalClean(true);
         }
     }
 
@@ -136,7 +136,7 @@ class TaskManagerRunnerConfigurationTest {
             assertThat(taskManagerRpcService.getAddress()).matches(InetAddresses::isInetAddress);
         } finally {
             maybeCloseRpcService(taskManagerRpcService);
-            highAvailabilityServices.closeAndCleanupAllData();
+            highAvailabilityServices.closeWithOptionalClean(true);
             IOUtils.closeQuietly(testJobManagerSocket);
         }
     }
@@ -159,7 +159,7 @@ class TaskManagerRunnerConfigurationTest {
                     .isInstanceOf(IllegalArgumentException.class)
                     .hasMessage("Invalid port range definition: -1");
         } finally {
-            highAvailabilityServices.closeAndCleanupAllData();
+            highAvailabilityServices.closeWithOptionalClean(true);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index cb4a68e1300..9e937eb282b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -105,7 +105,7 @@ class TaskManagerRunnerStartupTest {
 
     @AfterEach
     void tearDownTest() throws Exception {
-        highAvailabilityServices.closeAndCleanupAllData();
+        highAvailabilityServices.closeWithOptionalClean(true);
         highAvailabilityServices = null;
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index 6f6cce4cc08..28525dd9d55 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -406,7 +406,7 @@ class JobManagerHAProcessFailureRecoveryITCase {
             }
 
             if (highAvailabilityServices != null) {
-                highAvailabilityServices.closeAndCleanupAllData();
+                highAvailabilityServices.closeWithOptionalClean(true);
             }
 
             RpcUtils.terminateRpcService(rpcService);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 1007a2168f9..93b49a98662 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -247,7 +247,7 @@ class ProcessFailureCancelingITCase {
 
             RpcUtils.terminateRpcService(rpcService);
 
-            haServices.closeAndCleanupAllData();
+            haServices.closeWithOptionalClean(true);
         }
     }
 


Reply via email to