This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cd95b560d0c [FLINK-32445][runtime] Refactors the
closeAndCleanupAllData methods in BlobStoreService and HighAvailabilityServices
(#23424)
cd95b560d0c is described below
commit cd95b560d0c11a64b42bf6b98107314d32a4de86
Author: Jiabao Sun <[email protected]>
AuthorDate: Wed Oct 4 23:16:38 2023 -0500
[FLINK-32445][runtime] Refactors the closeAndCleanupAllData methods in
BlobStoreService and HighAvailabilityServices (#23424)
---
.../java/org/apache/flink/hdfstests/HDFSTest.java | 12 ++++---
.../flink/runtime/blob/BlobStoreService.java | 4 +--
.../flink/runtime/blob/FileSystemBlobStore.java | 2 +-
.../apache/flink/runtime/blob/VoidBlobStore.java | 2 +-
.../runtime/entrypoint/ClusterEntrypoint.java | 6 +---
.../highavailability/AbstractHaServices.java | 40 +++++-----------------
.../highavailability/HighAvailabilityServices.java | 36 +++++++++++++++----
.../nonha/AbstractNonHaServices.java | 5 ++-
.../flink/runtime/minicluster/MiniCluster.java | 10 +-----
.../runtime/blob/BlobCacheCorruptionTest.java | 3 +-
.../flink/runtime/blob/BlobCacheRecoveryTest.java | 3 +-
.../flink/runtime/blob/BlobCacheRetriesTest.java | 6 ++--
.../flink/runtime/blob/BlobCacheSuccessTest.java | 3 +-
.../runtime/blob/BlobServerCorruptionTest.java | 3 +-
.../flink/runtime/blob/BlobServerGetTest.java | 6 ++--
.../flink/runtime/blob/BlobServerRecoveryTest.java | 3 +-
.../runtime/dispatcher/AbstractDispatcherTest.java | 2 +-
.../runtime/entrypoint/ClusterEntrypointTest.java | 12 +++----
.../BlobLibraryCacheRecoveryITCase.java | 3 +-
.../highavailability/AbstractHaServicesTest.java | 13 +++----
.../TestingHighAvailabilityServices.java | 11 +++---
.../TestingHighAvailabilityServicesBuilder.java | 11 +++---
.../nonha/embedded/EmbeddedHaServicesTest.java | 2 +-
.../nonha/standalone/StandaloneHaServicesTest.java | 2 +-
.../ZooKeeperLeaderRetrievalTest.java | 3 +-
.../resourcemanager/ResourceManagerTest.java | 2 +-
.../TaskManagerRunnerConfigurationTest.java | 8 ++---
.../taskexecutor/TaskManagerRunnerStartupTest.java | 2 +-
.../JobManagerHAProcessFailureRecoveryITCase.java | 2 +-
.../recovery/ProcessFailureCancelingITCase.java | 2 +-
30 files changed, 110 insertions(+), 109 deletions(-)
diff --git
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index ffd0d37297d..04e1196b5ca 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -215,7 +215,8 @@ public class HDFSTest {
TestingBlobHelpers.testBlobServerRecovery(
config, blobStoreService, temporaryFolder.newFolder());
} finally {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
@@ -238,7 +239,8 @@ public class HDFSTest {
TestingBlobHelpers.testGetFailsFromCorruptFile(
config, blobStoreService, temporaryFolder.newFolder());
} finally {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
@@ -261,7 +263,8 @@ public class HDFSTest {
TestingBlobHelpers.testBlobCacheRecovery(
config, blobStoreService, temporaryFolder.newFolder());
} finally {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
@@ -284,7 +287,8 @@ public class HDFSTest {
TestingBlobHelpers.testGetFailsFromCorruptFile(
new JobID(), config, blobStoreService,
temporaryFolder.newFolder());
} finally {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
index 805d863c3cc..23b203892a1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
@@ -23,6 +23,6 @@ import java.io.Closeable;
/** Service interface for the BlobStore which allows closing it and cleaning up its
data. */
public interface BlobStoreService extends BlobStore, Closeable {
- /** Closes and cleans up the store. This entails the deletion of all
blobs. */
- void closeAndCleanupAllData();
+ /** Cleans up the store. This entails the deletion of all blobs. */
+ void cleanupAllData();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 141f270e2f8..318309c1ad6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -186,7 +186,7 @@ public class FileSystemBlobStore implements
BlobStoreService {
}
@Override
- public void closeAndCleanupAllData() {
+ public void cleanupAllData() {
try {
LOG.debug("Cleaning up {}.", basePath);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index ffeeffea9e4..c9822cd8dbb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -47,7 +47,7 @@ public class VoidBlobStore implements BlobStoreService {
}
@Override
- public void closeAndCleanupAllData() {}
+ public void cleanupAllData() {}
@Override
public void close() throws IOException {}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 8bbe80877e8..df91a323141 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -499,11 +499,7 @@ public abstract class ClusterEntrypoint implements
AutoCloseableAsync, FatalErro
if (haServices != null) {
try {
- if (cleanupHaData) {
- haServices.closeAndCleanupAllData();
- } else {
- haServices.close();
- }
+ haServices.closeWithOptionalClean(cleanupHaData);
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index bcca7f3e866..aecd38ed29a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -47,8 +47,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
* #getLeaderPathForRestServer}. The returned leader name is the ConfigMap
name in Kubernetes and
* child path in Zookeeper.
*
- * <p>{@link #close()} and {@link #closeAndCleanupAllData()} should be
implemented to destroy the
- * resources.
+ * <p>{@link #close()} and {@link #cleanupAllData()} should be implemented to
destroy the resources.
*
* <p>The abstract class is also responsible for determining which component
service should be
* reused. For example, {@link #jobResultStore} is created once and could be
reused many times.
@@ -182,8 +181,8 @@ public abstract class AbstractHaServices implements
HighAvailabilityServices {
}
@Override
- public void closeAndCleanupAllData() throws Exception {
- logger.info("Close and clean up all data for {}.",
getClass().getSimpleName());
+ public void cleanupAllData() throws Exception {
+ logger.info("Clean up all data for {}.", getClass().getSimpleName());
Throwable exception = null;
@@ -192,40 +191,20 @@ public abstract class AbstractHaServices implements
HighAvailabilityServices {
try {
internalCleanup();
deletedHAData = true;
+ blobStoreService.cleanupAllData();
} catch (Exception t) {
exception = t;
}
- try {
- if (leaderElectionService != null) {
- leaderElectionService.close();
- }
- } catch (Throwable t) {
- exception = ExceptionUtils.firstOrSuppressed(t, exception);
- }
-
- try {
- internalClose();
- } catch (Throwable t) {
- exception = ExceptionUtils.firstOrSuppressed(t, exception);
- }
-
- try {
- if (deletedHAData) {
- blobStoreService.closeAndCleanupAllData();
- } else {
- logger.info(
- "Cannot delete HA blobs because we failed to delete
the pointers in the HA store.");
- blobStoreService.close();
- }
- } catch (Throwable t) {
- exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ if (!deletedHAData) {
+ logger.info(
+ "Cannot delete HA blobs because we failed to delete the
pointers in the HA store.");
}
if (exception != null) {
ExceptionUtils.rethrowException(
exception,
- "Could not properly close and clean up all data of high
availability service.");
+ "Could not properly clean up all data of high availability
service.");
}
logger.info("Finished cleaning up the high availability data.");
}
@@ -281,8 +260,7 @@ public abstract class AbstractHaServices implements
HighAvailabilityServices {
* Cleans up the metadata in the distributed system (e.g. ZooKeeper,
Kubernetes ConfigMap).
*
* <p>If an exception occurs during internal cleanup, we will continue the
cleanup in {@link
- * #closeAndCleanupAllData} and report exceptions only after all cleanup
steps have been
- * attempted.
+ * #cleanupAllData} and report exceptions only after all cleanup steps
have been attempted.
*
* @throws Exception if the cleanup operation on the external storage fails.
*/
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 97ef9a4b1fe..1092bd20663 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -25,6 +25,7 @@ import
org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import java.io.IOException;
@@ -212,19 +213,42 @@ public interface HighAvailabilityServices
void close() throws Exception;
/**
- * Closes the high availability services (releasing all resources) and
deletes all data stored
- * by these services in external stores.
+ * Deletes all data stored by high availability services in external
stores.
*
- * <p>After this method was called, the any job or session that was
managed by these high
+ * <p>After this method was called, any job or session that was managed by
these high
* availability services will be unrecoverable.
*
* <p>If an exception occurs during cleanup, this method will attempt to
continue the cleanup
* and report exceptions only after all cleanup steps have been attempted.
*
- * @throws Exception Thrown, if an exception occurred while closing these
services or cleaning
- * up data stored by them.
+ * @throws Exception if an error occurred while cleaning up the data stored by
these services.
*/
- void closeAndCleanupAllData() throws Exception;
+ void cleanupAllData() throws Exception;
+
+ /**
+ * Calls {@link #cleanupAllData()} (if {@code true} is passed as a
parameter) before calling
+ * {@link #close()} on this instance. Any error that appeared during the
cleanup will be
+ * propagated after calling {@code close()}.
+ */
+ default void closeWithOptionalClean(boolean cleanupData) throws Exception {
+ Throwable exception = null;
+ if (cleanupData) {
+ try {
+ cleanupAllData();
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+ try {
+ close();
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+
+ if (exception != null) {
+ ExceptionUtils.rethrowException(exception);
+ }
+ }
@Override
default CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor
executor) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index ddb62b368e5..2ecd3985bba 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -106,9 +106,8 @@ public abstract class AbstractNonHaServices implements
HighAvailabilityServices
}
@Override
- public void closeAndCleanupAllData() throws Exception {
- // this stores no data, so this method is the same as 'close()'
- close();
+ public void cleanupAllData() throws Exception {
+ // this stores no data, so there is nothing to clean up
}
// ----------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 6de16027cc4..9a75211e795 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -1276,15 +1276,7 @@ public class MiniCluster implements AutoCloseableAsync {
// shut down high-availability services
if (haServices != null) {
- try {
- if (cleanupHaData) {
- haServices.closeAndCleanupAllData();
- } else {
- haServices.close();
- }
- } catch (Exception e) {
- exception = ExceptionUtils.firstOrSuppressed(e, exception);
- }
+ haServices.closeWithOptionalClean(cleanupHaData);
haServices = null;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
index 237e134f989..3ee984ff0b9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCorruptionTest.java
@@ -94,7 +94,8 @@ class BlobCacheCorruptionTest {
TempDirUtils.newFolder(tempDir));
} finally {
if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
index 23012cc7d57..798b7d48ee9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
@@ -51,7 +51,8 @@ public class BlobCacheRecoveryTest {
config, blobStoreService, TempDirUtils.newFolder(tempDir));
} finally {
if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 94e6d5bdc9b..791875f03b5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -80,7 +80,8 @@ class BlobCacheRetriesTest {
testBlobFetchRetries(blobStoreService, new JobID(),
PERMANENT_BLOB);
} finally {
if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
}
@@ -148,7 +149,8 @@ class BlobCacheRetriesTest {
testBlobFetchWithTooManyFailures(blobStoreService, new JobID(),
PERMANENT_BLOB);
} finally {
if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 984720cc221..aa7c443a7ad 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -192,7 +192,8 @@ class BlobCacheSuccessTest {
}
} finally {
if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
index 424a8b02725..e43347f8e71 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCorruptionTest.java
@@ -62,7 +62,8 @@ class BlobServerCorruptionTest {
config, blobStoreService, TempDirUtils.newFolder(tempDir));
} finally {
if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 5f3c4d1987c..8516315412d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -207,7 +207,8 @@ class BlobServerGetTest {
}
} finally {
if (blobStore != null) {
- blobStore.closeAndCleanupAllData();
+ blobStore.cleanupAllData();
+ blobStore.close();
}
}
}
@@ -269,7 +270,8 @@ class BlobServerGetTest {
}
} finally {
if (blobStore != null) {
- blobStore.closeAndCleanupAllData();
+ blobStore.cleanupAllData();
+ blobStore.close();
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
index 31de327709a..eca49278a3e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
@@ -51,7 +51,8 @@ class BlobServerRecoveryTest {
config, blobStoreService, TempDirUtils.newFolder(tempDir));
} finally {
if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
index 66fc553512c..7550372c353 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
@@ -118,7 +118,7 @@ public class AbstractDispatcherTest extends TestLogger {
@After
public void tearDown() throws Exception {
if (haServices != null) {
- haServices.closeAndCleanupAllData();
+ haServices.closeWithOptionalClean(true);
}
if (blobServer != null) {
blobServer.close();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index ac2d7a3197a..8b2424e8694 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -134,11 +134,11 @@ public class ClusterEntrypointTest extends TestLogger {
@Test
public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
- final CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> cleanupAllDataFuture = new
CompletableFuture<>();
final HighAvailabilityServices testingHaService =
new TestingHighAvailabilityServicesBuilder()
.setCloseFuture(closeFuture)
-
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+ .setCleanupAllDataFuture(cleanupAllDataFuture)
.build();
final TestingEntryPoint testingEntryPoint =
new TestingEntryPoint.Builder()
@@ -154,7 +154,7 @@ public class ClusterEntrypointTest extends TestLogger {
appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
is(ApplicationStatus.UNKNOWN));
assertThat(closeFuture.isDone(), is(true));
- assertThat(closeAndCleanupAllDataFuture.isDone(), is(false));
+ assertThat(cleanupAllDataFuture.isDone(), is(false));
}
@Test
@@ -184,13 +184,13 @@ public class ClusterEntrypointTest extends TestLogger {
@Test
public void
testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() throws
Exception {
final CompletableFuture<Void> deregisterFuture = new
CompletableFuture<>();
- final CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> cleanupAllDataFuture = new
CompletableFuture<>();
final CompletableFuture<ApplicationStatus> dispatcherShutDownFuture =
new CompletableFuture<>();
final HighAvailabilityServices testingHaService =
new TestingHighAvailabilityServicesBuilder()
-
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+ .setCleanupAllDataFuture(cleanupAllDataFuture)
.build();
final TestingResourceManagerFactory testingResourceManagerFactory =
new TestingResourceManagerFactory.Builder()
@@ -221,7 +221,7 @@ public class ClusterEntrypointTest extends TestLogger {
appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
is(ApplicationStatus.SUCCEEDED));
assertThat(deregisterFuture.isDone(), is(true));
- assertThat(closeAndCleanupAllDataFuture.isDone(), is(true));
+ assertThat(cleanupAllDataFuture.isDone(), is(true));
}
@Test
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 95eb16ab4ff..1c4484166e5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -210,7 +210,8 @@ public class BlobLibraryCacheRecoveryITCase extends
TestLogger {
}
if (blobStoreService != null) {
- blobStoreService.closeAndCleanupAllData();
+ blobStoreService.cleanupAllData();
+ blobStoreService.close();
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
index 949f54e5a89..c70860cdb37 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -66,13 +66,13 @@ class AbstractHaServicesTest {
() ->
closeOperations.offer(CloseOperations.HA_CLEANUP),
ignored -> {});
- haServices.closeAndCleanupAllData();
+ haServices.closeWithOptionalClean(true);
assertThat(closeOperations)
.contains(
CloseOperations.HA_CLEANUP,
CloseOperations.HA_CLOSE,
- CloseOperations.BLOB_CLEANUP_AND_CLOSE);
+ CloseOperations.BLOB_CLEANUP);
}
/**
@@ -97,7 +97,8 @@ class AbstractHaServicesTest {
},
ignored -> {});
-
assertThatThrownBy(haServices::closeAndCleanupAllData).isInstanceOf(FlinkException.class);
+ assertThatThrownBy(() -> haServices.closeWithOptionalClean(true))
+ .isInstanceOf(FlinkException.class);
assertThat(closeOperations).contains(CloseOperations.HA_CLOSE,
CloseOperations.BLOB_CLOSE);
}
@@ -127,7 +128,7 @@ class AbstractHaServicesTest {
private enum CloseOperations {
HA_CLEANUP,
HA_CLOSE,
- BLOB_CLEANUP_AND_CLOSE,
+ BLOB_CLEANUP,
BLOB_CLOSE,
}
@@ -140,8 +141,8 @@ class AbstractHaServicesTest {
}
@Override
- public void closeAndCleanupAllData() {
- closeOperations.offer(CloseOperations.BLOB_CLEANUP_AND_CLOSE);
+ public void cleanupAllData() {
+ closeOperations.offer(CloseOperations.BLOB_CLEANUP);
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index e8e744d0dc4..d68a46db797 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -72,7 +72,7 @@ public class TestingHighAvailabilityServices implements
HighAvailabilityServices
private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
- private CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ private CompletableFuture<Void> cleanupAllDataFuture = new
CompletableFuture<>();
private volatile CompletableFuture<JobID> globalCleanupFuture;
@@ -142,9 +142,8 @@ public class TestingHighAvailabilityServices implements
HighAvailabilityServices
this.closeFuture = closeFuture;
}
- public void setCloseAndCleanupAllDataFuture(
- CompletableFuture<Void> closeAndCleanupAllDataFuture) {
- this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+ public void setCleanupAllDataFuture(CompletableFuture<Void>
cleanupAllDataFuture) {
+ this.cleanupAllDataFuture = cleanupAllDataFuture;
}
public void setGlobalCleanupFuture(CompletableFuture<JobID>
globalCleanupFuture) {
@@ -280,8 +279,8 @@ public class TestingHighAvailabilityServices implements
HighAvailabilityServices
}
@Override
- public void closeAndCleanupAllData() throws Exception {
- closeAndCleanupAllDataFuture.complete(null);
+ public void cleanupAllData() throws Exception {
+ cleanupAllDataFuture.complete(null);
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
index 7b3cab432cf..66fb485d40d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
@@ -74,7 +74,7 @@ public class TestingHighAvailabilityServicesBuilder {
private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
- private CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ private CompletableFuture<Void> cleanupAllDataFuture = new
CompletableFuture<>();
public TestingHighAvailabilityServices build() {
final TestingHighAvailabilityServices testingHighAvailabilityServices =
@@ -102,8 +102,7 @@ public class TestingHighAvailabilityServicesBuilder {
testingHighAvailabilityServices.setJobResultStore(jobResultStore);
testingHighAvailabilityServices.setCloseFuture(closeFuture);
- testingHighAvailabilityServices.setCloseAndCleanupAllDataFuture(
- closeAndCleanupAllDataFuture);
+
testingHighAvailabilityServices.setCleanupAllDataFuture(cleanupAllDataFuture);
return testingHighAvailabilityServices;
}
@@ -178,9 +177,9 @@ public class TestingHighAvailabilityServicesBuilder {
return this;
}
- public TestingHighAvailabilityServicesBuilder
setCloseAndCleanupAllDataFuture(
- CompletableFuture<Void> closeAndCleanupAllDataFuture) {
- this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+ public TestingHighAvailabilityServicesBuilder setCleanupAllDataFuture(
+ CompletableFuture<Void> cleanupAllDataFuture) {
+ this.cleanupAllDataFuture = cleanupAllDataFuture;
return this;
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
index 8b6da28cd5f..61220aa2b0f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java
@@ -57,7 +57,7 @@ public class EmbeddedHaServicesTest extends TestLogger {
@After
public void teardownTest() throws Exception {
if (embeddedHaServices != null) {
- embeddedHaServices.closeAndCleanupAllData();
+ embeddedHaServices.closeWithOptionalClean(true);
embeddedHaServices = null;
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
index ee4aacd24eb..f9de159d016 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServicesTest.java
@@ -54,7 +54,7 @@ public class StandaloneHaServicesTest extends TestLogger {
@After
public void teardownTest() throws Exception {
if (standaloneHaServices != null) {
- standaloneHaServices.closeAndCleanupAllData();
+ standaloneHaServices.closeWithOptionalClean(true);
standaloneHaServices = null;
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
index ba7975c7b78..14b6c35c193 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
@@ -100,8 +100,7 @@ class ZooKeeperLeaderRetrievalTest {
@AfterEach
void after() throws Exception {
if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
-
+ highAvailabilityServices.closeWithOptionalClean(true);
highAvailabilityServices = null;
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index ea65581e84d..e8e689deebb 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -147,7 +147,7 @@ class ResourceManagerTest {
}
if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices.closeWithOptionalClean(true);
}
if (testingFatalErrorHandler.hasExceptionOccurred()) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
index b1eeeb9438a..39ff43860ce 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
@@ -97,7 +97,7 @@ class TaskManagerRunnerConfigurationTest {
assertThat(taskManagerRpcService.getAddress()).isEqualTo(taskmanagerHost);
} finally {
maybeCloseRpcService(taskManagerRpcService);
- highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices.closeWithOptionalClean(true);
}
}
@@ -115,7 +115,7 @@ class TaskManagerRunnerConfigurationTest {
assertThat(taskManagerRpcService.getAddress()).isNotNull().isNotEmpty();
} finally {
maybeCloseRpcService(taskManagerRpcService);
- highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices.closeWithOptionalClean(true);
}
}
@@ -136,7 +136,7 @@ class TaskManagerRunnerConfigurationTest {
assertThat(taskManagerRpcService.getAddress()).matches(InetAddresses::isInetAddress);
} finally {
maybeCloseRpcService(taskManagerRpcService);
- highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices.closeWithOptionalClean(true);
IOUtils.closeQuietly(testJobManagerSocket);
}
}
@@ -159,7 +159,7 @@ class TaskManagerRunnerConfigurationTest {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid port range definition: -1");
} finally {
- highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices.closeWithOptionalClean(true);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index cb4a68e1300..9e937eb282b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -105,7 +105,7 @@ class TaskManagerRunnerStartupTest {
@AfterEach
void tearDownTest() throws Exception {
- highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices.closeWithOptionalClean(true);
highAvailabilityServices = null;
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index 6f6cce4cc08..28525dd9d55 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -406,7 +406,7 @@ class JobManagerHAProcessFailureRecoveryITCase {
}
if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
+ highAvailabilityServices.closeWithOptionalClean(true);
}
RpcUtils.terminateRpcService(rpcService);
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 1007a2168f9..93b49a98662 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -247,7 +247,7 @@ class ProcessFailureCancelingITCase {
RpcUtils.terminateRpcService(rpcService);
- haServices.closeAndCleanupAllData();
+ haServices.closeWithOptionalClean(true);
}
}