This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 342fb54 replace BaseResources#deleteRecursive with MetadataStore#deleteRecursive (#13240)
342fb54 is described below
commit 342fb54a2cde98fac22b823026f8769f263e2a86
Author: JiangHaiting <[email protected]>
AuthorDate: Sun Dec 12 02:39:23 2021 +0800
replace BaseResources#deleteRecursive with MetadataStore#deleteRecursive (#13240)
---
.../pulsar/broker/resources/BaseResources.java | 72 ----------------------
.../broker/resources/NamespaceResources.java | 50 ++-------------
2 files changed, 5 insertions(+), 117 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index a6972cd..12bb7ec 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -20,9 +20,6 @@ package org.apache.pulsar.broker.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Joiner;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -31,11 +28,9 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.zookeeper.common.PathUtils;
/**
* Base class for all configuration resources to access configurations from
metadata-store.
@@ -181,71 +176,4 @@ public class BaseResources<T> {
Joiner.on('/').appendTo(sb, parts);
return sb.toString();
}
-
- protected static CompletableFuture<Void>
deleteRecursiveAsync(BaseResources resources, final String pathRoot) {
- PathUtils.validatePath(pathRoot);
-
- CompletableFuture<Void> completableFuture = new CompletableFuture<>();
- listSubTreeBFSAsync(resources, pathRoot).whenComplete((tree, ex) -> {
- if (ex == null) {
- log.debug("Deleting {} with size {}", tree, tree.size());
-
- final List<CompletableFuture<Void>> futures = new
ArrayList<>();
- for (int i = tree.size() - 1; i >= 0; --i) {
- // Delete the leaves first and eventually get rid of the
root
- futures.add(resources.deleteAsync(tree.get(i)));
- }
-
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- log.error("Failed to remove partitioned topics",
exception);
- return
completableFuture.completeExceptionally(exception.getCause());
- }
- return completableFuture.complete(null);
- });
- } else {
- log.warn("Failed to delete partitioned topics z-node [{}]",
pathRoot, ex.getCause());
- }
- });
-
- return completableFuture;
- }
-
- protected static CompletableFuture<List<String>>
listSubTreeBFSAsync(BaseResources resources,
- final String pathRoot) {
- Deque<String> queue = new LinkedList<>();
- List<String> tree = new ArrayList<>();
- queue.add(pathRoot);
- tree.add(pathRoot);
- CompletableFuture<List<String>> completableFuture = new
CompletableFuture<>();
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
- for (int i = 0; i < queue.size(); i++) {
- String node = queue.pollFirst();
- if (node == null) {
- break;
- }
- futures.add(resources.getChildrenAsync(node)
- .whenComplete((children, ex) -> {
- if (ex == null) {
- for (final String child : (List<String>) children)
{
- final String childPath = node + "/" + child;
- queue.add(childPath);
- tree.add(childPath);
- }
- } else {
- log.warn("Failed to get data error from z-node
[{}]", node);
- }
- }));
- }
-
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- log.error("Failed to get partitioned topics", exception);
- return
completableFuture.completeExceptionally(exception.getCause());
- }
- return completableFuture.complete(tree);
- });
-
- return completableFuture;
- }
}
\ No newline at end of file
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 5e47086..17c2427 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -261,25 +261,7 @@ public class NamespaceResources extends
BaseResources<Policies> {
public CompletableFuture<Void>
clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) {
final String globalPartitionedPath =
joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString());
-
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
-
- deleteRecursiveAsync(this, globalPartitionedPath)
- .thenAccept(ignore -> {
- log.info("Clear partitioned topic metadata [{}]
success.", namespaceName);
- completableFuture.complete(null);
- }).exceptionally(ex -> {
- if (ex.getCause().getCause() instanceof
KeeperException.NoNodeException) {
- completableFuture.complete(null);
- } else {
- log.error("Clear partitioned topic metadata failed.");
- completableFuture.completeExceptionally(ex.getCause());
- return null;
- }
- return null;
- });
-
- return completableFuture;
+ return getStore().deleteRecursive(globalPartitionedPath);
}
public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String
tenant) {
@@ -298,38 +280,16 @@ public class NamespaceResources extends
BaseResources<Policies> {
}
}
- // clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` for
zk-node
+ // clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in
metadata-store
public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH,
ns.toString());
- CompletableFuture<Void> future = new CompletableFuture<Void>();
- deleteRecursiveAsync(this, namespaceBundlePath).whenComplete((ignore,
ex) -> {
- if (ex instanceof MetadataStoreException.NotFoundException) {
- future.complete(null);
- } else if (ex != null) {
- future.completeExceptionally(ex);
- } else {
- future.complete(null);
- }
- });
-
- return future;
+ return getStore().deleteRecursive(namespaceBundlePath);
}
- // clear resource of `/loadbalance/bundle-data/{tenant}/` for zk-node
+ // clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH,
tenant);
- CompletableFuture<Void> future = new CompletableFuture<Void>();
- deleteRecursiveAsync(this, tenantBundlePath).whenComplete((ignore, ex)
-> {
- if (ex instanceof MetadataStoreException.NotFoundException) {
- future.complete(null);
- } else if (ex != null) {
- future.completeExceptionally(ex);
- } else {
- future.complete(null);
- }
- });
-
- return future;
+ return getStore().deleteRecursive(tenantBundlePath);
}
}
\ No newline at end of file