This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 593cc139ab3 Revert "[FLINK-30513][runtime] Cleanup HA storage path on
cluster termination"
593cc139ab3 is described below
commit 593cc139ab30bb81ab38d94b7b697d00eaaecada
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Mar 10 02:13:36 2023 +0800
Revert "[FLINK-30513][runtime] Cleanup HA storage path on cluster
termination"
This reverts commit 13779ab8e4f5539ca311d9f233d031d818af6450.
---
.../highavailability/AbstractHaServices.java | 33 +++++-----------------
.../highavailability/AbstractHaServicesTest.java | 30 +++++++-------------
.../ZooKeeperLeaderRetrievalTest.java | 5 ----
3 files changed, 17 insertions(+), 51 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index cbc48a268f7..feb8c2f0e87 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.highavailability;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -191,26 +189,16 @@ public abstract class AbstractHaServices implements
HighAvailabilityServices {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
- if (deletedHAData) {
- try {
+ try {
+ if (deletedHAData) {
blobStoreService.closeAndCleanupAllData();
- } catch (Throwable t) {
- exception = ExceptionUtils.firstOrSuppressed(t, exception);
- }
-
- try {
- cleanupClusterHaStoragePath();
- } catch (Throwable t) {
- exception = ExceptionUtils.firstOrSuppressed(t, exception);
- }
- } else {
- logger.info(
- "Cannot delete HA blobs because we failed to delete the
pointers in the HA store.");
- try {
+ } else {
+ logger.info(
+ "Cannot delete HA blobs because we failed to delete
the pointers in the HA store.");
blobStoreService.close();
- } catch (Throwable t) {
- exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
if (exception != null) {
@@ -237,13 +225,6 @@ public abstract class AbstractHaServices implements
HighAvailabilityServices {
executor);
}
- protected void cleanupClusterHaStoragePath() throws Exception {
- final Path clusterHaStoragePath =
-
HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(configuration);
- final FileSystem fileSystem = clusterHaStoragePath.getFileSystem();
- fileSystem.delete(clusterHaStoragePath, true);
- }
-
private LeaderElectionService createLeaderElectionService(String
leaderName) {
return new
DefaultLeaderElectionService(createLeaderElectionDriverFactory(leaderName));
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
index c5dbf1bb0d1..90a642f325d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -48,12 +48,11 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
class AbstractHaServicesTest {
/**
- * Tests that we first delete all pointers from the HA services before
deleting the blobs and HA
- * storage path. See FLINK-22014 for more details.
+ * Tests that we first delete all pointers from the HA services before
deleting the blobs. See
+ * FLINK-22014 for more details.
*/
@Test
- void
testCloseAndCleanupAllDataDeletesBlobsAndHaStoragePathAfterCleaningUpHAData()
- throws Exception {
+ void testCloseAndCleanupAllDataDeletesBlobsAfterCleaningUpHAData() throws
Exception {
final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
final TestingBlobStoreService testingBlobStoreService =
@@ -70,20 +69,18 @@ class AbstractHaServicesTest {
haServices.closeAndCleanupAllData();
- assertThat(closeOperations)
- .contains(
- CloseOperations.HA_CLEANUP,
- CloseOperations.HA_CLOSE,
- CloseOperations.BLOB_CLEANUP_AND_CLOSE,
- CloseOperations.HA_STORAGE_PATH_CLEANUP);
+ assertThat(closeOperations).contains(
+ CloseOperations.HA_CLEANUP,
+ CloseOperations.HA_CLOSE,
+ CloseOperations.BLOB_CLEANUP_AND_CLOSE);
}
/**
- * Tests that we don't delete the HA blobs and HA storage path if we could
not clean up the
- * pointers from the HA services. See FLINK-22014 for more details.
+ * Tests that we don't delete the HA blobs if we could not clean up the
pointers from the HA
+ * services. See FLINK-22014 for more details.
*/
@Test
- void
testCloseAndCleanupAllDataDoesNotDeleteBlobsAndHaStoragePathIfCleaningUpHADataFails()
{
+ void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails()
{
final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
final TestingBlobStoreService testingBlobStoreService =
@@ -102,7 +99,6 @@ class AbstractHaServicesTest {
assertThatThrownBy(haServices::closeAndCleanupAllData).isInstanceOf(FlinkException.class);
assertThat(closeOperations).contains(CloseOperations.HA_CLOSE,
CloseOperations.BLOB_CLOSE);
-
assertThat(closeOperations).doesNotContain(CloseOperations.HA_STORAGE_PATH_CLEANUP);
}
@Test
@@ -133,7 +129,6 @@ class AbstractHaServicesTest {
HA_CLOSE,
BLOB_CLEANUP_AND_CLOSE,
BLOB_CLOSE,
- HA_STORAGE_PATH_CLEANUP,
}
private static final class TestingBlobStoreService implements
BlobStoreService {
@@ -239,11 +234,6 @@ class AbstractHaServicesTest {
internalJobCleanupConsumer.accept(jobID);
}
- @Override
- protected void cleanupClusterHaStoragePath() {
- closeOperations.offer(CloseOperations.HA_STORAGE_PATH_CLEANUP);
- }
-
@Override
protected String getLeaderPathForResourceManager() {
throw new UnsupportedOperationException("Not supported by this
test implementation.");
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
index a3014c08d8c..78f159cb364 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
@@ -40,7 +40,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.net.InetAddress;
@@ -49,7 +48,6 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
-import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
@@ -68,8 +66,6 @@ class ZooKeeperLeaderRetrievalTest {
private final TestingFatalErrorHandlerExtension
testingFatalErrorHandlerResource =
new TestingFatalErrorHandlerExtension();
- @TempDir private Path tempDir;
-
private TestingServer testingServer;
private Configuration config;
@@ -84,7 +80,6 @@ class ZooKeeperLeaderRetrievalTest {
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
testingServer.getConnectString());
- config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
tempDir.toString());
highAvailabilityServices =
new ZooKeeperMultipleComponentLeaderElectionHaServices(