gaoran10 commented on a change in pull request #12711:
URL: https://github.com/apache/pulsar/pull/12711#discussion_r747633513
##########
File path:
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
##########
@@ -194,6 +197,38 @@ protected static void deleteRecursive(BaseResources
resources, final String path
}
}
+ protected static CompletableFuture<Void>
deleteRecursiveAsync(BaseResources resources, final String pathRoot) {
+ PathUtils.validatePath(pathRoot);
+ List<String> tree = null;
+ try {
+ tree = listSubTreeBFS(resources, pathRoot);
+ } catch (MetadataStoreException e) {
+
+ }
+
+ if (tree != null) {
+ log.debug("Deleting {} with size {}", tree, tree.size());
+ log.debug("Deleting " + tree.size() + " subnodes ");
Review comment:
Please check this log.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -484,42 +474,39 @@ protected void
internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
return null;
} else {
- log.error("[{}] Failed to remove owned namespace {}",
clientAppId(), namespaceName, exception);
+ log.error("[{}] Failed to remove forcefully owned
namespace {}",
+ clientAppId(), namespaceName, exception);
asyncResponse.resume(new
RestException(exception.getCause()));
return null;
}
}
- try {
- // remove partitioned topics znode
-
pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
- .clearPartitionedTopicMetadata(namespaceName);
-
- try {
-
pulsar().getPulsarResources().getTopicResources().clearDomainPersistence(namespaceName).get();
-
pulsar().getPulsarResources().getTopicResources().clearNamespacePersistence(namespaceName).get();
- } catch (ExecutionException | InterruptedException e) {
- // warn level log here since this failure has no side
effect besides left a un-used metadata
- // and also will not affect the re-creation of namespace
- log.warn("[{}] Failed to remove managed-ledger for {}",
clientAppId(), namespaceName, e);
- }
-
- // we have successfully removed all the ownership for the
namespace, the policies znode can be deleted
- // now
- namespaceResources().deletePolicies(namespaceName);
-
- try {
- getLocalPolicies().deleteLocalPolicies(namespaceName);
- } catch (NotFoundException nne) {
- // If the z-node with the modified information is not
there anymore, we're already good
- }
- } catch (Exception e) {
- log.error("[{}] Failed to remove owned namespace {} from ZK",
clientAppId(), namespaceName, e);
- asyncResponse.resume(new RestException(e));
- return null;
- }
+ // clear resource of `/namespace/{namespaceName}` for zk-node
+ namespaceResources().deleteNamespaceAsync(namespaceName)
+ .thenCompose(ignore ->
namespaceResources().getPartitionedTopicResources()
+ .clearPartitionedTopicMetadataAsync(namespaceName))
+ // clear resource for manager-ledger z-node
+ .thenCompose(ignore ->
pulsar().getPulsarResources().getTopicResources()
+ .clearDomainPersistence(namespaceName))
+ .thenCompose(ignore ->
pulsar().getPulsarResources().getTopicResources()
+ .clearNamespacePersistence(namespaceName))
+ // we have successfully removed all the ownership for the
namespace, the policies
+ // z-node can be deleted now
+ .thenCompose(ignore ->
namespaceResources().deletePoliciesAsync(namespaceName))
+ // clear z-node of local policies
+ .thenCompose(ignore ->
getLocalPolicies().deleteLocalPoliciesAsync(namespaceName))
+ .whenComplete((ignore, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Failed to force remove namespace or
managed-ledger for {}",
+ clientAppId(), namespaceName, ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ log.info("[{}] Remove forcefully namespace or
managed-ledger successfully {}",
+ clientAppId(), namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
Review comment:
It seems that this code block was presented twice, could we add a method
for this code block and reuse it.
##########
File path:
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
##########
@@ -194,6 +197,38 @@ protected static void deleteRecursive(BaseResources
resources, final String path
}
}
+ protected static CompletableFuture<Void>
deleteRecursiveAsync(BaseResources resources, final String pathRoot) {
+ PathUtils.validatePath(pathRoot);
+ List<String> tree = null;
+ try {
+ tree = listSubTreeBFS(resources, pathRoot);
+ } catch (MetadataStoreException e) {
+
+ }
+
+ if (tree != null) {
+ log.debug("Deleting {} with size {}", tree, tree.size());
+ log.debug("Deleting " + tree.size() + " subnodes ");
+
+ final List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (int i = tree.size() - 1; i >= 0; --i) {
+ // Delete the leaves first and eventually get rid of the root
+ futures.add(resources.deleteAsync(tree.get(i)));
+ }
+
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ log.error("Failed to remove partitioned topics",
exception);
+ return null;
+ }
+ Response.noContent().build();
Review comment:
It seems that this statement is useless.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]