This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d23c5ed011f5f9d39834cbb2a9d700a4d9ed4b0e Author: Matteo Merli <[email protected]> AuthorDate: Thu Jun 24 18:58:37 2021 +0200 Made the PulsarClusterMetadataTeardown deletes idempotent (#11042) ### Motivation If some of the z-nodes are not there anymore, the cluster teardown operation should not fail. Also changed the delete recursive operation to delete in parallel for children of same node, to speed up the overall execution time. (cherry picked from commit 7efabc4909e3a4a70d56a8e790443f3b349a1a89) --- .../pulsar/PulsarClusterMetadataTeardown.java | 27 +++++++++++++++------- .../cli/ClusterMetadataTearDownTest.java | 3 +++ 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java index 53fff01..c4c2c18 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java @@ -22,6 +22,8 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.google.protobuf.InvalidProtocolBufferException; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -31,6 +33,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreFactory; @@ -100,7 +103,7 @@ public class PulsarClusterMetadataTeardown { } for (String localZkNode : localZkNodes) { - deleteRecursively(metadataStore, "/" + localZkNode); + deleteRecursively(metadataStore, "/" + localZkNode).join(); } if (arguments.configurationStore != null && arguments.cluster != null) { @@ -108,18 +111,26 @@ public class PulsarClusterMetadataTeardown { @Cleanup MetadataStore configMetadataStore = MetadataStoreFactory.create(arguments.configurationStore, MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build()); - deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster); + deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join(); } log.info("Cluster metadata for '{}' teardown.", arguments.cluster); } - private static void deleteRecursively(MetadataStore metadataStore, String path){ - metadataStore.getChildren(path).join().forEach(child -> { - deleteRecursively(metadataStore, path + "/" + child); - }); - - metadataStore.delete(path, Optional.empty()).join(); + private static CompletableFuture<Void> deleteRecursively(MetadataStore metadataStore, String path) { + return metadataStore.getChildren(path) + .thenCompose(children -> FutureUtil.waitForAll( + children.stream() + .map(child -> deleteRecursively(metadataStore, path + "/" + child)) + .collect(Collectors.toList()))) + .thenCompose(__ -> metadataStore.exists(path)) + .thenCompose(exists -> { + if (exists) { + return metadataStore.delete(path, Optional.empty()); + } else { + return CompletableFuture.completedFuture(null); + } + }); } private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java index a60ecc2..8aeb4bf 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java @@ -199,6 +199,9 @@ public class ClusterMetadataTearDownTest extends TestRetrySupport { } List<String> clusterNodes = configStore.getChildren( "/admin/clusters").join(); assertFalse(clusterNodes.contains(pulsarCluster.getClusterName())); + + // Try delete again, should not fail + PulsarClusterMetadataTeardown.main(args); } private long getNumOfLedgers() {
