This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new fa90384b161 fix znode leakage caused by deleting tenant #12711#12972
(#18698)
fa90384b161 is described below
commit fa90384b1613cfe361927acf25ee32349e7513ec
Author: congbo <[email protected]>
AuthorDate: Fri Dec 9 10:19:42 2022 +0800
fix znode leakage caused by deleting tenant #12711#12972 (#18698)
---
.../pulsar/broker/resources/BaseResources.java | 36 ++++++++-
.../broker/resources/LocalPoliciesResources.java | 24 ++++++
.../broker/resources/NamespaceResources.java | 48 +++++++++--
.../pulsar/broker/resources/TenantResources.java | 11 +++
.../pulsar/broker/resources/TopicResources.java | 27 +++++++
.../pulsar/broker/admin/impl/NamespacesBase.java | 92 ++++++++-------------
.../pulsar/broker/admin/impl/TenantsBase.java | 83 +++++++++----------
.../apache/pulsar/broker/admin/AdminApi2Test.java | 93 ++++++++++++++++++++++
8 files changed, 304 insertions(+), 110 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 839011cc95f..b0f0d9c9efa 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -26,9 +26,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.stream.Collectors;
-import com.google.common.base.Joiner;
import lombok.Getter;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -151,6 +152,23 @@ public class BaseResources<T> {
return cache.delete(path);
}
+ protected CompletableFuture<Void> deleteIfExistsAsync(String path) {
+ CompletableFuture<Void> future = new CompletableFuture<Void>();
+ deleteAsync(path).whenComplete((ignore, ex) -> {
+ if (ex != null) {
+ if (ex.getCause() instanceof
MetadataStoreException.NotFoundException) {
+ // if not found, this path has been deleted
+ future.complete(null);
+ } else {
+ future.completeExceptionally(ex);
+ }
+ } else {
+ future.complete(null);
+ }
+ });
+ return future;
+ }
+
public boolean exists(String path) throws MetadataStoreException {
try {
return existsAsync(path).get(operationTimeoutSec,
TimeUnit.SECONDS);
@@ -175,4 +193,20 @@ public class BaseResources<T> {
Joiner.on('/').appendTo(sb, parts);
return sb.toString();
}
+
+ protected CompletableFuture<Void> deleteRecursiveAsync(String path) {
+ return getChildrenAsync(path)
+ .thenCompose(children -> FutureUtil.waitForAll(
+ children.stream()
+ .map(child -> deleteRecursiveAsync(path + "/"
+ child))
+ .collect(Collectors.toList())))
+ .thenCompose(__ -> existsAsync(path))
+ .thenCompose(exists -> {
+ if (exists) {
+ return deleteAsync(path);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ }
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
index ebaa38e6b10..31ee471ec38 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
@@ -18,12 +18,36 @@
*/
package org.apache.pulsar.broker.resources;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import java.util.concurrent.CompletableFuture;
public class LocalPoliciesResources extends BaseResources<LocalPolicies> {
+ public static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";
+
public LocalPoliciesResources(MetadataStoreExtended configurationStore,
int operationTimeoutSec) {
super(configurationStore, LocalPolicies.class, operationTimeoutSec);
}
+
+ public CompletableFuture<Void> deleteLocalPoliciesAsync(NamespaceName ns) {
+ CompletableFuture<Void> completableFuture =
deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
+ // in order to delete the cluster for namespace v1
+ if (ns.getCluster() != null) {
+ String clusterPath = joinPath(LOCAL_POLICIES_ROOT, ns.getTenant(),
ns.getCluster());
+ return getChildrenAsync(clusterPath).thenCompose(nss -> {
+ if (nss.isEmpty()) {
+ return deleteIfExistsAsync(clusterPath);
+ }
+ return completableFuture;
+ });
+ } else {
+ return completableFuture;
+ }
+ }
+
+ public CompletableFuture<Void> deleteLocalPoliciesTenantAsync(String
tenant) {
+ return deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, tenant));
+ }
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index f4d876d2534..f9fb3eee802 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -22,9 +22,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-
import lombok.Getter;
-
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -35,10 +34,12 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@Getter
+@Slf4j
public class NamespaceResources extends BaseResources<Policies> {
- private IsolationPolicyResources isolationPolicies;
- private PartitionedTopicResources partitionedTopicResources;
- private MetadataStoreExtended configurationStore;
+ private final IsolationPolicyResources isolationPolicies;
+ private final PartitionedTopicResources partitionedTopicResources;
+ private final MetadataStoreExtended configurationStore;
+ private static final String NAMESPACE_BASE_PATH = "/namespace";
public NamespaceResources(MetadataStoreExtended configurationStore, int
operationTimeoutSec) {
super(configurationStore, Policies.class, operationTimeoutSec);
@@ -47,10 +48,25 @@ public class NamespaceResources extends
BaseResources<Policies> {
partitionedTopicResources = new
PartitionedTopicResources(configurationStore, operationTimeoutSec);
}
+ public CompletableFuture<Void> deletePoliciesAsync(NamespaceName ns){
+ return deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH,
ns.toString()));
+ }
public CompletableFuture<Optional<Policies>>
getPoliciesAsync(NamespaceName ns) {
return getCache().get(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
+ // clear resource of `/namespace/{namespaceName}` for zk-node
+ public CompletableFuture<Void> deleteNamespaceAsync(NamespaceName ns) {
+ final String namespacePath = joinPath(NAMESPACE_BASE_PATH,
ns.toString());
+ return deleteIfExistsAsync(namespacePath);
+ }
+
+ // clear resource of `/namespace/{tenant}` for zk-node
+ public CompletableFuture<Void> deleteTenantAsync(String tenant) {
+ final String tenantPath = joinPath(NAMESPACE_BASE_PATH, tenant);
+ return deleteIfExistsAsync(tenantPath);
+ }
+
public static class IsolationPolicyResources extends
BaseResources<Map<String, NamespaceIsolationDataImpl>> {
public IsolationPolicyResources(MetadataStoreExtended store, int
operationTimeoutSec) {
super(store, new TypeReference<Map<String,
NamespaceIsolationDataImpl>>() {
@@ -74,5 +90,27 @@ public class NamespaceResources extends
BaseResources<Policies> {
return createAsync(joinPath(PARTITIONED_TOPIC_PATH,
tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()), tm);
}
+
+ public CompletableFuture<Void>
clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) {
+ final String globalPartitionedPath =
joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString());
+
+ CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
+
+ deleteRecursiveAsync(globalPartitionedPath)
+ .thenAccept(ignore -> {
+ log.info("Clear partitioned topic metadata [{}]
success.", namespaceName);
+ completableFuture.complete(null);
+ }).exceptionally(ex -> {
+ log.error("Clear partitioned topic metadata failed.");
+ completableFuture.completeExceptionally(ex.getCause());
+ return null;
+ });
+
+ return completableFuture;
+ }
+
+ public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String
tenant) {
+ return deleteIfExistsAsync(joinPath(PARTITIONED_TOPIC_PATH,
tenant));
+ }
}
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
index 78d80b9d6f7..90a15cd6bdd 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.broker.resources;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
public class TenantResources extends BaseResources<TenantInfo> {
@@ -31,4 +34,12 @@ public class TenantResources extends
BaseResources<TenantInfo> {
public CompletableFuture<Optional<TenantInfo>> getTenantAsync(String
tenantName) {
return getAsync(joinPath(BASE_POLICIES_PATH, tenantName));
}
+
+ public CompletableFuture<Void> deleteTenantAsync(String tenantName) {
+ return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenantName))
+ .thenCompose(clusters ->
FutureUtil.waitForAll(clusters.stream()
+ .map(cluster ->
getCache().delete(joinPath(BASE_POLICIES_PATH, tenantName, cluster)))
+ .collect(Collectors.toList()))
+ ).thenCompose(__ ->
deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH, tenantName)));
+ }
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
index d25b308d086..4fe3ba68721 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.metadata.api.MetadataStore;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -50,4 +51,30 @@ public class TopicResources {
.collect(Collectors.toList())
);
}
+
+ public CompletableFuture<Void> clearNamespacePersistence(NamespaceName ns)
{
+ String path = MANAGED_LEDGER_PATH + "/" + ns;
+ return deleteIfExistsAsync(path);
+ }
+
+ public CompletableFuture<Void> clearDomainPersistence(NamespaceName ns) {
+ String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";
+ return deleteIfExistsAsync(path);
+ }
+
+ public CompletableFuture<Void> clearTenantPersistence(String tenant) {
+ String path = MANAGED_LEDGER_PATH + "/" + tenant;
+ return deleteIfExistsAsync(path);
+ }
+
+ private CompletableFuture<Void> deleteIfExistsAsync(String path) {
+ return store.exists(path)
+ .thenCompose(exists -> {
+ if (exists) {
+ return store.delete(path, Optional.empty());
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0b0d02b7a6b..e7857e58240 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -52,6 +52,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
@@ -109,9 +110,8 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public abstract class NamespacesBase extends AdminResource {
protected List<String> internalGetTenantNamespaces(String tenant) {
@@ -163,14 +163,14 @@ public abstract class NamespacesBase extends
AdminResource {
protected void internalDeleteNamespace(AsyncResponse asyncResponse,
boolean authoritative, boolean force) {
if (force) {
- internalDeleteNamespaceForcefully(asyncResponse, authoritative);
+ internalDeleteNamespaceForcefully(asyncResponse);
} else {
- internalDeleteNamespace(asyncResponse, authoritative);
+ internalDeleteNamespace(asyncResponse);
}
}
@SuppressWarnings("deprecation")
- protected void internalDeleteNamespace(AsyncResponse asyncResponse,
boolean authoritative) {
+ protected void internalDeleteNamespace(AsyncResponse asyncResponse) {
validateTenantOperation(namespaceName.getTenant(),
TenantOperation.DELETE_NAMESPACE);
validatePoliciesReadOnlyAccess();
@@ -308,14 +308,7 @@ public abstract class NamespacesBase extends AdminResource
{
return FutureUtil.waitForAll(deleteBundleFutures);
});
})
- .thenCompose(__ -> {
- // we have successfully removed all the ownership for the
namespace, the policies znode can be deleted
- // now
- final String globalZkPolicyPath = path(POLICIES,
namespaceName.toString());
- final String localZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT,
namespaceName.toString());
- return namespaceResources().deleteAsync(globalZkPolicyPath)
- .thenCompose((ignore ->
getLocalPolicies().deleteAsync(localZkPolicyPath)));
- })
+ .thenCompose(__ -> internalClearZkSources())
.thenAccept(__ -> {
log.info("[{}] Remove namespace successfully {}", clientAppId(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
@@ -327,8 +320,26 @@ public abstract class NamespacesBase extends AdminResource
{
});
}
+ // clear zk-node resources for deleting namespace
+ protected CompletableFuture<Void> internalClearZkSources() {
+ // clear resource of `/namespace/{namespaceName}` for zk-node
+ return namespaceResources().deleteNamespaceAsync(namespaceName)
+ .thenCompose(ignore ->
namespaceResources().getPartitionedTopicResources()
+ .clearPartitionedTopicMetadataAsync(namespaceName))
+ // clear resource for manager-ledger z-node
+ .thenCompose(ignore ->
pulsar().getPulsarResources().getTopicResources()
+ .clearDomainPersistence(namespaceName))
+ .thenCompose(ignore ->
pulsar().getPulsarResources().getTopicResources()
+ .clearNamespacePersistence(namespaceName))
+ // we have successfully removed all the ownership for the
namespace, the policies
+ // z-node can be deleted now
+ .thenCompose(ignore ->
namespaceResources().deletePoliciesAsync(namespaceName))
+ // clear z-node of local policies
+ .thenCompose(ignore ->
getLocalPolicies().deleteLocalPoliciesAsync(namespaceName));
+ }
+
@SuppressWarnings("deprecation")
- protected void internalDeleteNamespaceForcefully(AsyncResponse
asyncResponse, boolean authoritative) {
+ protected void internalDeleteNamespaceForcefully(AsyncResponse
asyncResponse) {
validateTenantOperation(namespaceName.getTenant(),
TenantOperation.DELETE_NAMESPACE);
validatePoliciesReadOnlyAccess();
@@ -463,51 +474,21 @@ public abstract class NamespacesBase extends
AdminResource {
}
}
} catch (Exception e) {
- log.error("[{}] Failed to remove owned namespace {}",
clientAppId(), namespaceName, e);
+ log.error("[{}] Failed to remove forcefully owned namespace {}",
clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return;
}
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- if (exception.getCause() instanceof PulsarAdminException) {
- asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
- return null;
- } else {
- log.error("[{}] Failed to remove owned namespace {}",
clientAppId(), namespaceName, exception);
- asyncResponse.resume(new
RestException(exception.getCause()));
+ FutureUtil.waitForAll(futures).thenCompose(__ ->
internalClearZkSources())
+ .thenAccept(__ -> {
+ log.info("[{}] Remove namespace successfully {}",
clientAppId(), namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to remove namespace {}",
clientAppId(), namespaceName, ex.getCause());
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
- }
- }
-
- try {
- // remove partitioned topics znode
- final String globalPartitionedPath =
path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString());
- // check whether partitioned topics znode exist
- if (namespaceResources().exists(globalPartitionedPath)) {
- deleteRecursive(namespaceResources(),
globalPartitionedPath);
- }
-
- // we have successfully removed all the ownership for the
namespace, the policies znode can be deleted
- // now
- final String globalZkPolicyPath = path(POLICIES,
namespaceName.toString());
- final String localZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT,
namespaceName.toString());
- namespaceResources().delete(globalZkPolicyPath);
-
- try {
- getLocalPolicies().delete(localZkPolicyPath);
- } catch (NotFoundException nne) {
- // If the z-node with the modified information is not
there anymore, we're already good
- }
- } catch (Exception e) {
- log.error("[{}] Failed to remove owned namespace {} from ZK",
clientAppId(), namespaceName, e);
- asyncResponse.resume(new RestException(e));
- return null;
- }
-
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
+ });
}
protected void internalDeleteNamespaceBundle(String bundleRange, boolean
authoritative, boolean force) {
@@ -2780,7 +2761,4 @@ public abstract class NamespacesBase extends
AdminResource {
internalSetPolicies("resource_group_name", rgName);
}
-
-
- private static final Logger log =
LoggerFactory.getLogger(NamespacesBase.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index f5f3b8e2ca5..9887d56fd8d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -60,8 +60,8 @@ public class TenantsBase extends PulsarWebResource {
@GET
@ApiOperation(value = "Get the list of existing tenants.", response =
String.class, responseContainer = "List")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
- @ApiResponse(code = 404, message = "Tenant doesn't exist") })
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
+ @ApiResponse(code = 404, message = "Tenant doesn't exist")})
public void getTenants(@Suspended final AsyncResponse asyncResponse) {
final String clientAppId = clientAppId();
try {
@@ -86,8 +86,8 @@ public class TenantsBase extends PulsarWebResource {
@GET
@Path("/{tenant}")
@ApiOperation(value = "Get the admin configuration for a given tenant.")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
- @ApiResponse(code = 404, message = "Tenant does not exist") })
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
+ @ApiResponse(code = 404, message = "Tenant does not exist")})
public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "The tenant name") @PathParam("tenant") String
tenant) {
final String clientAppId = clientAppId();
@@ -112,11 +112,11 @@ public class TenantsBase extends PulsarWebResource {
@PUT
@Path("/{tenant}")
@ApiOperation(value = "Create a new tenant.", notes = "This operation
requires Pulsar super-user privileges.")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
@ApiResponse(code = 409, message = "Tenant already exists"),
@ApiResponse(code = 412, message = "Tenant name is not valid"),
@ApiResponse(code = 412, message = "Clusters can not be empty"),
- @ApiResponse(code = 412, message = "Clusters do not exist") })
+ @ApiResponse(code = 412, message = "Clusters do not exist")})
public void createTenant(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "The tenant name") @PathParam("tenant") String
tenant,
@ApiParam(value = "TenantInfo") TenantInfoImpl tenantInfo) {
@@ -177,12 +177,12 @@ public class TenantsBase extends PulsarWebResource {
@POST
@Path("/{tenant}")
@ApiOperation(value = "Update the admins for a tenant.",
- notes = "This operation requires Pulsar super-user privileges.")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
+ notes = "This operation requires Pulsar super-user privileges.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 409, message = "Tenant already exists"),
@ApiResponse(code = 412, message = "Clusters can not be empty"),
- @ApiResponse(code = 412, message = "Clusters do not exist") })
+ @ApiResponse(code = 412, message = "Clusters do not exist")})
public void updateTenant(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "The tenant name") @PathParam("tenant") String
tenant,
@ApiParam(value = "TenantInfo") TenantInfoImpl newTenantAdmin) {
@@ -229,10 +229,10 @@ public class TenantsBase extends PulsarWebResource {
@DELETE
@Path("/{tenant}")
@ApiOperation(value = "Delete a tenant and all namespaces and topics under
it.")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester
doesn't have admin permissions"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 405, message = "Broker doesn't allow forced
deletion of tenants"),
- @ApiResponse(code = 409, message = "The tenant still has active
namespaces") })
+ @ApiResponse(code = 409, message = "The tenant still has active
namespaces")})
public void deleteTenant(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") @ApiParam(value = "The tenant name") String
tenant,
@QueryParam("force") @DefaultValue("false") boolean force) {
@@ -255,46 +255,37 @@ public class TenantsBase extends PulsarWebResource {
}
protected void internalDeleteTenant(AsyncResponse asyncResponse, String
tenant) {
- tenantResources().existsAsync(path(POLICIES, tenant)).thenApply(exists
-> {
+ tenantResources().existsAsync(path(POLICIES,
tenant)).thenAccept(exists -> {
+ // if tenant not exist, return directly
if (!exists) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Tenant doesn't exist"));
- return null;
+ return;
}
- return hasActiveNamespace(tenant).thenAccept(ns -> {
- try {
- // already fetched children and they should be in the cache
- List<CompletableFuture<Void>> clusterList =
Lists.newArrayList();
- for (String cluster :
tenantResources().getChildrenAsync(path(POLICIES, tenant)).get()) {
-
clusterList.add(tenantResources().deleteAsync(path(POLICIES, tenant, cluster)));
- }
- FutureUtil.waitForAll(clusterList).thenAccept(c -> {
- tenantResources().deleteAsync(path(POLICIES,
tenant)).thenAccept(t -> {
+
+ hasActiveNamespace(tenant)
+ .thenCompose(__ ->
tenantResources().deleteTenantAsync(tenant))
+ .thenCompose(__ ->
pulsar().getPulsarResources().getTopicResources()
+ .clearTenantPersistence(tenant))
+ .thenCompose(__ ->
pulsar().getPulsarResources().getNamespaceResources()
+ .deleteTenantAsync(tenant))
+ .thenCompose(__ ->
pulsar().getPulsarResources().getNamespaceResources()
+
.getPartitionedTopicResources().clearPartitionedTopicTenantAsync(tenant))
+ .thenCompose(__ ->
pulsar().getPulsarResources().getLocalPolicies()
+ .deleteLocalPoliciesTenantAsync(tenant))
+ .whenComplete((ignore, ex) -> {
+ Throwable cause =
FutureUtil.unwrapCompletionException(ex);
+ if (cause != null) {
+ log.error("[{}] Failed to delete tenant {}",
clientAppId(), tenant, cause);
+ if (cause instanceof IllegalStateException) {
+ asyncResponse.resume(new
RestException(Status.CONFLICT, cause));
+ } else {
+
resumeAsyncResponseExceptionally(asyncResponse, cause);
+ }
+ } else {
log.info("[{}] Deleted tenant {}", clientAppId(),
tenant);
asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- log.error("Failed to delete tenant {}", tenant,
ex.getCause());
- asyncResponse.resume(new RestException(ex));
- return null;
- });
- }).exceptionally(ex -> {
- log.error("Failed to delete clusters under tenant {}",
tenant, ex.getCause());
- asyncResponse.resume(new RestException(ex));
- return null;
+ }
});
- log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
- } catch (Exception e) {
- log.error("[{}] Failed to delete tenant {}",
clientAppId(), tenant, e);
- asyncResponse.resume(new RestException(e));
- }
- }).exceptionally(ex -> {
- log.error("Failed to delete tenant due to active namespace
{}", tenant, ex.getCause());
- if (ex.getCause() instanceof IllegalStateException) {
- asyncResponse.resume(new RestException(Status.CONFLICT,
ex.getCause()));
- } else {
- asyncResponse.resume(new RestException(ex));
- }
- return null;
- });
});
}
@@ -336,8 +327,6 @@ public class TenantsBase extends PulsarWebResource {
}
// delete tenant normally
internalDeleteTenant(asyncResponse, tenant);
-
- asyncResponse.resume(Response.noContent().build());
return null;
});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 286a2fa14d2..834798382ab 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1315,6 +1315,99 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
});
}
+ @Test
+ public void testDeleteTenant() throws Exception {
+ pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+
+ String tenant = "test-tenant-1";
+ assertFalse(admin.tenants().getTenants().contains(tenant));
+
+ // create tenant
+ admin.tenants().createTenant(tenant,
+ new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
Sets.newHashSet("test")));
+ assertTrue(admin.tenants().getTenants().contains(tenant));
+
+ // create namespace
+ String namespace = tenant + "/test-ns-1";
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+ assertEquals(admin.namespaces().getNamespaces(tenant),
Lists.newArrayList(namespace));
+
+ // create topic
+ String topic = namespace + "/test-topic-1";
+ admin.topics().createPartitionedTopic(topic, 10);
+ assertFalse(admin.topics().getList(namespace).isEmpty());
+
+ try {
+ admin.namespaces().deleteNamespace(namespace, false);
+ fail("should have failed due to namespace not empty");
+ } catch (PulsarAdminException e) {
+ // Expected: cannot delete non-empty tenant
+ }
+
+ // delete topic
+ admin.topics().deletePartitionedTopic(topic);
+ assertTrue(admin.topics().getList(namespace).isEmpty());
+
+ // delete namespace
+ admin.namespaces().deleteNamespace(namespace, false);
+
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
+ assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
+
+ // delete tenant
+ admin.tenants().deleteTenant(tenant);
+ assertFalse(admin.tenants().getTenants().contains(tenant));
+
+ final String managedLedgersPath = "/managed-ledgers/" + tenant;
+ final String partitionedTopicPath = "/admin/partitioned-topics/" +
tenant;
+ final String localPoliciesPath = "/admin/local-policies/" + tenant;
+
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
+
assertFalse(pulsar.getLocalMetadataStore().exists(partitionedTopicPath).join());
+
assertFalse(pulsar.getLocalMetadataStore().exists(localPoliciesPath).join());
+ }
+
+ @Test
+ public void testDeleteNamespace() throws Exception {
+ pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+
+ String tenant = "test-tenant";
+ assertFalse(admin.tenants().getTenants().contains(tenant));
+
+ // create tenant
+ admin.tenants().createTenant(tenant,
+ new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
Sets.newHashSet("test")));
+ assertTrue(admin.tenants().getTenants().contains(tenant));
+
+ // create namespace
+ String namespace = tenant + "/test-ns";
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+ assertEquals(admin.namespaces().getNamespaces(tenant),
Lists.newArrayList(namespace));
+
+ // create topic
+ String topic = namespace + "/test-topic";
+ admin.topics().createPartitionedTopic(topic, 10);
+ assertFalse(admin.topics().getList(namespace).isEmpty());
+
+ try {
+ admin.namespaces().deleteNamespace(namespace, false);
+ fail("should have failed due to namespace not empty");
+ } catch (PulsarAdminException e) {
+ // Expected: cannot delete non-empty tenant
+ }
+
+ // delete topic
+ admin.topics().deletePartitionedTopic(topic);
+ assertTrue(admin.topics().getList(namespace).isEmpty());
+
+ // delete namespace
+ admin.namespaces().deleteNamespace(namespace, false);
+
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
+ assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
+
+
+ final String managedLedgersPath = "/managed-ledgers/" + namespace;
+
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
+ }
+
@Test(timeOut = 30000)
public void testBacklogNoDelayed() throws PulsarClientException,
PulsarAdminException, InterruptedException {
final String topic =
"persistent://prop-xyz/ns1/precise-back-log-no-delayed-" +
UUID.randomUUID().toString();