This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 138ea354f9d [improve][broker] Make some methods in NamespacesBase 
async. (#15518)
138ea354f9d is described below

commit 138ea354f9de30e4b7bc9d4fe6ee16b84cd4a896
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat May 14 09:59:16 2022 +0800

    [improve][broker] Make some methods in NamespacesBase async. (#15518)
    
    ### Motivation
    See PIP #14365  and change tracker #15043.
    
     Make `NamespacesBase` `getTenantNamespaces / createNamespace / getTopics / 
getPolicies / getPermissions` methods to pure async.
---
 .../broker/resources/NamespaceResources.java       |   4 +
 .../pulsar/broker/resources/TenantResources.java   |  38 +++-
 .../apache/pulsar/broker/admin/AdminResource.java  |  10 +
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  74 ++++---
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 126 ++++++++----
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  96 ++++++---
 .../pulsar/broker/web/PulsarWebResource.java       |  15 ++
 .../org/apache/pulsar/broker/admin/AdminTest.java  | 221 ++++++---------------
 .../apache/pulsar/broker/admin/NamespacesTest.java |  80 ++++----
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  99 +++++++++
 10 files changed, 453 insertions(+), 310 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index ce797a80c7c..c24df6c586f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -93,6 +93,10 @@ public class NamespaceResources extends 
BaseResources<Policies> {
         create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
     }
 
+    public CompletableFuture<Void> createPoliciesAsync(NamespaceName ns, 
Policies policies) {
+        return createAsync(joinPath(BASE_POLICIES_PATH, ns.toString()), 
policies);
+    }
+
     public boolean namespaceExists(NamespaceName ns) throws 
MetadataStoreException {
         String path = joinPath(BASE_POLICIES_PATH, ns.toString());
         return super.exists(path) && super.getChildren(path).isEmpty();
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
index 3313b61c8a1..87f48f7e682 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.resources;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -78,7 +79,7 @@ public class TenantResources extends 
BaseResources<TenantInfo> {
     }
 
     public CompletableFuture<Boolean> tenantExistsAsync(String tenantName) {
-        return getCache().exists(joinPath(BASE_POLICIES_PATH, tenantName));
+        return existsAsync(joinPath(BASE_POLICIES_PATH, tenantName));
     }
 
     public List<String> getListOfNamespaces(String tenant) throws 
MetadataStoreException {
@@ -110,6 +111,41 @@ public class TenantResources extends 
BaseResources<TenantInfo> {
         return namespaces;
     }
 
+    public CompletableFuture<List<String>> getListOfNamespacesAsync(String 
tenant) {
+        // this will return a cluster in v1 and a namespace in v2
+        return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
+                .thenCompose(clusterOrNamespaces -> 
clusterOrNamespaces.stream().map(key ->
+                        getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, 
key))
+                                .thenCompose(children -> {
+                                    if (children == null || 
children.isEmpty()) {
+                                        String namespace = 
NamespaceName.get(tenant, key).toString();
+                                        // if the length is 0 then this is 
probably a leftover cluster from namespace
+                                        // created with the v1 admin format 
(prop/cluster/ns) and then deleted, so no
+                                        // need to add it to the list
+                                        return 
getAsync(joinPath(BASE_POLICIES_PATH, namespace))
+                                           .thenApply(opt -> opt.isPresent() ? 
Collections.singletonList(namespace)
+                                                   : new ArrayList<String>())
+                                           .exceptionally(ex -> {
+                                                Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
+                                                if (cause instanceof 
MetadataStoreException
+                                                        
.ContentDeserializationException) {
+                                                    return new ArrayList<>();
+                                                }
+                                                throw 
FutureUtil.wrapToCompletionException(ex);
+                                            });
+                                    } else {
+                                        CompletableFuture<List<String>> ret = 
new CompletableFuture();
+                                        ret.complete(children.stream().map(ns 
-> NamespaceName.get(tenant, key, ns)
+                                                
.toString()).collect(Collectors.toList()));
+                                        return ret;
+                                    }
+                                
})).reduce(CompletableFuture.completedFuture(new ArrayList<>()),
+                                        (accumulator, n) -> 
accumulator.thenCompose(namespaces -> n.thenApply(m -> {
+                                            namespaces.addAll(m);
+                                            return namespaces;
+                                    }))));
+    }
+
     public CompletableFuture<List<String>> getActiveNamespaces(String tenant, 
String cluster) {
         return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster));
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index ec502e134f2..3a396411c63 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -323,6 +323,11 @@ public abstract class AdminResource extends 
PulsarWebResource {
                         return FutureUtil.failedFuture(new RestException(e));
                     }
                     policies.get().bundles = bundleData != null ? bundleData : 
policies.get().bundles;
+                    if (policies.get().is_allow_auto_update_schema == null) {
+                        // the type changed from boolean to Boolean. return 
broker value here for keeping compatibility.
+                        policies.get().is_allow_auto_update_schema = 
pulsar().getConfig()
+                                .isAllowAutoUpdateSchemaEnabled();
+                    }
                     return CompletableFuture.completedFuture(policies.get());
                 });
             } else {
@@ -534,6 +539,11 @@ public abstract class AdminResource extends 
PulsarWebResource {
         }
     }
 
+    protected CompletableFuture<List<String>> 
getPartitionedTopicListAsync(TopicDomain topicDomain) {
+        return namespaceResources().getPartitionedTopicResources()
+                .listPartitionedTopicsAsync(namespaceName, topicDomain);
+    }
+
     protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
         try {
             return 
getPulsarResources().getTopicResources().getExistingPartitions(topicName)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 49a221946ee..39a4eb13060 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -103,7 +103,6 @@ import 
org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
-import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.slf4j.Logger;
@@ -111,52 +110,47 @@ import org.slf4j.LoggerFactory;
 
 public abstract class NamespacesBase extends AdminResource {
 
-    protected List<String> internalGetTenantNamespaces(String tenant) {
-        checkNotNull(tenant, "Tenant should not be null");
+    protected CompletableFuture<List<String>> 
internalGetTenantNamespaces(String tenant) {
+        if (tenant == null) {
+            return FutureUtil.failedFuture(new 
RestException(Status.BAD_REQUEST, "Tenant should not be null"));
+        }
         try {
             NamedEntity.checkName(tenant);
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, 
e);
-            throw new RestException(Status.PRECONDITION_FAILED, "Tenant name 
is not valid");
-        }
-        validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);
-
-        try {
-            if (!tenantResources().tenantExists(tenant)) {
-                throw new RestException(Status.NOT_FOUND, "Tenant not found");
-            }
-
-            return tenantResources().getListOfNamespaces(tenant);
-        } catch (Exception e) {
-            log.error("[{}] Failed to get namespaces list: {}", clientAppId(), 
e);
-            throw new RestException(e);
+            return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
         }
+        return validateTenantOperationAsync(tenant, 
TenantOperation.LIST_NAMESPACES)
+                .thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
+                .thenCompose(existed -> {
+                    if (!existed) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant not 
found");
+                    }
+                    return tenantResources().getListOfNamespacesAsync(tenant);
+                });
     }
 
-    protected void internalCreateNamespace(Policies policies) {
-        validateTenantOperation(namespaceName.getTenant(), 
TenantOperation.CREATE_NAMESPACE);
-        validatePoliciesReadOnlyAccess();
-        validatePolicies(namespaceName, policies);
-
-        try {
-            int maxNamespacesPerTenant = 
pulsar().getConfiguration().getMaxNamespacesPerTenant();
-            // no distributed locks are added here.In a concurrent scenario, 
the threshold will be exceeded.
-            if (maxNamespacesPerTenant > 0) {
-                List<String> namespaces = 
tenantResources().getListOfNamespaces(namespaceName.getTenant());
-                if (namespaces != null && namespaces.size() > 
maxNamespacesPerTenant) {
-                    throw new RestException(Status.PRECONDITION_FAILED,
-                            "Exceed the maximum number of namespace in tenant 
:" + namespaceName.getTenant());
-                }
-            }
-            namespaceResources().createPolicies(namespaceName, policies);
-            log.info("[{}] Created namespace {}", clientAppId(), 
namespaceName);
-        } catch (AlreadyExistsException e) {
-            log.warn("[{}] Failed to create namespace {} - already exists", 
clientAppId(), namespaceName);
-            throw new RestException(Status.CONFLICT, "Namespace already 
exists");
-        } catch (Exception e) {
-            log.error("[{}] Failed to create namespace {}", clientAppId(), 
namespaceName, e);
-            throw new RestException(e);
-        }
+    protected CompletableFuture<Void> internalCreateNamespace(Policies 
policies) {
+        return validateTenantOperationAsync(namespaceName.getTenant(), 
TenantOperation.CREATE_NAMESPACE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenAccept(__ -> validatePolicies(namespaceName, policies))
+                .thenCompose(__ -> {
+                    int maxNamespacesPerTenant = 
pulsar().getConfiguration().getMaxNamespacesPerTenant();
+                    // no distributed locks are added here.In a concurrent 
scenario, the threshold will be exceeded.
+                    if (maxNamespacesPerTenant > 0) {
+                        return 
tenantResources().getListOfNamespacesAsync(namespaceName.getTenant())
+                                .thenAccept(namespaces -> {
+                                    if (namespaces != null && 
namespaces.size() > maxNamespacesPerTenant) {
+                                        throw new 
RestException(Status.PRECONDITION_FAILED,
+                                                "Exceed the maximum number of 
namespace in tenant :"
+                                                        + 
namespaceName.getTenant());
+                                    }
+                                });
+                    }
+                    return CompletableFuture.completedFuture(null);
+                })
+                .thenCompose(__ -> 
namespaceResources().createPoliciesAsync(namespaceName, policies))
+                .thenAccept(__ -> log.info("[{}] Created namespace {}", 
clientAppId(), namespaceName));
     }
 
     protected void internalDeleteNamespace(AsyncResponse asyncResponse, 
boolean authoritative, boolean force) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index c21c8b8910f..0ccc505d5e8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -28,6 +28,7 @@ import io.swagger.annotations.ApiResponses;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -42,6 +43,7 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.pulsar.broker.admin.impl.NamespacesBase;
 import org.apache.pulsar.broker.web.RestException;
@@ -67,6 +69,8 @@ import 
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,8 +88,15 @@ public class Namespaces extends NamespacesBase {
             response = String.class, responseContainer = "Set")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property doesn't exist")})
-    public List<String> getTenantNamespaces(@PathParam("property") String 
property) {
-        return internalGetTenantNamespaces(property);
+    public void getTenantNamespaces(@Suspended AsyncResponse response,
+                                    @PathParam("property") String property) {
+        internalGetTenantNamespaces(property)
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get namespaces list: {}", 
clientAppId(), ex);
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -125,21 +136,20 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {
             @ApiResponse(code = 403, message = "Don't have admin or operate 
permission on the namespace"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist")})
-    public void getTopics(@PathParam("property") String property,
-                                  @PathParam("cluster") String cluster, 
@PathParam("namespace") String namespace,
-                                  @QueryParam("mode") 
@DefaultValue("PERSISTENT") Mode mode,
-                                  @Suspended AsyncResponse asyncResponse) {
+    public void getTopics(@Suspended AsyncResponse response,
+                          @PathParam("property") String property,
+                          @PathParam("cluster") String cluster,
+                          @PathParam("namespace") String namespace,
+                          @QueryParam("mode") @DefaultValue("PERSISTENT") Mode 
mode) {
         validateNamespaceName(property, cluster, namespace);
-        validateNamespaceOperation(NamespaceName.get(property, namespace), 
NamespaceOperation.GET_TOPICS);
-
-        // Validate that namespace exists, throws 404 if it doesn't exist
-        getNamespacePolicies(namespaceName);
-
-        pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
-                .thenAccept(asyncResponse::resume)
+        validateNamespaceOperationAsync(NamespaceName.get(property, 
namespace), NamespaceOperation.GET_TOPICS)
+                // Validate that namespace exists, throws 404 if it doesn't 
exist
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(__ -> 
pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+                .thenAccept(response::resume)
                 .exceptionally(ex -> {
                     log.error("Failed to get topics list for namespace {}", 
namespaceName, ex);
-                    asyncResponse.resume(ex);
+                    resumeAsyncResponseExceptionally(response, ex);
                     return null;
                 });
     }
@@ -150,11 +160,20 @@ public class Namespaces extends NamespacesBase {
             response = Policies.class)
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist")})
-    public Policies getPolicies(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace) {
+    public void getPolicies(@Suspended AsyncResponse response,
+                            @PathParam("property") String property,
+                            @PathParam("cluster") String cluster,
+                            @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        validateNamespacePolicyOperation(NamespaceName.get(property, 
namespace), PolicyName.ALL, PolicyOperation.READ);
-        return getNamespacePolicies(namespaceName);
+        validateNamespacePolicyOperationAsync(NamespaceName.get(property, 
namespace), PolicyName.ALL,
+                PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    log.error("Failed to get policies for namespace {}", 
namespaceName, ex);
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @SuppressWarnings("deprecation")
@@ -165,29 +184,46 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Namespace already exists"),
             @ApiResponse(code = 412, message = "Namespace name is not valid") 
})
-    public void createNamespace(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, BundlesData 
initialBundles) {
+    public void createNamespace(@Suspended AsyncResponse response,
+                                @PathParam("property") String property,
+                                @PathParam("cluster") String cluster,
+                                @PathParam("namespace") String namespace,
+                                BundlesData initialBundles) {
         validateNamespaceName(property, cluster, namespace);
 
+        CompletableFuture<Void> ret;
         if (!namespaceName.isGlobal()) {
             // If the namespace is non global, make sure property has the 
access on the cluster. For global namespace,
             // same check is made at the time of setting replication.
-            validateClusterForTenant(namespaceName.getTenant(), 
namespaceName.getCluster());
+            ret = validateClusterForTenantAsync(namespaceName.getTenant(), 
namespaceName.getCluster());
+        } else {
+            ret = CompletableFuture.completedFuture(null);
         }
-
-        Policies policies = new Policies();
-        if (initialBundles != null && initialBundles.getNumBundles() > 0) {
-            if (initialBundles.getBoundaries() == null || 
initialBundles.getBoundaries().size() == 0) {
-                policies.bundles = getBundles(initialBundles.getNumBundles());
+        ret.thenApply(__ -> {
+            Policies policies = new Policies();
+            if (initialBundles != null && initialBundles.getNumBundles() > 0) {
+                if (initialBundles.getBoundaries() == null || 
initialBundles.getBoundaries().size() == 0) {
+                    policies.bundles = 
getBundles(initialBundles.getNumBundles());
+                } else {
+                    policies.bundles = validateBundlesData(initialBundles);
+                }
             } else {
-                policies.bundles = validateBundlesData(initialBundles);
+                int defaultNumberOfBundles = 
config().getDefaultNumberOfNamespaceBundles();
+                policies.bundles = getBundles(defaultNumberOfBundles);
             }
-        } else {
-            int defaultNumberOfBundles = 
config().getDefaultNumberOfNamespaceBundles();
-            policies.bundles = getBundles(defaultNumberOfBundles);
-        }
-
-        internalCreateNamespace(policies);
+            return policies;
+        }).thenCompose(this::internalCreateNamespace)
+          .thenAccept(__ -> response.resume(Response.noContent().build()))
+          .exceptionally(ex -> {
+                Throwable root = FutureUtil.unwrapCompletionException(ex);
+                if (root instanceof 
MetadataStoreException.AlreadyExistsException) {
+                    response.resume(new RestException(Status.CONFLICT, 
"Namespace already exists"));
+                } else {
+                    log.error("[{}] Failed to create namespace {}", 
clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(response, ex);
+                }
+                return null;
+           });
     }
 
     @DELETE
@@ -200,9 +236,9 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 405, message = "Broker doesn't allow forced 
deletion of namespaces"),
             @ApiResponse(code = 409, message = "Namespace is not empty") })
     public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, 
@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
-            @QueryParam("force") @DefaultValue("false") boolean force,
-            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+                                @PathParam("cluster") String cluster, 
@PathParam("namespace") String namespace,
+                                @QueryParam("force") @DefaultValue("false") 
boolean force,
+                                @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
         try {
             validateNamespaceName(property, cluster, namespace);
             internalDeleteNamespace(asyncResponse, authoritative, force);
@@ -236,13 +272,19 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Namespace is not empty") })
-    public Map<String, Set<AuthAction>> getPermissions(@PathParam("property") 
String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace) {
+    public void getPermissions(@Suspended AsyncResponse response,
+                               @PathParam("property") String property,
+                               @PathParam("cluster") String cluster,
+                               @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        validateNamespaceOperation(NamespaceName.get(property, namespace), 
NamespaceOperation.GET_PERMISSION);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.auth_policies.getNamespaceAuthentication();
+        validateNamespaceOperationAsync(NamespaceName.get(property, 
namespace), NamespaceOperation.GET_PERMISSION)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> 
response.resume(policies.auth_policies.getNamespaceAuthentication()))
+                .exceptionally(ex -> {
+                    log.error("Failed to get permissions for namespace {}", 
namespaceName, ex);
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e3f82b72d2a..2769b5b3bc3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -75,7 +75,9 @@ import 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,8 +93,15 @@ public class Namespaces extends NamespacesBase {
             response = String.class, responseContainer = "Set")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant doesn't exist")})
-    public List<String> getTenantNamespaces(@PathParam("tenant") String 
tenant) {
-        return internalGetTenantNamespaces(tenant);
+    public void getTenantNamespaces(@Suspended final AsyncResponse response,
+                                    @PathParam("tenant") String tenant) {
+        internalGetTenantNamespaces(tenant)
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get namespaces list: {}", 
clientAppId(), ex);
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -102,21 +111,19 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {
             @ApiResponse(code = 403, message = "Don't have admin or operate 
permission on the namespace"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist")})
-    public void getTopics(@PathParam("tenant") String tenant,
-                                  @PathParam("namespace") String namespace,
-                                  @QueryParam("mode") 
@DefaultValue("PERSISTENT") Mode mode,
-                                  @Suspended AsyncResponse asyncResponse) {
-        validateNamespaceName(tenant, namespace);
-        validateNamespaceOperation(NamespaceName.get(tenant, namespace), 
NamespaceOperation.GET_TOPICS);
-
-        // Validate that namespace exists, throws 404 if it doesn't exist
-        getNamespacePolicies(namespaceName);
-
-        pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
-                .thenAccept(asyncResponse::resume)
+    public void getTopics(@Suspended AsyncResponse response,
+                          @PathParam("tenant") String tenant,
+                          @PathParam("namespace") String namespace,
+                          @QueryParam("mode") @DefaultValue("PERSISTENT") Mode 
mode) {
+        validateNamespaceName(tenant, namespace);
+        validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), 
NamespaceOperation.GET_TOPICS)
+                // Validate that namespace exists, throws 404 if it doesn't 
exist
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(__ -> 
pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+                .thenAccept(response::resume)
                 .exceptionally(ex -> {
                     log.error("Failed to get topics list for namespace {}", 
namespaceName, ex);
-                    asyncResponse.resume(ex);
+                    resumeAsyncResponseExceptionally(response, ex);
                     return null;
                 });
     }
@@ -126,10 +133,19 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get the dump all the policies specified for a 
namespace.", response = Policies.class)
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist") })
-    public Policies getPolicies(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
-        validateNamespaceName(tenant, namespace);
-        validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), 
PolicyName.ALL, PolicyOperation.READ);
-        return getNamespacePolicies(namespaceName);
+    public void getPolicies(@Suspended AsyncResponse response,
+                            @PathParam("tenant") String tenant,
+                            @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        validateNamespacePolicyOperationAsync(NamespaceName.get(tenant, 
namespace), PolicyName.ALL,
+                PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    log.error("Failed to get policies for namespace {}", 
namespaceName, ex);
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -139,11 +155,24 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 404, message = "Tenant or cluster doesn't 
exist"),
             @ApiResponse(code = 409, message = "Namespace already exists"),
             @ApiResponse(code = 412, message = "Namespace name is not valid") 
})
-    public void createNamespace(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace,
-            @ApiParam(value = "Policies for the namespace") Policies policies) 
{
+    public void createNamespace(@Suspended AsyncResponse response,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @ApiParam(value = "Policies for the 
namespace") Policies policies) {
         validateNamespaceName(tenant, namespace);
         policies = getDefaultPolicesIfNull(policies);
-        internalCreateNamespace(policies);
+        internalCreateNamespace(policies)
+                .thenAccept(__ -> 
response.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    Throwable root = FutureUtil.unwrapCompletionException(ex);
+                    if (root instanceof 
MetadataStoreException.AlreadyExistsException) {
+                        response.resume(new 
RestException(Response.Status.CONFLICT, "Namespace already exists"));
+                    } else {
+                        log.error("[{}] Failed to create namespace {}", 
clientAppId(), namespaceName, ex);
+                        resumeAsyncResponseExceptionally(response, ex);
+                    }
+                    return null;
+                });
     }
 
     @DELETE
@@ -156,9 +185,9 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 405, message = "Broker doesn't allow forced 
deletion of namespaces"),
             @ApiResponse(code = 409, message = "Namespace is not empty") })
     public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
-            @PathParam("namespace") String namespace,
-            @QueryParam("force") @DefaultValue("false") boolean force,
-            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+                                @PathParam("namespace") String namespace,
+                                @QueryParam("force") @DefaultValue("false") 
boolean force,
+                                @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
         try {
             validateNamespaceName(tenant, namespace);
             internalDeleteNamespace(asyncResponse, authoritative, force);
@@ -191,13 +220,18 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist"),
             @ApiResponse(code = 409, message = "Namespace is not empty") })
-    public Map<String, Set<AuthAction>> getPermissions(@PathParam("tenant") 
String tenant,
-            @PathParam("namespace") String namespace) {
+    public void getPermissions(@Suspended AsyncResponse response,
+                                                       @PathParam("tenant") 
String tenant,
+                                                       @PathParam("namespace") 
String namespace) {
         validateNamespaceName(tenant, namespace);
-        validateNamespaceOperation(NamespaceName.get(tenant, namespace), 
NamespaceOperation.GET_PERMISSION);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.auth_policies.getNamespaceAuthentication();
+        validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace), 
NamespaceOperation.GET_PERMISSION)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> 
response.resume(policies.auth_policies.getNamespaceAuthentication()))
+                .exceptionally(ex -> {
+                    log.error("Failed to get permissions for namespace {}", 
namespaceName, ex);
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 1690210a80a..38c61d3688d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -415,6 +415,21 @@ public abstract class PulsarWebResource {
         log.info("Successfully validated clusters on tenant [{}]", tenant);
     }
 
+    protected CompletableFuture<Void> validateClusterForTenantAsync(String 
tenant, String cluster) {
+        return 
pulsar().getPulsarResources().getTenantResources().getTenantAsync(tenant)
+                .thenAccept(tenantInfo -> {
+                    if (!tenantInfo.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant does 
not exist");
+                    }
+                    if 
(!tenantInfo.get().getAllowedClusters().contains(cluster)) {
+                        String msg = String.format("Cluster [%s] is not in the 
list of allowed clusters list"
+                                        + " for tenant [%s]", cluster, tenant);
+                        log.info(msg);
+                        throw new RestException(Status.FORBIDDEN, msg);
+                    }
+                });
+    }
+
     protected CompletableFuture<Void> validateClusterOwnershipAsync(String 
cluster) {
         return getClusterDataIfDifferentCluster(pulsar(), cluster, 
clientAppId())
                 .thenAccept(differentClusterData -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index bfc29250028..583a87275bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -38,17 +38,12 @@ import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.TimeoutHandler;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
@@ -196,7 +191,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 new ClientConfiguration().getZkLedgersRootPath(),
                 conf.isBookkeeperMetadataStoreSeparated() ? 
conf.getBookkeeperMetadataStoreUrl() : null,
                 
pulsar.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null));
-        Object response = asynRequests(ctx -> 
brokers.getInternalConfigurationData(ctx));
+        Object response = asyncRequests(ctx -> 
brokers.getInternalConfigurationData(ctx));
         assertTrue(response instanceof InternalConfigurationData);
         assertEquals(response, expectedData);
     }
@@ -204,18 +199,18 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
     @Test
     @SuppressWarnings("unchecked")
     public void clusters() throws Exception {
-        assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), 
Sets.newHashSet());
+        assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), 
Sets.newHashSet());
         verify(clusters, never()).validateSuperUserAccessAsync();
 
-        asynRequests(ctx -> clusters.createCluster(ctx,
+        asyncRequests(ctx -> clusters.createCluster(ctx,
                 "use", 
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080";).build()));
         // ensure to read from ZooKeeper directly
         //clusters.clustersListCache().clear();
-        assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), 
Sets.newHashSet("use"));
+        assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), 
Sets.newHashSet("use"));
 
         // Check creating existing cluster
         try {
-            asynRequests(ctx -> clusters.createCluster(ctx, "use",
+            asyncRequests(ctx -> clusters.createCluster(ctx, "use",
                     
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080";).build()));
             fail("should have failed");
         } catch (RestException e) {
@@ -224,23 +219,23 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
 
         // Check deleting non-existing cluster
         try {
-            asynRequests(ctx -> clusters.deleteCluster(ctx, "usc"));
+            asyncRequests(ctx -> clusters.deleteCluster(ctx, "usc"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
         }
 
-        assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")),
+        assertEquals(asyncRequests(ctx -> clusters.getCluster(ctx, "use")),
                 
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080";).build());
 
-        asynRequests(ctx -> clusters.updateCluster(ctx, "use",
+        asyncRequests(ctx -> clusters.updateCluster(ctx, "use",
                 
ClusterDataImpl.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080";).build()));
 
-        assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")),
+        assertEquals(asyncRequests(ctx -> clusters.getCluster(ctx, "use")),
                 
ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080";).build());
 
         try {
-            asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, 
"use"));
+            asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, 
"use"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 404);
@@ -260,38 +255,38 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
                 .build();
         AsyncResponse response = mock(AsyncResponse.class);
         clusters.setNamespaceIsolationPolicy(response,"use", "policy1", 
policyData);
-        asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, 
"use"));
+        asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, 
"use"));
 
         try {
-            asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
+            asyncRequests(ctx -> clusters.deleteCluster(ctx, "use"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 412);
         }
 
         clusters.deleteNamespaceIsolationPolicy("use", "policy1");
-        assertTrue(((Map<String, NamespaceIsolationDataImpl>) asynRequests(ctx 
->
+        assertTrue(((Map<String, NamespaceIsolationDataImpl>) 
asyncRequests(ctx ->
                 clusters.getNamespaceIsolationPolicies(ctx, 
"use"))).isEmpty());
 
-        asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
-        assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), 
Sets.newHashSet());
+        asyncRequests(ctx -> clusters.deleteCluster(ctx, "use"));
+        assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), 
Sets.newHashSet());
 
         try {
-            asynRequests(ctx -> clusters.getCluster(ctx, "use"));
+            asyncRequests(ctx -> clusters.getCluster(ctx, "use"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 404);
         }
 
         try {
-            asynRequests(ctx -> clusters.updateCluster(ctx, "use", 
ClusterDataImpl.builder().build()));
+            asyncRequests(ctx -> clusters.updateCluster(ctx, "use", 
ClusterDataImpl.builder().build()));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 404);
         }
 
         try {
-            asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, 
"use"));
+            asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, 
"use"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 404);
@@ -311,7 +306,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         clusterCache.invalidateAll();
         store.invalidateAll();
         try {
-            asynRequests(ctx -> clusters.getClusters(ctx));
+            asyncRequests(ctx -> clusters.getClusters(ctx));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -322,7 +317,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                     && path.equals("/admin/clusters/test");
             });
         try {
-            asynRequests(ctx -> clusters.createCluster(ctx, "test",
+            asyncRequests(ctx -> clusters.createCluster(ctx, "test",
                     
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com:8080";).build()));
             fail("should have failed");
         } catch (RestException e) {
@@ -336,7 +331,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         clusterCache.invalidateAll();
         store.invalidateAll();
         try {
-            asynRequests(ctx -> clusters.updateCluster(ctx, "test",
+            asyncRequests(ctx -> clusters.updateCluster(ctx, "test",
                     
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com";).build()));
             fail("should have failed");
         } catch (RestException e) {
@@ -349,7 +344,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             });
 
         try {
-            asynRequests(ctx -> clusters.getCluster(ctx, "test"));
+            asyncRequests(ctx -> clusters.getCluster(ctx, "test"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -361,7 +356,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             });
 
         try {
-            asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
+            asyncRequests(ctx -> clusters.deleteCluster(ctx, "use"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -375,7 +370,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         isolationPolicyCache.invalidateAll();
         store.invalidateAll();
         try {
-            asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
+            asyncRequests(ctx -> clusters.deleteCluster(ctx, "use"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -383,7 +378,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
         // Check name validations
         try {
-            asynRequests(ctx -> clusters.createCluster(ctx, "bf@",
+            asyncRequests(ctx -> clusters.createCluster(ctx, "bf@",
                     
ClusterDataImpl.builder().serviceUrl("http://dummy.messaging.example.com";).build()));
             fail("should have filed");
         } catch (RestException e) {
@@ -392,7 +387,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
         // Check authentication and listener name
         try {
-            asynRequests(ctx -> clusters.createCluster(ctx, "auth", 
ClusterDataImpl.builder()
+            asyncRequests(ctx -> clusters.createCluster(ctx, "auth", 
ClusterDataImpl.builder()
                     .serviceUrl("http://dummy.web.example.com";)
                     .serviceUrlTls("")
                     .brokerServiceUrl("http://dummy.messaging.example.com";)
@@ -401,7 +396,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                     .authenticationParameters("authenticationParameters")
                     .listenerName("listenerName")
                     .build()));
-            ClusterData cluster = (ClusterData) asynRequests(ctx -> 
clusters.getCluster(ctx, "auth"));
+            ClusterData cluster = (ClusterData) asyncRequests(ctx -> 
clusters.getCluster(ctx, "auth"));
             assertEquals(cluster.getAuthenticationPlugin(), 
"authenticationPlugin");
             assertEquals(cluster.getAuthenticationParameters(), 
"authenticationParameters");
             assertEquals(cluster.getListenerName(), "listenerName");
@@ -412,23 +407,14 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
         verify(clusters, times(2)).validateSuperUserAccess();
     }
 
-    Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception 
{
-        TestAsyncResponse ctx = new TestAsyncResponse();
-        function.accept(ctx);
-        ctx.latch.await();
-        if (ctx.e != null) {
-            throw (Exception) ctx.e;
-        }
-        return ctx.response;
-    }
     @Test
     public void properties() throws Throwable {
-        Object response = asynRequests(ctx -> properties.getTenants(ctx));
+        Object response = asyncRequests(ctx -> properties.getTenants(ctx));
         assertEquals(response, Lists.newArrayList());
         verify(properties, times(1)).validateSuperUserAccess();
 
         // create local cluster
-        asynRequests(ctx -> clusters.createCluster(ctx, configClusterName, 
ClusterDataImpl.builder().build()));
+        asyncRequests(ctx -> clusters.createCluster(ctx, configClusterName, 
ClusterDataImpl.builder().build()));
 
         Set<String> allowedClusters = Sets.newHashSet();
         allowedClusters.add(configClusterName);
@@ -436,14 +422,14 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
                 .adminRoles(Sets.newHashSet("role1", "role2"))
                 .allowedClusters(allowedClusters)
                 .build();
-        response = asynRequests(ctx -> properties.createTenant(ctx, 
"test-property", tenantInfo));
+        response = asyncRequests(ctx -> properties.createTenant(ctx, 
"test-property", tenantInfo));
         verify(properties, times(2)).validateSuperUserAccess();
 
-        response = asynRequests(ctx -> properties.getTenants(ctx));
+        response = asyncRequests(ctx -> properties.getTenants(ctx));
         assertEquals(response, Lists.newArrayList("test-property"));
         verify(properties, times(3)).validateSuperUserAccess();
 
-        response = asynRequests(ctx -> properties.getTenantAdmin(ctx, 
"test-property"));
+        response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, 
"test-property"));
         assertEquals(response, tenantInfo);
         verify(properties, times(4)).validateSuperUserAccess();
 
@@ -451,21 +437,21 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
                 .adminRoles(Sets.newHashSet("role1", "other-role"))
                 .allowedClusters(allowedClusters)
                 .build();
-        response = asynRequests(ctx -> properties.updateTenant(ctx, 
"test-property", newPropertyAdmin));
+        response = asyncRequests(ctx -> properties.updateTenant(ctx, 
"test-property", newPropertyAdmin));
         verify(properties, times(5)).validateSuperUserAccess();
 
         // Wait for updateTenant to take effect
         Thread.sleep(100);
 
-        response = asynRequests(ctx -> properties.getTenantAdmin(ctx, 
"test-property"));
+        response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, 
"test-property"));
         assertEquals(response, newPropertyAdmin);
-        response = asynRequests(ctx -> properties.getTenantAdmin(ctx, 
"test-property"));
+        response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, 
"test-property"));
         assertNotSame(response, tenantInfo);
         verify(properties, times(7)).validateSuperUserAccess();
 
         // Check creating existing property
         try {
-            response = asynRequests(ctx -> properties.createTenant(ctx, 
"test-property", tenantInfo));
+            response = asyncRequests(ctx -> properties.createTenant(ctx, 
"test-property", tenantInfo));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.CONFLICT.getStatusCode());
@@ -473,14 +459,14 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
 
         // Check non-existing property
         try {
-            response = asynRequests(ctx -> properties.getTenantAdmin(ctx, 
"non-existing"));
+            response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, 
"non-existing"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
         }
 
         try {
-            response = asynRequests(ctx -> properties.updateTenant(ctx, 
"xxx-non-existing", newPropertyAdmin));
+            response = asyncRequests(ctx -> properties.updateTenant(ctx, 
"xxx-non-existing", newPropertyAdmin));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
@@ -488,7 +474,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
         // Check deleting non-existing property
         try {
-            response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"non-existing", false));
+            response = asyncRequests(ctx -> properties.deleteTenant(ctx, 
"non-existing", false));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
@@ -505,7 +491,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             return op == MockZooKeeper.Op.GET_CHILDREN && 
path.equals("/admin/policies");
         });
         try {
-            response = asynRequests(ctx -> properties.getTenants(ctx));
+            response = asyncRequests(ctx -> properties.getTenants(ctx));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -515,7 +501,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             return op == MockZooKeeper.Op.GET && 
path.equals("/admin/policies/my-tenant");
         });
         try {
-            response = asynRequests(ctx -> properties.getTenantAdmin(ctx, 
"my-tenant"));
+            response = asyncRequests(ctx -> properties.getTenantAdmin(ctx, 
"my-tenant"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -525,7 +511,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             return op == MockZooKeeper.Op.GET && 
path.equals("/admin/policies/my-tenant");
         });
         try {
-            response = asynRequests(ctx -> properties.updateTenant(ctx, 
"my-tenant", newPropertyAdmin));
+            response = asyncRequests(ctx -> properties.updateTenant(ctx, 
"my-tenant", newPropertyAdmin));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -535,7 +521,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             return op == MockZooKeeper.Op.CREATE && 
path.equals("/admin/policies/test");
         });
         try {
-            response = asynRequests(ctx -> properties.createTenant(ctx, 
"test", tenantInfo));
+            response = asyncRequests(ctx -> properties.createTenant(ctx, 
"test", tenantInfo));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -547,28 +533,28 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
         try {
             cache.invalidateAll();
             store.invalidateAll();
-            response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"test-property", false));
+            response = asyncRequests(ctx -> properties.deleteTenant(ctx, 
"test-property", false));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        response = asynRequests(ctx -> properties.createTenant(ctx, 
"error-property", tenantInfo));
+        response = asyncRequests(ctx -> properties.createTenant(ctx, 
"error-property", tenantInfo));
 
         mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> 
{
             return op == MockZooKeeper.Op.DELETE && 
path.equals("/admin/policies/error-property");
         });
         try {
-            response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"error-property", false));
+            response = asyncRequests(ctx -> properties.deleteTenant(ctx, 
"error-property", false));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"test-property", false));
-        response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"error-property", false));
+        response = asyncRequests(ctx -> properties.deleteTenant(ctx, 
"test-property", false));
+        response = asyncRequests(ctx -> properties.deleteTenant(ctx, 
"error-property", false));
         response = Lists.newArrayList();
-        response = asynRequests(ctx -> properties.getTenants(ctx));
+        response = asyncRequests(ctx -> properties.getTenants(ctx));
         assertEquals(response, Lists.newArrayList());
 
         // Create a namespace to test deleting a non-empty property
@@ -576,12 +562,12 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
                 .adminRoles(Sets.newHashSet("role1", "other-role"))
                 .allowedClusters(Sets.newHashSet("use"))
                 .build();
-        response = asynRequests(ctx -> properties.createTenant(ctx, 
"my-tenant", newPropertyAdmin2));
+        response = asyncRequests(ctx -> properties.createTenant(ctx, 
"my-tenant", newPropertyAdmin2));
 
-        namespaces.createNamespace("my-tenant", "use", "my-namespace", 
BundlesData.builder().build());
+        response = asyncRequests(ctx -> 
namespaces.createNamespace(ctx,"my-tenant", "use", "my-namespace", 
BundlesData.builder().build()));
 
         try {
-            response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"my-tenant", false));
+            response = asyncRequests(ctx -> properties.deleteTenant(ctx, 
"my-tenant", false));
             fail("should have failed");
         } catch (RestException e) {
             // Ok
@@ -589,7 +575,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
         // Check name validation
         try {
-            response = asynRequests(ctx -> properties.createTenant(ctx, 
"test&", tenantInfo));
+            response = asyncRequests(ctx -> properties.createTenant(ctx, 
"test&", tenantInfo));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
@@ -597,7 +583,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
         // Check tenantInfo is null
         try {
-            response = asynRequests(ctx -> properties.createTenant(ctx, 
"tenant-config-is-null", null));
+            response = asyncRequests(ctx -> properties.createTenant(ctx, 
"tenant-config-is-null", null));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
@@ -611,7 +597,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 .allowedClusters(blankClusters)
                 .build();
         try {
-            response = asynRequests(ctx -> properties.createTenant(ctx, 
"tenant-config-is-empty", tenantWithEmptyCluster));
+            response = asyncRequests(ctx -> properties.createTenant(ctx, 
"tenant-config-is-empty", tenantWithEmptyCluster));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
@@ -625,7 +611,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 .allowedClusters(containBlankClusters)
                 .build();
         try {
-            response = asynRequests(ctx -> properties.createTenant(ctx, 
"tenant-config-contain-empty", tenantContainEmptyCluster));
+            response = asyncRequests(ctx -> properties.createTenant(ctx, 
"tenant-config-contain-empty", tenantContainEmptyCluster));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
@@ -636,13 +622,13 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
         ArgumentCaptor<Response> captor = 
ArgumentCaptor.forClass(Response.class);
         verify(response2, timeout(5000).times(1)).resume(captor.capture());
         assertEquals(captor.getValue().getStatus(), 
Status.NO_CONTENT.getStatusCode());
-        response = asynRequests(ctx -> properties.deleteTenant(ctx, 
"my-tenant", false));
+        response = asyncRequests(ctx -> properties.deleteTenant(ctx, 
"my-tenant", false));
     }
 
     @Test
     @SuppressWarnings("unchecked")
     public void brokers() throws Exception {
-        asynRequests(ctx -> clusters.createCluster(ctx, "use", 
ClusterDataImpl.builder()
+        asyncRequests(ctx -> clusters.createCluster(ctx, "use", 
ClusterDataImpl.builder()
                 .serviceUrl("http://broker.messaging.use.example.com";)
                 .serviceUrlTls("https://broker.messaging.use.example.com:4443";)
                 .build()));
@@ -654,12 +640,12 @@ public class AdminTest extends 
MockedPulsarServiceBaseTest {
         Field uriField = PulsarWebResource.class.getDeclaredField("uri");
         uriField.setAccessible(true);
         uriField.set(brokers, mockUri);
-        Object res = asynRequests(ctx -> brokers.getActiveBrokers(ctx, "use"));
+        Object res = asyncRequests(ctx -> brokers.getActiveBrokers(ctx, 
"use"));
         assertTrue(res instanceof Set);
         Set<String> activeBrokers = (Set<String>) res;
         assertEquals(activeBrokers.size(), 1);
         assertEquals(activeBrokers, 
Sets.newHashSet(pulsar.getAdvertisedAddress() + ":" + 
pulsar.getListenPortHTTP().get()));
-        Object leaderBrokerRes = asynRequests(ctx -> 
brokers.getLeaderBroker(ctx));
+        Object leaderBrokerRes = asyncRequests(ctx -> 
brokers.getLeaderBroker(ctx));
         assertTrue(leaderBrokerRes instanceof BrokerInfo);
         BrokerInfo leaderBroker = (BrokerInfo)leaderBrokerRes;
         assertEquals(leaderBroker.getServiceUrl(), 
pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get());
@@ -707,8 +693,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 .allowedClusters(Collections.singleton(cluster))
                 .build();
         ClusterDataImpl clusterData = 
ClusterDataImpl.builder().serviceUrl(cluster).build();
-        asynRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData 
));
-        asynRequests(ctx -> properties.createTenant(ctx, property, admin));
+        asyncRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData 
));
+        asyncRequests(ctx -> properties.createTenant(ctx, property, admin));
 
         // customized bandwidth for this namespace
         double customizeBandwidth = 3000;
@@ -876,87 +862,4 @@ public class AdminTest extends MockedPulsarServiceBaseTest 
{
         
Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), 
Status.INTERNAL_SERVER_ERROR.getStatusCode());
         
Assert.assertTrue(((ErrorData)responseCaptor.getValue().getResponse().getEntity()).reason.contains("500
 error contains error message"));
     }
-
-
-    static class TestAsyncResponse implements AsyncResponse {
-
-        Object response;
-        Throwable e;
-        CountDownLatch latch = new CountDownLatch(1);
-
-        @Override
-        public boolean resume(Object response) {
-            this.response = response;
-            latch.countDown();
-            return true;
-        }
-
-        @Override
-        public boolean resume(Throwable response) {
-            this.e = response;
-            latch.countDown();
-            return true;
-        }
-
-        @Override
-        public boolean cancel() {
-            return false;
-        }
-
-        @Override
-        public boolean cancel(int retryAfter) {
-            return false;
-        }
-
-        @Override
-        public boolean cancel(Date retryAfter) {
-            return false;
-        }
-
-        @Override
-        public boolean isSuspended() {
-            return false;
-        }
-
-        @Override
-        public boolean isCancelled() {
-            return false;
-        }
-
-        @Override
-        public boolean isDone() {
-            return false;
-        }
-
-        @Override
-        public boolean setTimeout(long time, TimeUnit unit) {
-            return false;
-        }
-
-        @Override
-        public void setTimeoutHandler(TimeoutHandler handler) {
-
-        }
-
-        @Override
-        public Collection<Class<?>> register(Class<?> callback) {
-            return null;
-        }
-
-        @Override
-        public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback, 
Class<?>... callbacks) {
-            return null;
-        }
-
-        @Override
-        public Collection<Class<?>> register(Object callback) {
-            return null;
-        }
-
-        @Override
-        public Map<Class<?>, Collection<Class<?>>> register(Object callback, 
Object... callbacks) {
-            return null;
-        }
-
-    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 0eb87491a52..a02f7d6c934 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -203,8 +203,8 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testCreateNamespaces() throws Exception {
         try {
-            namespaces.createNamespace(this.testTenant, "other-colo", 
"my-namespace",
-                    BundlesData.builder().build());
+            asyncRequests(response -> namespaces.createNamespace(response, 
this.testTenant, "other-colo", "my-namespace",
+                    BundlesData.builder().build()));
             fail("should have failed");
         } catch (RestException e) {
             // Ok, cluster doesn't exist
@@ -217,24 +217,24 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         createTestNamespaces(nsnames, BundlesData.builder().build());
 
         try {
-            namespaces.createNamespace(this.testTenant, "use", 
"create-namespace-1",
-                    BundlesData.builder().build());
+            asyncRequests(response -> namespaces.createNamespace(response, 
this.testTenant, "use", "create-namespace-1",
+                    BundlesData.builder().build()));
             fail("should have failed");
         } catch (RestException e) {
             // Ok, namespace already exists
         }
 
         try {
-            namespaces.createNamespace("non-existing-tenant", "use", 
"create-namespace-1",
-                    BundlesData.builder().build());
+            asyncRequests(response -> 
namespaces.createNamespace(response,"non-existing-tenant", "use", 
"create-namespace-1",
+                    BundlesData.builder().build()));
             fail("should have failed");
         } catch (RestException e) {
             // Ok, tenant doesn't exist
         }
 
         try {
-            namespaces.createNamespace(this.testTenant, "use", 
"create-namespace-#",
-                    BundlesData.builder().build());
+            asyncRequests(response -> namespaces.createNamespace(response, 
this.testTenant, "use", "create-namespace-#",
+                    BundlesData.builder().build()));
             fail("should have failed");
         } catch (RestException e) {
             // Ok, invalid namespace name
@@ -246,7 +246,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                     && 
path.equals("/admin/policies/my-tenant/use/my-namespace-3");
             });
         try {
-            namespaces.createNamespace(this.testTenant, "use", 
"my-namespace-3", BundlesData.builder().build());
+            asyncRequests(response -> namespaces.createNamespace(response, 
this.testTenant, "use", "my-namespace-3", BundlesData.builder().build()));
             fail("should have failed");
         } catch (RestException e) {
             // Ok
@@ -263,18 +263,24 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 this.testLocalNamespaces.get(1).toString(), 
this.testLocalNamespaces.get(2).toString(),
                 this.testGlobalNamespaces.get(0).toString());
         expectedList.sort(null);
-        assertEquals(namespaces.getTenantNamespaces(this.testTenant), 
expectedList);
+        AsyncResponse response = mock(AsyncResponse.class);
+        namespaces.getTenantNamespaces(response, this.testTenant);
+        ArgumentCaptor<Response> captor = 
ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(captor.capture());
+        List<String> namespacesList = (List<String>) captor.getValue();
+        namespacesList.sort(null);
+        assertEquals(namespacesList, expectedList);
 
         try {
             // check the tenant name is valid
-            namespaces.getTenantNamespaces(this.testTenant + "/default");
+            asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, 
this.testTenant + "/default"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
         }
 
         try {
-            namespaces.getTenantNamespaces("non-existing-tenant");
+            asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, 
"non-existing-tenant"));
             fail("should have failed");
         } catch (RestException e) {
             // Ok, does not exist
@@ -299,7 +305,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         tenantCache.invalidateAll();
         store.invalidateAll();
         try {
-            namespaces.getTenantNamespaces(this.testTenant);
+            asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, 
this.testTenant));
             fail("should have failed");
         } catch (RestException e) {
             // Ok
@@ -321,46 +327,46 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     @Test(enabled = false)
     public void testGrantAndRevokePermissions() throws Exception {
         Policies expectedPolicies = new Policies();
-        assertEquals(namespaces.getPolicies(this.testTenant, 
this.testLocalCluster,
-                this.testLocalNamespaces.get(0).getLocalName()), 
expectedPolicies);
-        assertEquals(namespaces.getPermissions(this.testTenant, 
this.testLocalCluster,
-                this.testLocalNamespaces.get(0).getLocalName()), 
expectedPolicies.auth_policies.getNamespaceAuthentication());
+        assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, 
this.testTenant, this.testLocalCluster,
+                this.testLocalNamespaces.get(0).getLocalName())), 
expectedPolicies);
+        assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, 
this.testTenant, this.testLocalCluster,
+                this.testLocalNamespaces.get(0).getLocalName())), 
expectedPolicies.auth_policies.getNamespaceAuthentication());
 
         namespaces.grantPermissionOnNamespace(this.testTenant, 
this.testLocalCluster,
                 this.testLocalNamespaces.get(0).getLocalName(), "my-role", 
EnumSet.of(AuthAction.produce));
 
         
expectedPolicies.auth_policies.getNamespaceAuthentication().put("my-role", 
EnumSet.of(AuthAction.produce));
-        assertEquals(namespaces.getPolicies(this.testTenant, 
this.testLocalCluster,
-                this.testLocalNamespaces.get(0).getLocalName()), 
expectedPolicies);
-        assertEquals(namespaces.getPermissions(this.testTenant, 
this.testLocalCluster,
-                this.testLocalNamespaces.get(0).getLocalName()), 
expectedPolicies.auth_policies.getNamespaceAuthentication());
+        assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, 
this.testTenant, this.testLocalCluster,
+                this.testLocalNamespaces.get(0).getLocalName())), 
expectedPolicies);
+        assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, 
this.testTenant, this.testLocalCluster,
+                this.testLocalNamespaces.get(0).getLocalName())), 
expectedPolicies.auth_policies.getNamespaceAuthentication());
 
         namespaces.grantPermissionOnNamespace(this.testTenant, 
this.testLocalCluster,
                 this.testLocalNamespaces.get(0).getLocalName(), "other-role", 
EnumSet.of(AuthAction.consume));
         
expectedPolicies.auth_policies.getNamespaceAuthentication().put("other-role", 
EnumSet.of(AuthAction.consume));
-        assertEquals(namespaces.getPolicies(this.testTenant, 
this.testLocalCluster,
-                this.testLocalNamespaces.get(0).getLocalName()), 
expectedPolicies);
-        assertEquals(namespaces.getPermissions(this.testTenant, 
this.testLocalCluster,
-                this.testLocalNamespaces.get(0).getLocalName()), 
expectedPolicies.auth_policies.getNamespaceAuthentication());
+        assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, 
this.testTenant, this.testLocalCluster,
+                this.testLocalNamespaces.get(0).getLocalName())), 
expectedPolicies);
+        assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, 
this.testTenant, this.testLocalCluster,
+                this.testLocalNamespaces.get(0).getLocalName())), 
expectedPolicies.auth_policies.getNamespaceAuthentication());
 
         namespaces.revokePermissionsOnNamespace(this.testTenant, 
this.testLocalCluster,
                 this.testLocalNamespaces.get(0).getLocalName(), "my-role");
         
expectedPolicies.auth_policies.getNamespaceAuthentication().remove("my-role");
-        assertEquals(namespaces.getPolicies(this.testTenant, 
this.testLocalCluster,
-                this.testLocalNamespaces.get(0).getLocalName()), 
expectedPolicies);
-        assertEquals(namespaces.getPermissions(this.testTenant, 
this.testLocalCluster,
-                this.testLocalNamespaces.get(0).getLocalName()), 
expectedPolicies.auth_policies.getNamespaceAuthentication());
+        assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx, 
this.testTenant, this.testLocalCluster,
+                this.testLocalNamespaces.get(0).getLocalName())), 
expectedPolicies);
+        assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx, 
this.testTenant, this.testLocalCluster,
+                this.testLocalNamespaces.get(0).getLocalName())), 
expectedPolicies.auth_policies.getNamespaceAuthentication());
 
         // Non-existing namespaces
         try {
-            namespaces.getPolicies(this.testTenant, this.testLocalCluster, 
"non-existing-namespace-1");
+            asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant, 
this.testLocalCluster, "non-existing-namespace-1"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
         }
 
         try {
-            namespaces.getPermissions(this.testTenant, this.testLocalCluster, 
"non-existing-namespace-1");
+            asyncRequests(ctx -> namespaces.getPermissions(ctx, 
this.testTenant, this.testLocalCluster, "non-existing-namespace-1"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
@@ -393,7 +399,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
             });
 
         try {
-            namespaces.getPolicies(testNs.getTenant(), testNs.getCluster(), 
testNs.getLocalName());
+            asyncRequests(ctx -> namespaces.getPolicies(ctx, 
testNs.getTenant(), testNs.getCluster(), testNs.getLocalName()));
             fail("should have failed");
         } catch (RestException e) {
             // Ok
@@ -407,7 +413,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 return true;
             });
         try {
-            namespaces.getPermissions(testNs.getTenant(), testNs.getCluster(), 
testNs.getLocalName());
+            asyncRequests(ctx -> namespaces.getPermissions(ctx, 
testNs.getTenant(), testNs.getCluster(), testNs.getLocalName()));
             fail("should have failed");
         } catch (RestException e) {
             // Ok
@@ -758,7 +764,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         List<String> nsList = 
Lists.newArrayList(this.testLocalNamespaces.get(1).toString(),
                 this.testLocalNamespaces.get(2).toString());
         nsList.sort(null);
-        assertEquals(namespaces.getTenantNamespaces(this.testTenant), nsList);
+        assertEquals(asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx, 
this.testTenant)), nsList);
 
         testNs = this.testLocalNamespaces.get(1);
         // setup ownership to localhost
@@ -978,16 +984,16 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
     private void createBundledTestNamespaces(String property, String cluster, 
String namespace, BundlesData bundle)
             throws Exception {
-        namespaces.createNamespace(property, cluster, namespace, bundle);
+        asyncRequests(ctx -> namespaces.createNamespace(ctx, property, 
cluster, namespace, bundle));
     }
 
     private void createGlobalTestNamespaces(String property, String namespace, 
BundlesData bundle) throws Exception {
-        namespaces.createNamespace(property, "global", namespace, bundle);
+        asyncRequests(ctx -> namespaces.createNamespace(ctx, property, 
"global", namespace, bundle));
     }
 
     private void createTestNamespaces(List<NamespaceName> nsnames, BundlesData 
bundle) throws Exception {
         for (NamespaceName nsName : nsnames) {
-            namespaces.createNamespace(nsName.getTenant(), 
nsName.getCluster(), nsName.getLocalName(), bundle);
+            asyncRequests(ctx -> namespaces.createNamespace(ctx, 
nsName.getTenant(), nsName.getCluster(), nsName.getLocalName(), bundle));
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 55631aea465..88fe6ece0bb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -33,11 +33,15 @@ import java.net.URI;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -70,6 +74,9 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+
 /**
  * Base class for all tests that need a Pulsar instance without a ZK and BK 
cluster.
  */
@@ -499,5 +506,97 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         }
     }
 
+    protected Object asyncRequests(Consumer<TestAsyncResponse> function) 
throws Exception {
+        TestAsyncResponse ctx = new TestAsyncResponse();
+        function.accept(ctx);
+        ctx.latch.await();
+        if (ctx.e != null) {
+            throw (Exception) ctx.e;
+        }
+        return ctx.response;
+    }
+
+    public static class TestAsyncResponse implements AsyncResponse {
+
+        Object response;
+        Throwable e;
+        CountDownLatch latch = new CountDownLatch(1);
+
+        @Override
+        public boolean resume(Object response) {
+            this.response = response;
+            latch.countDown();
+            return true;
+        }
+
+        @Override
+        public boolean resume(Throwable response) {
+            this.e = response;
+            latch.countDown();
+            return true;
+        }
+
+        @Override
+        public boolean cancel() {
+            return false;
+        }
+
+        @Override
+        public boolean cancel(int retryAfter) {
+            return false;
+        }
+
+        @Override
+        public boolean cancel(Date retryAfter) {
+            return false;
+        }
+
+        @Override
+        public boolean isSuspended() {
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return false;
+        }
+
+        @Override
+        public boolean setTimeout(long time, TimeUnit unit) {
+            return false;
+        }
+
+        @Override
+        public void setTimeoutHandler(TimeoutHandler handler) {
+
+        }
+
+        @Override
+        public Collection<Class<?>> register(Class<?> callback) {
+            return null;
+        }
+
+        @Override
+        public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback, 
Class<?>... callbacks) {
+            return null;
+        }
+
+        @Override
+        public Collection<Class<?>> register(Object callback) {
+            return null;
+        }
+
+        @Override
+        public Map<Class<?>, Collection<Class<?>>> register(Object callback, 
Object... callbacks) {
+            return null;
+        }
+
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }

Reply via email to