This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 342fb54  replace BaseResources#deleteRecursive with 
MetadataStore#deleteRecursive (#13240)
342fb54 is described below

commit 342fb54a2cde98fac22b823026f8769f263e2a86
Author: JiangHaiting <[email protected]>
AuthorDate: Sun Dec 12 02:39:23 2021 +0800

    replace BaseResources#deleteRecursive with MetadataStore#deleteRecursive 
(#13240)
---
 .../pulsar/broker/resources/BaseResources.java     | 72 ----------------------
 .../broker/resources/NamespaceResources.java       | 50 ++-------------
 2 files changed, 5 insertions(+), 117 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index a6972cd..12bb7ec 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -20,9 +20,6 @@ package org.apache.pulsar.broker.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Joiner;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -31,11 +28,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.zookeeper.common.PathUtils;
 
 /**
  * Base class for all configuration resources to access configurations from 
metadata-store.
@@ -181,71 +176,4 @@ public class BaseResources<T> {
         Joiner.on('/').appendTo(sb, parts);
         return sb.toString();
     }
-
-    protected static CompletableFuture<Void> 
deleteRecursiveAsync(BaseResources resources, final String pathRoot) {
-        PathUtils.validatePath(pathRoot);
-
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        listSubTreeBFSAsync(resources, pathRoot).whenComplete((tree, ex) -> {
-            if (ex == null) {
-                log.debug("Deleting {} with size {}", tree, tree.size());
-
-                final List<CompletableFuture<Void>> futures = new 
ArrayList<>();
-                for (int i = tree.size() - 1; i >= 0; --i) {
-                    // Delete the leaves first and eventually get rid of the 
root
-                    futures.add(resources.deleteAsync(tree.get(i)));
-                }
-
-                FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                    if (exception != null) {
-                        log.error("Failed to remove partitioned topics", 
exception);
-                        return 
completableFuture.completeExceptionally(exception.getCause());
-                    }
-                    return completableFuture.complete(null);
-                });
-            } else {
-                log.warn("Failed to delete partitioned topics z-node [{}]", 
pathRoot, ex.getCause());
-            }
-        });
-
-        return completableFuture;
-    }
-
-    protected static CompletableFuture<List<String>> 
listSubTreeBFSAsync(BaseResources resources,
-            final String pathRoot) {
-        Deque<String> queue = new LinkedList<>();
-        List<String> tree = new ArrayList<>();
-        queue.add(pathRoot);
-        tree.add(pathRoot);
-        CompletableFuture<List<String>> completableFuture = new 
CompletableFuture<>();
-        final List<CompletableFuture<Void>> futures = new ArrayList<>();
-        for (int i = 0; i < queue.size(); i++) {
-            String node = queue.pollFirst();
-            if (node == null) {
-                break;
-            }
-            futures.add(resources.getChildrenAsync(node)
-                    .whenComplete((children, ex) -> {
-                        if (ex == null) {
-                            for (final String child : (List<String>) children) 
{
-                                final String childPath = node + "/" + child;
-                                queue.add(childPath);
-                                tree.add(childPath);
-                            }
-                        } else {
-                            log.warn("Failed to get data error from z-node 
[{}]", node);
-                        }
-                    }));
-        }
-
-        FutureUtil.waitForAll(futures).handle((result, exception) -> {
-            if (exception != null) {
-                log.error("Failed to get partitioned topics", exception);
-                return 
completableFuture.completeExceptionally(exception.getCause());
-            }
-            return completableFuture.complete(tree);
-        });
-
-        return completableFuture;
-    }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 5e47086..17c2427 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -261,25 +261,7 @@ public class NamespaceResources extends 
BaseResources<Policies> {
 
         public CompletableFuture<Void> 
clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) {
             final String globalPartitionedPath = 
joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString());
-
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-
-            deleteRecursiveAsync(this, globalPartitionedPath)
-                    .thenAccept(ignore -> {
-                        log.info("Clear partitioned topic metadata [{}] 
success.", namespaceName);
-                        completableFuture.complete(null);
-                    }).exceptionally(ex -> {
-                if (ex.getCause().getCause() instanceof 
KeeperException.NoNodeException) {
-                    completableFuture.complete(null);
-                } else {
-                    log.error("Clear partitioned topic metadata failed.");
-                    completableFuture.completeExceptionally(ex.getCause());
-                    return null;
-                }
-                return null;
-            });
-
-            return completableFuture;
+            return getStore().deleteRecursive(globalPartitionedPath);
         }
 
         public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String 
tenant) {
@@ -298,38 +280,16 @@ public class NamespaceResources extends 
BaseResources<Policies> {
         }
     }
 
-    // clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` for 
zk-node
+    // clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in 
metadata-store
     public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
         final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, 
ns.toString());
-        CompletableFuture<Void> future = new CompletableFuture<Void>();
-        deleteRecursiveAsync(this, namespaceBundlePath).whenComplete((ignore, 
ex) -> {
-            if (ex instanceof MetadataStoreException.NotFoundException) {
-                future.complete(null);
-            } else if (ex != null) {
-                future.completeExceptionally(ex);
-            } else {
-                future.complete(null);
-            }
-        });
-
-        return future;
+        return getStore().deleteRecursive(namespaceBundlePath);
     }
 
-    // clear resource of `/loadbalance/bundle-data/{tenant}/` for zk-node
+    // clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
     public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
         final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, 
tenant);
-        CompletableFuture<Void> future = new CompletableFuture<Void>();
-        deleteRecursiveAsync(this, tenantBundlePath).whenComplete((ignore, ex) 
-> {
-            if (ex instanceof MetadataStoreException.NotFoundException) {
-                future.complete(null);
-            } else if (ex != null) {
-                future.completeExceptionally(ex);
-            } else {
-                future.complete(null);
-            }
-        });
-
-        return future;
+        return getStore().deleteRecursive(tenantBundlePath);
     }
 
 }
\ No newline at end of file

Reply via email to