This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d23c5ed011f5f9d39834cbb2a9d700a4d9ed4b0e
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 24 18:58:37 2021 +0200

    Made the PulsarClusterMetadataTeardown deletes idempotent (#11042)
    
    ### Motivation
    
    If some of the z-nodes are not there anymore, the cluster teardown 
operation should not fail.
    
    Also changed the recursive delete operation to delete the children of the 
same node in parallel, to speed up the overall execution time.
    
    (cherry picked from commit 7efabc4909e3a4a70d56a8e790443f3b349a1a89)
---
 .../pulsar/PulsarClusterMetadataTeardown.java      | 27 +++++++++++++++-------
 .../cli/ClusterMetadataTearDownTest.java           |  3 +++
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index 53fff01..c4c2c18 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -22,6 +22,8 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -31,6 +33,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
@@ -100,7 +103,7 @@ public class PulsarClusterMetadataTeardown {
         }
 
         for (String localZkNode : localZkNodes) {
-            deleteRecursively(metadataStore, "/" + localZkNode);
+            deleteRecursively(metadataStore, "/" + localZkNode).join();
         }
 
         if (arguments.configurationStore != null && arguments.cluster != null) 
{
@@ -108,18 +111,26 @@ public class PulsarClusterMetadataTeardown {
             @Cleanup
             MetadataStore configMetadataStore = 
MetadataStoreFactory.create(arguments.configurationStore,
                     
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
-            deleteRecursively(configMetadataStore, "/admin/clusters/" + 
arguments.cluster);
+            deleteRecursively(configMetadataStore, "/admin/clusters/" + 
arguments.cluster).join();
         }
 
         log.info("Cluster metadata for '{}' teardown.", arguments.cluster);
     }
 
-    private static void deleteRecursively(MetadataStore metadataStore, String 
path){
-        metadataStore.getChildren(path).join().forEach(child -> {
-            deleteRecursively(metadataStore, path + "/" + child);
-        });
-
-        metadataStore.delete(path, Optional.empty()).join();
+    private static CompletableFuture<Void> deleteRecursively(MetadataStore 
metadataStore, String path) {
+        return metadataStore.getChildren(path)
+                .thenCompose(children -> FutureUtil.waitForAll(
+                        children.stream()
+                                .map(child -> deleteRecursively(metadataStore, 
path + "/" + child))
+                                .collect(Collectors.toList())))
+                .thenCompose(__ -> metadataStore.exists(path))
+                .thenCompose(exists -> {
+                    if (exists) {
+                        return metadataStore.delete(path, Optional.empty());
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
     }
 
     private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
index a60ecc2..8aeb4bf 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
@@ -199,6 +199,9 @@ public class ClusterMetadataTearDownTest extends 
TestRetrySupport {
         }
         List<String> clusterNodes = configStore.getChildren( 
"/admin/clusters").join();
         assertFalse(clusterNodes.contains(pulsarCluster.getClusterName()));
+
+        // Try delete again, should not fail
+        PulsarClusterMetadataTeardown.main(args);
     }
 
     private long getNumOfLedgers() {

Reply via email to