This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new fa90384b161 fix znode leakage caused by deleting tenant #12711#12972 
(#18698)
fa90384b161 is described below

commit fa90384b1613cfe361927acf25ee32349e7513ec
Author: congbo <[email protected]>
AuthorDate: Fri Dec 9 10:19:42 2022 +0800

    fix znode leakage caused by deleting tenant #12711#12972 (#18698)
---
 .../pulsar/broker/resources/BaseResources.java     | 36 ++++++++-
 .../broker/resources/LocalPoliciesResources.java   | 24 ++++++
 .../broker/resources/NamespaceResources.java       | 48 +++++++++--
 .../pulsar/broker/resources/TenantResources.java   | 11 +++
 .../pulsar/broker/resources/TopicResources.java    | 27 +++++++
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 92 ++++++++-------------
 .../pulsar/broker/admin/impl/TenantsBase.java      | 83 +++++++++----------
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 93 ++++++++++++++++++++++
 8 files changed, 304 insertions(+), 110 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 839011cc95f..b0f0d9c9efa 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -26,9 +26,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
-import com.google.common.base.Joiner;
 import lombok.Getter;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -151,6 +152,23 @@ public class BaseResources<T> {
         return cache.delete(path);
     }
 
+    protected CompletableFuture<Void> deleteIfExistsAsync(String path) {
+        CompletableFuture<Void> future = new CompletableFuture<Void>();
+        deleteAsync(path).whenComplete((ignore, ex) -> {
+            if (ex != null) {
+                if (ex.getCause() instanceof 
MetadataStoreException.NotFoundException) {
+                    // if not found, this path has been deleted
+                    future.complete(null);
+                } else {
+                    future.completeExceptionally(ex);
+                }
+            } else {
+                future.complete(null);
+            }
+        });
+        return future;
+    }
+
     public boolean exists(String path) throws MetadataStoreException {
         try {
             return existsAsync(path).get(operationTimeoutSec, 
TimeUnit.SECONDS);
@@ -175,4 +193,20 @@ public class BaseResources<T> {
         Joiner.on('/').appendTo(sb, parts);
         return sb.toString();
     }
+
+    protected CompletableFuture<Void> deleteRecursiveAsync(String path) {
+        return getChildrenAsync(path)
+                .thenCompose(children -> FutureUtil.waitForAll(
+                        children.stream()
+                                .map(child -> deleteRecursiveAsync(path + "/" 
+ child))
+                                .collect(Collectors.toList())))
+                .thenCompose(__ -> existsAsync(path))
+                .thenCompose(exists -> {
+                    if (exists) {
+                        return deleteAsync(path);
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
index ebaa38e6b10..31ee471ec38 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
@@ -18,12 +18,36 @@
  */
 package org.apache.pulsar.broker.resources;
 
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import java.util.concurrent.CompletableFuture;
 
 public class LocalPoliciesResources extends BaseResources<LocalPolicies> {
 
+    public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";
+
     public LocalPoliciesResources(MetadataStoreExtended configurationStore, 
int operationTimeoutSec) {
         super(configurationStore, LocalPolicies.class, operationTimeoutSec);
     }
+
+    public CompletableFuture<Void> deleteLocalPoliciesAsync(NamespaceName ns) {
+        CompletableFuture<Void> completableFuture = 
deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
+        // in order to delete the cluster for namespace v1
+        if (ns.getCluster() != null) {
+            String clusterPath = joinPath(LOCAL_POLICIES_ROOT, ns.getTenant(), 
ns.getCluster());
+            return getChildrenAsync(clusterPath).thenCompose(nss -> {
+                if (nss.isEmpty()) {
+                    return deleteIfExistsAsync(clusterPath);
+                }
+                return completableFuture;
+            });
+        } else {
+            return completableFuture;
+        }
+    }
+
+    public CompletableFuture<Void> deleteLocalPoliciesTenantAsync(String 
tenant) {
+        return deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, tenant));
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index f4d876d2534..f9fb3eee802 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -22,9 +22,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-
 import lombok.Getter;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -35,10 +34,12 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 @Getter
+@Slf4j
 public class NamespaceResources extends BaseResources<Policies> {
-    private IsolationPolicyResources isolationPolicies;
-    private PartitionedTopicResources partitionedTopicResources;
-    private MetadataStoreExtended configurationStore;
+    private final IsolationPolicyResources isolationPolicies;
+    private final PartitionedTopicResources partitionedTopicResources;
+    private final MetadataStoreExtended configurationStore;
+    private static final String NAMESPACE_BASE_PATH = "/namespace";
 
     public NamespaceResources(MetadataStoreExtended configurationStore, int 
operationTimeoutSec) {
         super(configurationStore, Policies.class, operationTimeoutSec);
@@ -47,10 +48,25 @@ public class NamespaceResources extends 
BaseResources<Policies> {
         partitionedTopicResources = new 
PartitionedTopicResources(configurationStore, operationTimeoutSec);
     }
 
+    public CompletableFuture<Void> deletePoliciesAsync(NamespaceName ns){
+        return deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH, 
ns.toString()));
+    }
     public CompletableFuture<Optional<Policies>> 
getPoliciesAsync(NamespaceName ns) {
         return getCache().get(joinPath(BASE_POLICIES_PATH, ns.toString()));
     }
 
+    // clear resource of `/namespace/{namespaceName}` for zk-node
+    public CompletableFuture<Void> deleteNamespaceAsync(NamespaceName ns) {
+        final String namespacePath = joinPath(NAMESPACE_BASE_PATH, 
ns.toString());
+        return deleteIfExistsAsync(namespacePath);
+    }
+
+    // clear resource of `/namespace/{tenant}` for zk-node
+    public CompletableFuture<Void> deleteTenantAsync(String tenant) {
+        final String tenantPath = joinPath(NAMESPACE_BASE_PATH, tenant);
+        return deleteIfExistsAsync(tenantPath);
+    }
+
     public static class IsolationPolicyResources extends 
BaseResources<Map<String, NamespaceIsolationDataImpl>> {
         public IsolationPolicyResources(MetadataStoreExtended store, int 
operationTimeoutSec) {
             super(store, new TypeReference<Map<String, 
NamespaceIsolationDataImpl>>() {
@@ -74,5 +90,27 @@ public class NamespaceResources extends 
BaseResources<Policies> {
             return createAsync(joinPath(PARTITIONED_TOPIC_PATH, 
tn.getNamespace(), tn.getDomain().value(),
                     tn.getEncodedLocalName()), tm);
         }
+
+        public CompletableFuture<Void> 
clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) {
+            final String globalPartitionedPath = 
joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString());
+
+            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
+
+            deleteRecursiveAsync(globalPartitionedPath)
+                    .thenAccept(ignore -> {
+                        log.info("Clear partitioned topic metadata [{}] 
success.", namespaceName);
+                        completableFuture.complete(null);
+                    }).exceptionally(ex -> {
+                        log.error("Clear partitioned topic metadata failed.");
+                        completableFuture.completeExceptionally(ex.getCause());
+                        return null;
+                    });
+
+            return completableFuture;
+        }
+
+        public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String 
tenant) {
+            return deleteIfExistsAsync(joinPath(PARTITIONED_TOPIC_PATH, 
tenant));
+        }
     }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
index 78d80b9d6f7..90a15cd6bdd 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.broker.resources;
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 public class TenantResources extends BaseResources<TenantInfo> {
@@ -31,4 +34,12 @@ public class TenantResources extends 
BaseResources<TenantInfo> {
     public CompletableFuture<Optional<TenantInfo>> getTenantAsync(String 
tenantName) {
         return getAsync(joinPath(BASE_POLICIES_PATH, tenantName));
     }
+
+    public CompletableFuture<Void> deleteTenantAsync(String tenantName) {
+        return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenantName))
+                .thenCompose(clusters -> 
FutureUtil.waitForAll(clusters.stream()
+                        .map(cluster -> 
getCache().delete(joinPath(BASE_POLICIES_PATH, tenantName, cluster)))
+                        .collect(Collectors.toList()))
+                ).thenCompose(__ -> 
deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH, tenantName)));
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
index d25b308d086..4fe3ba68721 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.metadata.api.MetadataStore;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
@@ -50,4 +51,30 @@ public class TopicResources {
                         .collect(Collectors.toList())
         );
     }
+
+    public CompletableFuture<Void> clearNamespacePersistence(NamespaceName ns) 
{
+        String path = MANAGED_LEDGER_PATH + "/" + ns;
+        return deleteIfExistsAsync(path);
+    }
+
+    public CompletableFuture<Void> clearDomainPersistence(NamespaceName ns) {
+        String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";
+        return deleteIfExistsAsync(path);
+    }
+
+    public CompletableFuture<Void> clearTenantPersistence(String tenant) {
+        String path = MANAGED_LEDGER_PATH + "/" + tenant;
+        return deleteIfExistsAsync(path);
+    }
+
+    private CompletableFuture<Void> deleteIfExistsAsync(String path) {
+        return store.exists(path)
+                .thenCompose(exists -> {
+                    if (exists) {
+                        return store.delete(path, Optional.empty());
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0b0d02b7a6b..e7857e58240 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -52,6 +52,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -109,9 +110,8 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public abstract class NamespacesBase extends AdminResource {
 
     protected List<String> internalGetTenantNamespaces(String tenant) {
@@ -163,14 +163,14 @@ public abstract class NamespacesBase extends 
AdminResource {
 
     protected void internalDeleteNamespace(AsyncResponse asyncResponse, 
boolean authoritative, boolean force) {
         if (force) {
-            internalDeleteNamespaceForcefully(asyncResponse, authoritative);
+            internalDeleteNamespaceForcefully(asyncResponse);
         } else {
-            internalDeleteNamespace(asyncResponse, authoritative);
+            internalDeleteNamespace(asyncResponse);
         }
     }
 
     @SuppressWarnings("deprecation")
-    protected void internalDeleteNamespace(AsyncResponse asyncResponse, 
boolean authoritative) {
+    protected void internalDeleteNamespace(AsyncResponse asyncResponse) {
         validateTenantOperation(namespaceName.getTenant(), 
TenantOperation.DELETE_NAMESPACE);
         validatePoliciesReadOnlyAccess();
 
@@ -308,14 +308,7 @@ public abstract class NamespacesBase extends AdminResource 
{
                         return FutureUtil.waitForAll(deleteBundleFutures);
                     });
         })
-        .thenCompose(__ -> {
-            // we have successfully removed all the ownership for the 
namespace, the policies znode can be deleted
-            // now
-            final String globalZkPolicyPath = path(POLICIES, 
namespaceName.toString());
-            final String localZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, 
namespaceName.toString());
-            return namespaceResources().deleteAsync(globalZkPolicyPath)
-                    .thenCompose((ignore -> 
getLocalPolicies().deleteAsync(localZkPolicyPath)));
-        })
+        .thenCompose(__ -> internalClearZkSources())
         .thenAccept(__ -> {
             log.info("[{}] Remove namespace successfully {}", clientAppId(), 
namespaceName);
             asyncResponse.resume(Response.noContent().build());
@@ -327,8 +320,26 @@ public abstract class NamespacesBase extends AdminResource 
{
         });
     }
 
+    // clear zk-node resources for deleting namespace
+    protected CompletableFuture<Void> internalClearZkSources() {
+        // clear resource of `/namespace/{namespaceName}` for zk-node
+        return namespaceResources().deleteNamespaceAsync(namespaceName)
+                .thenCompose(ignore -> 
namespaceResources().getPartitionedTopicResources()
+                        .clearPartitionedTopicMetadataAsync(namespaceName))
+                // clear resource for manager-ledger z-node
+                .thenCompose(ignore -> 
pulsar().getPulsarResources().getTopicResources()
+                        .clearDomainPersistence(namespaceName))
+                .thenCompose(ignore -> 
pulsar().getPulsarResources().getTopicResources()
+                        .clearNamespacePersistence(namespaceName))
+                // we have successfully removed all the ownership for the 
namespace, the policies
+                // z-node can be deleted now
+                .thenCompose(ignore -> 
namespaceResources().deletePoliciesAsync(namespaceName))
+                // clear z-node of local policies
+                .thenCompose(ignore -> 
getLocalPolicies().deleteLocalPoliciesAsync(namespaceName));
+    }
+
     @SuppressWarnings("deprecation")
-    protected void internalDeleteNamespaceForcefully(AsyncResponse 
asyncResponse, boolean authoritative) {
+    protected void internalDeleteNamespaceForcefully(AsyncResponse 
asyncResponse) {
         validateTenantOperation(namespaceName.getTenant(), 
TenantOperation.DELETE_NAMESPACE);
         validatePoliciesReadOnlyAccess();
 
@@ -463,51 +474,21 @@ public abstract class NamespacesBase extends 
AdminResource {
                 }
             }
         } catch (Exception e) {
-            log.error("[{}] Failed to remove owned namespace {}", 
clientAppId(), namespaceName, e);
+            log.error("[{}] Failed to remove forcefully owned namespace {}", 
clientAppId(), namespaceName, e);
             asyncResponse.resume(new RestException(e));
             return;
         }
 
-        FutureUtil.waitForAll(futures).handle((result, exception) -> {
-            if (exception != null) {
-                if (exception.getCause() instanceof PulsarAdminException) {
-                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
-                    return null;
-                } else {
-                    log.error("[{}] Failed to remove owned namespace {}", 
clientAppId(), namespaceName, exception);
-                    asyncResponse.resume(new 
RestException(exception.getCause()));
+        FutureUtil.waitForAll(futures).thenCompose(__ -> 
internalClearZkSources())
+                .thenAccept(__ -> {
+                    log.info("[{}] Remove namespace successfully {}", 
clientAppId(), namespaceName);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to remove namespace {}", 
clientAppId(), namespaceName, ex.getCause());
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
-                }
-            }
-
-            try {
-                // remove partitioned topics znode
-                final String globalPartitionedPath = 
path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString());
-                // check whether partitioned topics znode exist
-                if (namespaceResources().exists(globalPartitionedPath)) {
-                    deleteRecursive(namespaceResources(), 
globalPartitionedPath);
-                }
-
-                // we have successfully removed all the ownership for the 
namespace, the policies znode can be deleted
-                // now
-                final String globalZkPolicyPath = path(POLICIES, 
namespaceName.toString());
-                final String localZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, 
namespaceName.toString());
-                namespaceResources().delete(globalZkPolicyPath);
-
-                try {
-                    getLocalPolicies().delete(localZkPolicyPath);
-                } catch (NotFoundException nne) {
-                    // If the z-node with the modified information is not 
there anymore, we're already good
-                }
-            } catch (Exception e) {
-                log.error("[{}] Failed to remove owned namespace {} from ZK", 
clientAppId(), namespaceName, e);
-                asyncResponse.resume(new RestException(e));
-                return null;
-            }
-
-            asyncResponse.resume(Response.noContent().build());
-            return null;
-        });
+                });
     }
 
     protected void internalDeleteNamespaceBundle(String bundleRange, boolean 
authoritative, boolean force) {
@@ -2780,7 +2761,4 @@ public abstract class NamespacesBase extends 
AdminResource {
 
         internalSetPolicies("resource_group_name", rgName);
     }
-
-
-    private static final Logger log = 
LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index f5f3b8e2ca5..9887d56fd8d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -60,8 +60,8 @@ public class TenantsBase extends PulsarWebResource {
 
     @GET
     @ApiOperation(value = "Get the list of existing tenants.", response = 
String.class, responseContainer = "List")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "Tenant doesn't exist") })
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "Tenant doesn't exist")})
     public void getTenants(@Suspended final AsyncResponse asyncResponse) {
         final String clientAppId = clientAppId();
         try {
@@ -86,8 +86,8 @@ public class TenantsBase extends PulsarWebResource {
     @GET
     @Path("/{tenant}")
     @ApiOperation(value = "Get the admin configuration for a given tenant.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "Tenant does not exist") })
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "Tenant does not exist")})
     public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "The tenant name") @PathParam("tenant") String 
tenant) {
         final String clientAppId = clientAppId();
@@ -112,11 +112,11 @@ public class TenantsBase extends PulsarWebResource {
     @PUT
     @Path("/{tenant}")
     @ApiOperation(value = "Create a new tenant.", notes = "This operation 
requires Pulsar super-user privileges.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
             @ApiResponse(code = 409, message = "Tenant already exists"),
             @ApiResponse(code = 412, message = "Tenant name is not valid"),
             @ApiResponse(code = 412, message = "Clusters can not be empty"),
-            @ApiResponse(code = 412, message = "Clusters do not exist") })
+            @ApiResponse(code = 412, message = "Clusters do not exist")})
     public void createTenant(@Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "The tenant name") @PathParam("tenant") String 
tenant,
             @ApiParam(value = "TenantInfo") TenantInfoImpl tenantInfo) {
@@ -177,12 +177,12 @@ public class TenantsBase extends PulsarWebResource {
     @POST
     @Path("/{tenant}")
     @ApiOperation(value = "Update the admins for a tenant.",
-    notes = "This operation requires Pulsar super-user privileges.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
+            notes = "This operation requires Pulsar super-user privileges.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
             @ApiResponse(code = 404, message = "Tenant does not exist"),
             @ApiResponse(code = 409, message = "Tenant already exists"),
             @ApiResponse(code = 412, message = "Clusters can not be empty"),
-            @ApiResponse(code = 412, message = "Clusters do not exist") })
+            @ApiResponse(code = 412, message = "Clusters do not exist")})
     public void updateTenant(@Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "The tenant name") @PathParam("tenant") String 
tenant,
             @ApiParam(value = "TenantInfo") TenantInfoImpl newTenantAdmin) {
@@ -229,10 +229,10 @@ public class TenantsBase extends PulsarWebResource {
     @DELETE
     @Path("/{tenant}")
     @ApiOperation(value = "Delete a tenant and all namespaces and topics under 
it.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester 
doesn't have admin permissions"),
             @ApiResponse(code = 404, message = "Tenant does not exist"),
             @ApiResponse(code = 405, message = "Broker doesn't allow forced 
deletion of tenants"),
-            @ApiResponse(code = 409, message = "The tenant still has active 
namespaces") })
+            @ApiResponse(code = 409, message = "The tenant still has active 
namespaces")})
     public void deleteTenant(@Suspended final AsyncResponse asyncResponse,
             @PathParam("tenant") @ApiParam(value = "The tenant name") String 
tenant,
             @QueryParam("force") @DefaultValue("false") boolean force) {
@@ -255,46 +255,37 @@ public class TenantsBase extends PulsarWebResource {
     }
 
     protected void internalDeleteTenant(AsyncResponse asyncResponse, String 
tenant) {
-        tenantResources().existsAsync(path(POLICIES, tenant)).thenApply(exists 
-> {
+        tenantResources().existsAsync(path(POLICIES, 
tenant)).thenAccept(exists -> {
+            // if tenant not exist, return directly
             if (!exists) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Tenant doesn't exist"));
-                return null;
+                return;
             }
-            return hasActiveNamespace(tenant).thenAccept(ns -> {
-                try {
-                    // already fetched children and they should be in the cache
-                    List<CompletableFuture<Void>> clusterList = 
Lists.newArrayList();
-                    for (String cluster : 
tenantResources().getChildrenAsync(path(POLICIES, tenant)).get()) {
-                        
clusterList.add(tenantResources().deleteAsync(path(POLICIES, tenant, cluster)));
-                    }
-                    FutureUtil.waitForAll(clusterList).thenAccept(c -> {
-                        tenantResources().deleteAsync(path(POLICIES, 
tenant)).thenAccept(t -> {
+
+            hasActiveNamespace(tenant)
+                    .thenCompose(__ -> 
tenantResources().deleteTenantAsync(tenant))
+                    .thenCompose(__ -> 
pulsar().getPulsarResources().getTopicResources()
+                            .clearTenantPersistence(tenant))
+                    .thenCompose(__ -> 
pulsar().getPulsarResources().getNamespaceResources()
+                            .deleteTenantAsync(tenant))
+                    .thenCompose(__ -> 
pulsar().getPulsarResources().getNamespaceResources()
+                            
.getPartitionedTopicResources().clearPartitionedTopicTenantAsync(tenant))
+                    .thenCompose(__ -> 
pulsar().getPulsarResources().getLocalPolicies()
+                            .deleteLocalPoliciesTenantAsync(tenant))
+                    .whenComplete((ignore, ex) -> {
+                        Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
+                        if (cause != null) {
+                            log.error("[{}] Failed to delete tenant {}", 
clientAppId(), tenant, cause);
+                            if (cause instanceof IllegalStateException) {
+                                asyncResponse.resume(new 
RestException(Status.CONFLICT, cause));
+                            } else {
+                                
resumeAsyncResponseExceptionally(asyncResponse, cause);
+                            }
+                        } else {
                             log.info("[{}] Deleted tenant {}", clientAppId(), 
tenant);
                             asyncResponse.resume(Response.noContent().build());
-                        }).exceptionally(ex -> {
-                            log.error("Failed to delete tenant {}", tenant, 
ex.getCause());
-                            asyncResponse.resume(new RestException(ex));
-                            return null;
-                        });
-                    }).exceptionally(ex -> {
-                        log.error("Failed to delete clusters under tenant {}", 
tenant, ex.getCause());
-                        asyncResponse.resume(new RestException(ex));
-                        return null;
+                        }
                     });
-                    log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
-                } catch (Exception e) {
-                    log.error("[{}] Failed to delete tenant {}", 
clientAppId(), tenant, e);
-                    asyncResponse.resume(new RestException(e));
-                }
-            }).exceptionally(ex -> {
-                log.error("Failed to delete tenant due to active namespace 
{}", tenant, ex.getCause());
-                if (ex.getCause() instanceof IllegalStateException) {
-                    asyncResponse.resume(new RestException(Status.CONFLICT, 
ex.getCause()));
-                } else {
-                    asyncResponse.resume(new RestException(ex));
-                }
-                return null;
-            });
         });
     }
 
@@ -336,8 +327,6 @@ public class TenantsBase extends PulsarWebResource {
             }
             // delete tenant normally
             internalDeleteTenant(asyncResponse, tenant);
-
-            asyncResponse.resume(Response.noContent().build());
             return null;
         });
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 286a2fa14d2..834798382ab 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1315,6 +1315,99 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         });
     }
 
+    @Test
+    public void testDeleteTenant() throws Exception {
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+
+        String tenant = "test-tenant-1";
+        assertFalse(admin.tenants().getTenants().contains(tenant));
+
+        // create tenant
+        admin.tenants().createTenant(tenant,
+                new TenantInfoImpl(Sets.newHashSet("role1", "role2"), 
Sets.newHashSet("test")));
+        assertTrue(admin.tenants().getTenants().contains(tenant));
+
+        // create namespace
+        String namespace = tenant + "/test-ns-1";
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        assertEquals(admin.namespaces().getNamespaces(tenant), 
Lists.newArrayList(namespace));
+
+        // create topic
+        String topic = namespace + "/test-topic-1";
+        admin.topics().createPartitionedTopic(topic, 10);
+        assertFalse(admin.topics().getList(namespace).isEmpty());
+
+        try {
+            admin.namespaces().deleteNamespace(namespace, false);
+            fail("should have failed due to namespace not empty");
+        } catch (PulsarAdminException e) {
+            // Expected: cannot delete non-empty tenant
+        }
+
+        // delete topic
+        admin.topics().deletePartitionedTopic(topic);
+        assertTrue(admin.topics().getList(namespace).isEmpty());
+
+        // delete namespace
+        admin.namespaces().deleteNamespace(namespace, false);
+        
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
+        assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
+
+        // delete tenant
+        admin.tenants().deleteTenant(tenant);
+        assertFalse(admin.tenants().getTenants().contains(tenant));
+
+        final String managedLedgersPath = "/managed-ledgers/" + tenant;
+        final String partitionedTopicPath = "/admin/partitioned-topics/" + 
tenant;
+        final String localPoliciesPath = "/admin/local-policies/" + tenant;
+        
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
+        
assertFalse(pulsar.getLocalMetadataStore().exists(partitionedTopicPath).join());
+        
assertFalse(pulsar.getLocalMetadataStore().exists(localPoliciesPath).join());
+    }
+
+    @Test
+    public void testDeleteNamespace() throws Exception {
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+
+        String tenant = "test-tenant";
+        assertFalse(admin.tenants().getTenants().contains(tenant));
+
+        // create tenant
+        admin.tenants().createTenant(tenant,
+                new TenantInfoImpl(Sets.newHashSet("role1", "role2"), 
Sets.newHashSet("test")));
+        assertTrue(admin.tenants().getTenants().contains(tenant));
+
+        // create namespace
+        String namespace = tenant + "/test-ns";
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        assertEquals(admin.namespaces().getNamespaces(tenant), 
Lists.newArrayList(namespace));
+
+        // create topic
+        String topic = namespace + "/test-topic";
+        admin.topics().createPartitionedTopic(topic, 10);
+        assertFalse(admin.topics().getList(namespace).isEmpty());
+
+        try {
+            admin.namespaces().deleteNamespace(namespace, false);
+            fail("should have failed due to namespace not empty");
+        } catch (PulsarAdminException e) {
+            // Expected: cannot delete non-empty tenant
+        }
+
+        // delete topic
+        admin.topics().deletePartitionedTopic(topic);
+        assertTrue(admin.topics().getList(namespace).isEmpty());
+
+        // delete namespace
+        admin.namespaces().deleteNamespace(namespace, false);
+        
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
+        assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
+
+
+        final String managedLedgersPath = "/managed-ledgers/" + namespace;
+        
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
+    }
+
     @Test(timeOut = 30000)
     public void testBacklogNoDelayed() throws PulsarClientException, 
PulsarAdminException, InterruptedException {
         final String topic = 
"persistent://prop-xyz/ns1/precise-back-log-no-delayed-" + 
UUID.randomUUID().toString();


Reply via email to