This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cc34af  Process requests asynchronously on some REST APIs (2) (#4778)
2cc34af is described below

commit 2cc34afc0320e66a996b9517d6729a495563fc54
Author: massakam <[email protected]>
AuthorDate: Tue Jul 23 14:40:48 2019 +0900

    Process requests asynchronously on some REST APIs (2) (#4778)
    
    Master Issue: #4756
    
    ### Motivation
    
    This is a continuation of https://github.com/apache/pulsar/pull/4765.
    
    ### Modifications
    
    Added async rest handlers to the following APIs:
    ```
    DELETE /admin/namespaces/{tenant}/{cluster}/{namespace}
    PUT    /admin/namespaces/{tenant}/{cluster}/{namespace}/unload
    POST   /admin/namespaces/{tenant}/{cluster}/{namespace}/clearBacklog
    POST   
/admin/namespaces/{tenant}/{cluster}/{namespace}/clearBacklog/{subscription}
    POST   
/admin/namespaces/{tenant}/{cluster}/{namespace}/unsubscribe/{subscription}
    
    DELETE /admin/v2/namespaces/{tenant}/{namespace}
    PUT    /admin/v2/namespaces/{tenant}/{namespace}/unload
    POST   /admin/v2/namespaces/{tenant}/{namespace}/clearBacklog
    POST   /admin/v2/namespaces/{tenant}/{namespace}/clearBacklog/{subscription}
    POST   /admin/v2/namespaces/{tenant}/{namespace}/unsubscribe/{subscription}
    ```
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 260 +++++++++++++--------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  73 ++++--
 .../broker/admin/v1/NonPersistentTopics.java       |   4 +
 .../pulsar/broker/admin/v1/PersistentTopics.java   |   9 +-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  72 ++++--
 .../broker/admin/v2/NonPersistentTopics.java       |   4 +
 .../pulsar/broker/admin/v2/PersistentTopics.java   |   9 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  10 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java | 139 ++++++-----
 .../org/apache/pulsar/client/admin/Namespaces.java |  62 ++++-
 .../client/admin/internal/NamespacesImpl.java      |  94 ++++++--
 11 files changed, 511 insertions(+), 225 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c6b7806..fac78a0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -47,6 +47,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
@@ -129,7 +130,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     @SuppressWarnings("deprecation")
-    protected void internalDeleteNamespace(boolean authoritative) {
+    protected void internalDeleteNamespace(AsyncResponse asyncResponse, 
boolean authoritative) {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
 
@@ -179,9 +180,11 @@ public abstract class NamespacesBase extends AdminResource 
{
                 }
             }
         } catch (WebApplicationException wae) {
-            throw wae;
+            asyncResponse.resume(wae);
+            return;
         } catch (Exception e) {
-            throw new RestException(e);
+            asyncResponse.resume(new RestException(e));
+            return;
         }
 
         boolean isEmpty;
@@ -190,12 +193,16 @@ public abstract class NamespacesBase extends 
AdminResource {
                     && 
getPartitionedTopicList(TopicDomain.persistent).isEmpty()
                     && 
getPartitionedTopicList(TopicDomain.non_persistent).isEmpty();
         } catch (Exception e) {
-            throw new RestException(e);
+            asyncResponse.resume(new RestException(e));
+            return;
         }
 
         if (!isEmpty) {
-            log.debug("Found topics on namespace {}", namespaceName);
-            throw new RestException(Status.CONFLICT, "Cannot delete non empty 
namespace");
+            if (log.isDebugEnabled()) {
+                log.debug("Found topics on namespace {}", namespaceName);
+            }
+            asyncResponse.resume(new RestException(Status.CONFLICT, "Cannot 
delete non empty namespace"));
+            return;
         }
 
         // set the policies to deleted so that somebody else cannot acquire 
this namespace
@@ -206,35 +213,58 @@ public abstract class NamespacesBase extends 
AdminResource {
             policiesCache().invalidate(path(POLICIES, 
namespaceName.toString()));
         } catch (Exception e) {
             log.error("[{}] Failed to delete namespace on global ZK {}", 
clientAppId(), namespaceName, e);
-            throw new RestException(e);
+            asyncResponse.resume(new RestException(e));
+            return;
         }
 
         // remove from owned namespace map and ephemeral node from ZK
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         try {
             NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
                     .getBundles(namespaceName);
             for (NamespaceBundle bundle : bundles.getBundles()) {
                 // check if the bundle is owned by any broker, if not then we 
do not need to delete the bundle
                 if 
(pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
-                    
pulsar().getAdminClient().namespaces().deleteNamespaceBundle(namespaceName.toString(),
-                            bundle.getBundleRange());
+                    futures.add(pulsar().getAdminClient().namespaces()
+                            
.deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange()));
                 }
             }
-
-            // we have successfully removed all the ownership for the 
namespace, the policies znode can be deleted now
-            final String globalZkPolicyPath = path(POLICIES, 
namespaceName.toString());
-            final String lcaolZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, 
namespaceName.toString());
-            globalZk().delete(globalZkPolicyPath, -1);
-            localZk().delete(lcaolZkPolicyPath, -1);
-            policiesCache().invalidate(globalZkPolicyPath);
-            localCacheService().policiesCache().invalidate(lcaolZkPolicyPath);
-        } catch (PulsarAdminException cae) {
-            throw new RestException(cae);
         } catch (Exception e) {
             log.error("[{}] Failed to remove owned namespace {}", 
clientAppId(), namespaceName, e);
-            // avoid throwing exception in case of the second failure
+            asyncResponse.resume(new RestException(e));
+            return;
         }
 
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+            if (exception != null) {
+                if (exception.getCause() instanceof PulsarAdminException) {
+                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
+                    return null;
+                } else {
+                    log.error("[{}] Failed to remove owned namespace {}", 
clientAppId(), namespaceName, exception);
+                    asyncResponse.resume(new 
RestException(exception.getCause()));
+                    return null;
+                }
+            }
+
+            try {
+                // we have successfully removed all the ownership for the 
namespace, the policies znode can be deleted
+                // now
+                final String globalZkPolicyPath = path(POLICIES, 
namespaceName.toString());
+                final String lcaolZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, 
namespaceName.toString());
+                globalZk().delete(globalZkPolicyPath, -1);
+                localZk().delete(lcaolZkPolicyPath, -1);
+                policiesCache().invalidate(globalZkPolicyPath);
+                
localCacheService().policiesCache().invalidate(lcaolZkPolicyPath);
+            } catch (Exception e) {
+                log.error("[{}] Failed to remove owned namespace {} from ZK", 
clientAppId(), namespaceName, e);
+                asyncResponse.resume(new RestException(e));
+                return null;
+            }
+
+            asyncResponse.resume(Response.ok().build());
+            return null;
+        });
     }
 
     @SuppressWarnings("deprecation")
@@ -274,7 +304,9 @@ public abstract class NamespacesBase extends AdminResource {
                     }
                     URI redirect = 
UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
                             
.port(replClusterUrl.getPort()).replaceQueryParam("authoritative", 
false).build();
-                    log.debug("[{}] Redirecting the rest call to {}: 
cluster={}", clientAppId(), redirect, replCluster);
+                    if(log.isDebugEnabled()) {
+                        log.debug("[{}] Redirecting the rest call to {}: 
cluster={}", clientAppId(), redirect, replCluster);
+                    }
                     throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
                 }
             }
@@ -554,7 +586,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     @SuppressWarnings("deprecation")
-    protected void internalUnloadNamespace() {
+    protected void internalUnloadNamespace(AsyncResponse asyncResponse) {
         log.info("[{}] Unloading namespace {}", clientAppId(), namespaceName);
 
         validateSuperUserAccess();
@@ -569,18 +601,35 @@ public abstract class NamespacesBase extends 
AdminResource {
 
         Policies policies = getNamespacePolicies(namespaceName);
 
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         List<String> boundaries = policies.bundles.getBoundaries();
         for (int i = 0; i < boundaries.size() - 1; i++) {
             String bundle = String.format("%s_%s", boundaries.get(i), 
boundaries.get(i + 1));
             try {
-                
pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespaceName.toString(),
 bundle);
-            } catch (PulsarServerException | PulsarAdminException e) {
-                log.error(String.format("[%s] Failed to unload namespace %s", 
clientAppId(), namespaceName), e);
-                throw new RestException(e);
+                
futures.add(pulsar().getAdminClient().namespaces().unloadNamespaceBundleAsync(namespaceName.toString(),
+                        bundle));
+            } catch (PulsarServerException e) {
+                log.error("[{}] Failed to unload namespace {}", clientAppId(), 
namespaceName, e);
+                asyncResponse.resume(new RestException(e));
+                return;
             }
         }
 
-        log.info("[{}] Successfully unloaded all the bundles in namespace {}", 
clientAppId(), namespaceName);
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+            if (exception != null) {
+                log.error("[{}] Failed to unload namespace {}", clientAppId(), 
namespaceName, exception);
+                if (exception.getCause() instanceof PulsarAdminException) {
+                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
+                    return null;
+                } else {
+                    asyncResponse.resume(new 
RestException(exception.getCause()));
+                    return null;
+                }
+            }
+            log.info("[{}] Successfully unloaded all the bundles in namespace 
{}", clientAppId(), namespaceName);
+            asyncResponse.resume(Response.ok().build());
+            return null;
+        });
     }
 
     
@@ -1114,41 +1163,45 @@ public abstract class NamespacesBase extends 
AdminResource {
         }
     }
 
-    protected void internalClearNamespaceBacklog(boolean authoritative) {
+    protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, 
boolean authoritative) {
         validateAdminAccessForTenant(namespaceName.getTenant());
 
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         try {
             NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
                     .getBundles(namespaceName);
-            Exception exception = null;
             for (NamespaceBundle nsBundle : bundles.getBundles()) {
-                try {
-                    // check if the bundle is owned by any broker, if not then 
there is no backlog on this bundle to
-                    // clear
-                    if 
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
-                        // TODO: make this admin call asynchronous
-                        
pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklog(namespaceName.toString(),
-                                nsBundle.getBundleRange());
-                    }
-                } catch (Exception e) {
-                    if (exception == null) {
-                        exception = e;
-                    }
+                // check if the bundle is owned by any broker, if not then 
there is no backlog on this bundle to clear
+                if 
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+                    futures.add(pulsar().getAdminClient().namespaces()
+                            
.clearNamespaceBundleBacklogAsync(namespaceName.toString(), 
nsBundle.getBundleRange()));
                 }
             }
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+            return;
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
             if (exception != null) {
-                if (exception instanceof PulsarAdminException) {
-                    throw new RestException((PulsarAdminException) exception);
+                log.warn("[{}] Failed to clear backlog on the bundles for 
namespace {}: {}", clientAppId(),
+                        namespaceName, exception.getCause().getMessage());
+                if (exception.getCause() instanceof PulsarAdminException) {
+                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
+                    return null;
                 } else {
-                    throw new RestException(exception.getCause());
+                    asyncResponse.resume(new 
RestException(exception.getCause()));
+                    return null;
                 }
             }
-        } catch (WebApplicationException wae) {
-            throw wae;
-        } catch (Exception e) {
-            throw new RestException(e);
-        }
-        log.info("[{}] Successfully cleared backlog on all the bundles for 
namespace {}", clientAppId(), namespaceName);
+            log.info("[{}] Successfully cleared backlog on all the bundles for 
namespace {}", clientAppId(),
+                    namespaceName);
+            asyncResponse.resume(Response.ok().build());
+            return null;
+        });
     }
 
     @SuppressWarnings("deprecation")
@@ -1172,42 +1225,46 @@ public abstract class NamespacesBase extends 
AdminResource {
                 bundleRange);
     }
 
-    protected void internalClearNamespaceBacklogForSubscription(String 
subscription, boolean authoritative) {
+    protected void internalClearNamespaceBacklogForSubscription(AsyncResponse 
asyncResponse, String subscription,
+            boolean authoritative) {
         validateAdminAccessForTenant(namespaceName.getTenant());
 
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         try {
             NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
                     .getBundles(namespaceName);
-            Exception exception = null;
             for (NamespaceBundle nsBundle : bundles.getBundles()) {
-                try {
-                    // check if the bundle is owned by any broker, if not then 
there is no backlog on this bundle to
-                    // clear
-                    if 
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
-                        // TODO: make this admin call asynchronous
-                        
pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscription(
-                                namespaceName.toString(), 
nsBundle.getBundleRange(), subscription);
-                    }
-                } catch (Exception e) {
-                    if (exception == null) {
-                        exception = e;
-                    }
+                // check if the bundle is owned by any broker, if not then 
there is no backlog on this bundle to clear
+                if 
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+                    
futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync(
+                            namespaceName.toString(), 
nsBundle.getBundleRange(), subscription));
                 }
             }
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+            return;
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
             if (exception != null) {
-                if (exception instanceof PulsarAdminException) {
-                    throw new RestException((PulsarAdminException) exception);
+                log.warn("[{}] Failed to clear backlog for subscription {} on 
the bundles for namespace {}: {}",
+                        clientAppId(), subscription, namespaceName, 
exception.getCause().getMessage());
+                if (exception.getCause() instanceof PulsarAdminException) {
+                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
+                    return null;
                 } else {
-                    throw new RestException(exception.getCause());
+                    asyncResponse.resume(new 
RestException(exception.getCause()));
+                    return null;
                 }
             }
-        } catch (WebApplicationException wae) {
-            throw wae;
-        } catch (Exception e) {
-            throw new RestException(e);
-        }
-        log.info("[{}] Successfully cleared backlog for subscription {} on all 
the bundles for namespace {}",
-                clientAppId(), subscription, namespaceName);
+            log.info("[{}] Successfully cleared backlog for subscription {} on 
all the bundles for namespace {}",
+                    clientAppId(), subscription, namespaceName);
+            asyncResponse.resume(Response.ok().build());
+            return null;
+        });
     }
 
     @SuppressWarnings("deprecation")
@@ -1232,41 +1289,46 @@ public abstract class NamespacesBase extends 
AdminResource {
                 subscription, namespaceName, bundleRange);
     }
 
-    protected void internalUnsubscribeNamespace(String subscription, boolean 
authoritative) {
+    protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, 
String subscription,
+            boolean authoritative) {
         validateAdminAccessForTenant(namespaceName.getTenant());
 
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         try {
             NamespaceBundles bundles = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
                     .getBundles(namespaceName);
-            Exception exception = null;
             for (NamespaceBundle nsBundle : bundles.getBundles()) {
-                try {
-                    // check if the bundle is owned by any broker, if not then 
there are no subscriptions
-                    if 
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
-                        // TODO: make this admin call asynchronous
-                        
pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundle(namespaceName.toString(),
-                                nsBundle.getBundleRange(), subscription);
-                    }
-                } catch (Exception e) {
-                    if (exception == null) {
-                        exception = e;
-                    }
+                // check if the bundle is owned by any broker, if not then 
there are no subscriptions
+                if 
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+                    
futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync(
+                            namespaceName.toString(), 
nsBundle.getBundleRange(), subscription));
                 }
             }
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+            return;
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
             if (exception != null) {
-                if (exception instanceof PulsarAdminException) {
-                    throw new RestException((PulsarAdminException) exception);
+                log.warn("[{}] Failed to unsubscribe {} on the bundles for 
namespace {}: {}", clientAppId(),
+                        subscription, namespaceName, 
exception.getCause().getMessage());
+                if (exception.getCause() instanceof PulsarAdminException) {
+                    asyncResponse.resume(new 
RestException((PulsarAdminException) exception.getCause()));
+                    return null;
                 } else {
-                    throw new RestException(exception.getCause());
+                    asyncResponse.resume(new 
RestException(exception.getCause()));
+                    return null;
                 }
             }
-        } catch (WebApplicationException wae) {
-            throw wae;
-        } catch (Exception e) {
-            throw new RestException(e);
-        }
-        log.info("[{}] Successfully unsubscribed {} on all the bundles for 
namespace {}", clientAppId(), subscription,
-                namespaceName);
+            log.info("[{}] Successfully unsubscribed {} on all the bundles for 
namespace {}", clientAppId(),
+                    subscription, namespaceName);
+            asyncResponse.resume(Response.ok().build());
+            return null;
+        });
     }
 
     @SuppressWarnings("deprecation")
@@ -1619,7 +1681,9 @@ public abstract class NamespacesBase extends 
AdminResource {
             partitions.add(String.format("0x%08x", partBoundary));
         }
         if (partitions.size() != initialBundles.getBoundaries().size()) {
-            log.debug("Input bundles included repeated partition points. 
Ignored.");
+            if (log.isDebugEnabled()) {
+                log.debug("Input bundles included repeated partition points. 
Ignored.");
+            }
         }
         try {
             NamespaceBundleFactory.validateFullRange(partitions);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 0fec21d..a47bc23 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -51,6 +51,9 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
 import java.util.List;
@@ -178,11 +181,17 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Namespace is not empty") })
-    public void deleteNamespace(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace,
+    public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, 
@PathParam("property") String property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateNamespaceName(property, cluster, namespace);
-        internalDeleteNamespace(authoritative);
+        try {
+            validateNamespaceName(property, cluster, namespace);
+            internalDeleteNamespace(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @DELETE
@@ -400,10 +409,16 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Namespace is already unloaded 
or Namespace has bundles activated") })
-    public void unloadNamespace(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace) {
-        validateNamespaceName(property, cluster, namespace);
-        internalUnloadNamespace();
+    public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, 
@PathParam("property") String property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace) {
+        try {
+            validateNamespaceName(property, cluster, namespace);
+            internalUnloadNamespace(asyncResponse);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @PUT
@@ -605,11 +620,18 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(hidden = true, value = "Clear backlog for all topics on a 
namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void clearNamespaceBacklog(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
+    public void clearNamespaceBacklog(@Suspended final AsyncResponse 
asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateNamespaceName(property, cluster, namespace);
-        internalClearNamespaceBacklog(authoritative);
+        try {
+            validateNamespaceName(property, cluster, namespace);
+            internalClearNamespaceBacklog(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @POST
@@ -630,12 +652,18 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(hidden = true, value = "Clear backlog for a given 
subscription on all topics on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void clearNamespaceBacklogForSubscription(@PathParam("property") 
String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
-            @PathParam("subscription") String subscription,
+    public void clearNamespaceBacklogForSubscription(@Suspended final 
AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") 
String cluster,
+            @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateNamespaceName(property, cluster, namespace);
-        internalClearNamespaceBacklogForSubscription(subscription, 
authoritative);
+        try {
+            validateNamespaceName(property, cluster, namespace);
+            internalClearNamespaceBacklogForSubscription(asyncResponse, 
subscription, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @POST
@@ -656,11 +684,18 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(hidden = true, value = "Unsubscribes the given subscription 
on all topics on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void unsubscribeNamespace(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
+    public void unsubscribeNamespace(@Suspended final AsyncResponse 
asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateNamespaceName(property, cluster, namespace);
-        internalUnsubscribeNamespace(subscription, authoritative);
+        try {
+            validateNamespaceName(property, cluster, namespace);
+            internalUnsubscribeNamespace(asyncResponse, subscription, 
authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @POST
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index ced620a..f1347e3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -37,6 +37,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
@@ -180,6 +181,9 @@ public class NonPersistentTopics extends PersistentTopics {
                 // check cluster ownership for a given global namespace: 
redirect if peer-cluster owns it
                 validateGlobalNamespaceOwnership(nsName);
             }
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+            return;
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
             return;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 78efca2..87df589 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -32,6 +32,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
@@ -76,8 +77,10 @@ public class PersistentTopics extends PersistentTopicsBase {
         try {
             validateNamespaceName(property, cluster, namespace);
             asyncResponse.resume(internalGetList());
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
         } catch (Exception e) {
-            asyncResponse.resume(e instanceof RestException ? e : new 
RestException(e));
+            asyncResponse.resume(new RestException(e));
         }
     }
 
@@ -286,6 +289,8 @@ public class PersistentTopics extends PersistentTopicsBase {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
             internalGetPartitionedStats(asyncResponse, authoritative, 
perPartition);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
         }
@@ -306,6 +311,8 @@ public class PersistentTopics extends PersistentTopicsBase {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
             internalGetPartitionedStatsInternal(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index d29972e..435892e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -32,6 +32,9 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.pulsar.broker.admin.impl.NamespacesBase;
@@ -126,10 +129,17 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist"),
             @ApiResponse(code = 409, message = "Namespace is not empty") })
-    public void deleteNamespace(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace,
+    public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateNamespaceName(tenant, namespace);
-        internalDeleteNamespace(authoritative);
+        try {
+            validateNamespaceName(tenant, namespace);
+            internalDeleteNamespace(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @DELETE
@@ -302,9 +312,16 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or namespace doesn't 
exist"),
             @ApiResponse(code = 412, message = "Namespace is already unloaded 
or Namespace has bundles activated") })
-    public void unloadNamespace(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace) {
-        validateNamespaceName(tenant, namespace);
-        internalUnloadNamespace();
+    public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, 
@PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
+        try {
+            validateNamespaceName(tenant, namespace);
+            internalUnloadNamespace(asyncResponse);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @PUT
@@ -545,10 +562,17 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Clear backlog for all topics on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void clearNamespaceBacklog(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace,
+    public void clearNamespaceBacklog(@Suspended final AsyncResponse 
asyncResponse, @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateNamespaceName(tenant, namespace);
-        internalClearNamespaceBacklog(authoritative);
+        try {
+            validateNamespaceName(tenant, namespace);
+            internalClearNamespaceBacklog(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @POST
@@ -568,11 +592,18 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Clear backlog for a given subscription on all 
topics on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void clearNamespaceBacklogForSubscription(@PathParam("tenant") 
String tenant,
-            @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
+    public void clearNamespaceBacklogForSubscription(@Suspended final 
AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant, @PathParam("namespace") String 
namespace,
+            @PathParam("subscription") String subscription,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateNamespaceName(tenant, namespace);
-        internalClearNamespaceBacklogForSubscription(subscription, 
authoritative);
+        try {
+            validateNamespaceName(tenant, namespace);
+            internalClearNamespaceBacklogForSubscription(asyncResponse, 
subscription, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @POST
@@ -593,11 +624,18 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Unsubscribes the given subscription on all topics 
on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public void unsubscribeNamespace(@PathParam("tenant") String tenant, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
+    public void unsubscribeNamespace(@Suspended final AsyncResponse 
asyncResponse, @PathParam("tenant") String tenant,
+            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
+            @PathParam("subscription") String subscription,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateNamespaceName(tenant, namespace);
-        internalUnsubscribeNamespace(subscription, authoritative);
+        try {
+            validateNamespaceName(tenant, namespace);
+            internalUnsubscribeNamespace(asyncResponse, subscription, 
authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @POST
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 9807c9f..8125f5b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -38,6 +38,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
@@ -240,6 +241,9 @@ public class NonPersistentTopics extends PersistentTopics {
 
             // check cluster ownership for a given global namespace: redirect 
if peer-cluster owns it
             validateGlobalNamespaceOwnership(namespaceName);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+            return;
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
             return;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 04aa79f..96a5ca9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -32,6 +32,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
@@ -83,8 +84,10 @@ public class PersistentTopics extends PersistentTopicsBase {
         try {
             validateNamespaceName(tenant, namespace);
             asyncResponse.resume(internalGetList());
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
         } catch (Exception e) {
-            asyncResponse.resume(e instanceof RestException ? e : new 
RestException(e));
+            asyncResponse.resume(new RestException(e));
         }
     }
 
@@ -482,6 +485,8 @@ public class PersistentTopics extends PersistentTopicsBase {
         try {
             validatePartitionedTopicName(tenant, namespace, encodedTopic);
             internalGetPartitionedStats(asyncResponse, authoritative, 
perPartition);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
         }
@@ -510,6 +515,8 @@ public class PersistentTopics extends PersistentTopicsBase {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
             internalGetPartitionedStatsInternal(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 643020d..b277ffe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
@@ -47,6 +48,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.UriInfo;
@@ -84,6 +86,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.mockito.ArgumentCaptor;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -505,8 +508,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest 
{
         properties.createTenant("tenant-config-is-null", null);
         assertEquals(properties.getTenantAdmin("tenant-config-is-null"), 
nullTenantInfo);
 
-
-        namespaces.deleteNamespace("my-tenant", "use", "my-namespace", false);
+        AsyncResponse response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, "my-tenant", "use", 
"my-namespace", false);
+        ArgumentCaptor<Response> captor = 
ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(captor.capture());
+        assertEquals(captor.getValue().getStatus(), Status.OK.getStatusCode());
         properties.deleteTenant("my-tenant");
         properties.deleteTenant("tenant-config-is-null");
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 138404e..763a42c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
@@ -43,9 +44,12 @@ import java.net.URL;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
@@ -79,6 +83,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooDefs;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.testng.annotations.AfterMethod;
@@ -525,16 +530,14 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         // Trick to force redirection
         conf.setAuthorizationEnabled(true);
 
-        try {
-            namespaces.deleteNamespace(this.testTenant, this.testOtherCluster,
-                    this.testLocalNamespaces.get(2).getLocalName(), false);
-            fail("Should have raised exception to redirect request");
-        } catch (WebApplicationException wae) {
-            // OK
-            assertEquals(wae.getResponse().getStatus(), 
Status.TEMPORARY_REDIRECT.getStatusCode());
-            assertEquals(wae.getResponse().getLocation().toString(),
-                    
UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString());
-        }
+        AsyncResponse response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, this.testTenant, 
this.testOtherCluster,
+                this.testLocalNamespaces.get(2).getLocalName(), false);
+        ArgumentCaptor<WebApplicationException> captor = 
ArgumentCaptor.forClass(WebApplicationException.class);
+        verify(response, timeout(5000).times(1)).resume(captor.capture());
+        assertEquals(captor.getValue().getResponse().getStatus(), 
Status.TEMPORARY_REDIRECT.getStatusCode());
+        assertEquals(captor.getValue().getResponse().getLocation().toString(),
+                
UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString());
 
         uri = URI.create("http://localhost"; + ":" + BROKER_WEBSERVICE_PORT + 
"/admin/namespace/"
                 + this.testLocalNamespaces.get(2).toString() + "/unload");
@@ -572,27 +575,23 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         doReturn(uri).when(uriInfo).getRequestUri();
         doReturn(true).when(namespaces).isLeaderBroker();
 
-        try {
-            
namespaces.deleteNamespace(this.testLocalNamespaces.get(2).getTenant(),
-                    this.testLocalNamespaces.get(2).getCluster(), 
this.testLocalNamespaces.get(2).getLocalName(),
-                    false);
-            fail("Should have raised exception to redirect request");
-        } catch (WebApplicationException wae) {
-            // OK
-            assertEquals(wae.getResponse().getStatus(), 
Status.TEMPORARY_REDIRECT.getStatusCode());
-            assertEquals(wae.getResponse().getLocation().toString(),
-                    
UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString());
-        }
+        response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, 
this.testLocalNamespaces.get(2).getTenant(),
+                this.testLocalNamespaces.get(2).getCluster(), 
this.testLocalNamespaces.get(2).getLocalName(), false);
+        captor = ArgumentCaptor.forClass(WebApplicationException.class);
+        verify(response, timeout(5000).times(1)).resume(captor.capture());
+        assertEquals(captor.getValue().getResponse().getStatus(), 
Status.TEMPORARY_REDIRECT.getStatusCode());
+        assertEquals(captor.getValue().getResponse().getLocation().toString(),
+                
UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString());
     }
 
     @Test
     public void testDeleteNamespaces() throws Exception {
-        try {
-            namespaces.deleteNamespace(this.testTenant, this.testLocalCluster, 
"non-existing-namespace-1", false);
-            fail("should have failed");
-        } catch (RestException e) {
-            assertEquals(e.getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
-        }
+        AsyncResponse response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, this.testTenant, 
this.testLocalCluster, "non-existing-namespace-1", false);
+        ArgumentCaptor<RestException> errorCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
+        assertEquals(errorCaptor.getValue().getResponse().getStatus(), 
Status.NOT_FOUND.getStatusCode());
 
         NamespaceName testNs = this.testLocalNamespaces.get(1);
         TopicName topicName = 
TopicName.get(testNs.getPersistentTopicName("my-topic"));
@@ -603,39 +602,49 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
         
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
false, false, false);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
-        try {
-            namespaces.deleteNamespace(testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), false);
-            fail("should have failed");
-        } catch (RestException e) {
-            // Ok, namespace not empty
-            assertEquals(e.getResponse().getStatus(), 
Status.CONFLICT.getStatusCode());
-        }
+
+        response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), false);
+        errorCaptor = ArgumentCaptor.forClass(RestException.class);
+        // Ok, namespace not empty
+        verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
+        assertEquals(errorCaptor.getValue().getResponse().getStatus(), 
Status.CONFLICT.getStatusCode());
+
         // delete the topic from ZK
         mockZookKeeper.delete("/managed-ledgers/" + 
topicName.getPersistenceNamingEncoding(), -1);
 
         ZkUtils.createFullPathOptimistic(mockZookKeeper,
                 "/admin/partitioned-topics/" + 
topicName.getPersistenceNamingEncoding(),
                 new byte[0], null, null);
-        try {
-            namespaces.deleteNamespace(testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), false);
-            fail("should have failed");
-        } catch (RestException e) {
-            // Ok, namespace not empty
-            assertEquals(e.getResponse().getStatus(), 
Status.CONFLICT.getStatusCode());
-        }
+
+        response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), false);
+        errorCaptor = ArgumentCaptor.forClass(RestException.class);
+        // Ok, namespace not empty
+        verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
+        assertEquals(errorCaptor.getValue().getResponse().getStatus(), 
Status.CONFLICT.getStatusCode());
+
         mockZookKeeper.delete("/admin/partitioned-topics/" + 
topicName.getPersistenceNamingEncoding(), -1);
 
         testNs = this.testGlobalNamespaces.get(0);
         // setup ownership to localhost
         
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
false, false, false);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
-        namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), 
testNs.getLocalName(), false);
+        response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), false);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        assertEquals(responseCaptor.getValue().getStatus(), 
Status.OK.getStatusCode());
 
         testNs = this.testLocalNamespaces.get(0);
         // setup ownership to localhost
         
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
false, false, false);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
-        namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), 
testNs.getLocalName(), false);
+        response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), false);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        assertEquals(responseCaptor.getValue().getStatus(), 
Status.OK.getStatusCode());
         List<String> nsList = 
Lists.newArrayList(this.testLocalNamespaces.get(1).toString(),
                 this.testLocalNamespaces.get(2).toString());
         nsList.sort(null);
@@ -647,7 +656,11 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         // setup ownership to localhost
         
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
false, false, false);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
-        namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(), 
testNs.getLocalName(), false);
+        response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), false);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        assertEquals(responseCaptor.getValue().getStatus(), 
Status.OK.getStatusCode());
     }
 
     @Test
@@ -682,9 +695,11 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                     }
                 }));
 
-        doThrow(new PulsarAdminException.PreconditionFailedException(
-                new 
ClientErrorException(Status.PRECONDITION_FAILED))).when(namespacesAdmin)
-                        .deleteNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
+        CompletableFuture<Void> preconditionFailed = new CompletableFuture<>();
+        preconditionFailed.completeExceptionally(new 
PulsarAdminException.PreconditionFailedException(
+                new ClientErrorException(Status.PRECONDITION_FAILED)));
+        doReturn(preconditionFailed).when(namespacesAdmin)
+                .deleteNamespaceBundleAsync(Mockito.anyString(), 
Mockito.anyString());
 
         try {
             namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, 
bundledNsLocal, "0x00000000_0x80000000",
@@ -694,19 +709,18 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
             assertEquals(re.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
         }
 
-        try {
-            namespaces.deleteNamespace(testTenant, testLocalCluster, 
bundledNsLocal, false);
-            fail("Should have failed");
-        } catch (RestException re) {
-            assertEquals(re.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
-        }
+        AsyncResponse response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, testTenant, testLocalCluster, 
bundledNsLocal, false);
+        ArgumentCaptor<RestException> captor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(response, timeout(5000).times(1)).resume(captor.capture());
+        assertEquals(captor.getValue().getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
 
         NamespaceBundles nsBundles = 
nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
         // make one bundle owned
         
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0),
 false,
                 true, false);
         
doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
-        doNothing().when(namespacesAdmin).deleteNamespaceBundle(
+        
doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync(
                 testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, 
"0x00000000_0x80000000");
 
         try {
@@ -717,12 +731,11 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
             assertEquals(re.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
         }
 
-        try {
-            namespaces.deleteNamespace(testTenant, testLocalCluster, 
bundledNsLocal, false);
-            fail("should have failed");
-        } catch (RestException re) {
-            assertEquals(re.getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
-        }
+        response = mock(AsyncResponse.class);
+        namespaces.deleteNamespace(response, testTenant, testLocalCluster, 
bundledNsLocal, false);
+        captor = ArgumentCaptor.forClass(RestException.class);
+        verify(response, timeout(5000).times(1)).resume(captor.capture());
+        assertEquals(captor.getValue().getResponse().getStatus(), 
Status.PRECONDITION_FAILED.getStatusCode());
 
         // ensure all three bundles are owned by the local broker
         for (NamespaceBundle bundle : nsBundles.getBundles()) {
@@ -744,7 +757,11 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         doNothing().when(namespaces).validateBundleOwnership(bundle, false, 
true);
 
         // The namespace unload should succeed on all the bundles
-        namespaces.unloadNamespace(testNs.getTenant(), testNs.getCluster(), 
testNs.getLocalName());
+        AsyncResponse response = mock(AsyncResponse.class);
+        namespaces.unloadNamespace(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName());
+        ArgumentCaptor<Response> captor = 
ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(captor.capture());
+        assertEquals(captor.getValue().getStatus(), Status.OK.getStatusCode());
     }
 
     @Test
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index a828993..c69a4a0 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.admin;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -304,6 +305,20 @@ public interface Namespaces {
     void deleteNamespaceBundle(String namespace, String bundleRange) throws 
PulsarAdminException;
 
     /**
+     * Delete an existing bundle in a namespace asynchronously.
+     * <p>
+     * The bundle needs to be empty.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param bundleRange
+     *            range of the bundle
+     *
+     * @return a future that can be used to track when the bundle is deleted
+     */
+    CompletableFuture<Void> deleteNamespaceBundleAsync(String namespace, 
String bundleRange);
+
+    /**
      * Get permissions on a namespace.
      * <p>
      * Retrieve the permissions for a namespace.
@@ -884,13 +899,25 @@ public interface Namespaces {
      * Unload namespace bundle
      *
      * @param namespace
-     * @bundle range of bundle to unload
+     * @param bundle
+     *           range of bundle to unload
      * @throws PulsarAdminException
      *             Unexpected error
      */
     void unloadNamespaceBundle(String namespace, String bundle) throws 
PulsarAdminException;
 
     /**
+     * Unload namespace bundle asynchronously
+     *
+     * @param namespace
+     * @param bundle
+     *           range of bundle to unload
+     *
+     * @return a future that can be used to track when the bundle is unloaded
+     */
+    CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, 
String bundle);
+
+    /**
      * Split namespace bundle
      *
      * @param namespace
@@ -1014,6 +1041,16 @@ public interface Namespaces {
     void clearNamespaceBundleBacklog(String namespace, String bundle) throws 
PulsarAdminException;
 
     /**
+     * Clear backlog for all topics on a namespace bundle asynchronously
+     *
+     * @param namespace
+     * @param bundle
+     *
+     * @return a future that can be used to track when the bundle is cleared
+     */
+    CompletableFuture<Void> clearNamespaceBundleBacklogAsync(String namespace, 
String bundle);
+
+    /**
      * Clear backlog for a given subscription on all topics on a namespace 
bundle
      *
      * @param namespace
@@ -1026,6 +1063,18 @@ public interface Namespaces {
             throws PulsarAdminException;
 
     /**
+     * Clear backlog for a given subscription on all topics on a namespace 
bundle asynchronously
+     *
+     * @param namespace
+     * @param bundle
+     * @param subscription
+     *
+     * @return a future that can be used to track when the bundle is cleared
+     */
+    CompletableFuture<Void> 
clearNamespaceBundleBacklogForSubscriptionAsync(String namespace, String bundle,
+            String subscription);
+
+    /**
      * Unsubscribes the given subscription on all topics on a namespace
      *
      * @param namespace
@@ -1045,6 +1094,17 @@ public interface Namespaces {
     void unsubscribeNamespaceBundle(String namespace, String bundle, String 
subscription) throws PulsarAdminException;
 
     /**
+     * Unsubscribes the given subscription on all topics on a namespace bundle 
asynchronously
+     *
+     * @param namespace
+     * @param bundle
+     * @param subscription
+     *
+     * @return a future that can be used to track when the subscription is 
unsubscribed
+     */
+    CompletableFuture<Void> unsubscribeNamespaceBundleAsync(String namespace, 
String bundle, String subscription);
+
+    /**
      * Set the encryption required status for all topics within a namespace.
      * <p>
      * When encryption required is true, the broker will prevent to store 
unencrypted messages.
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 08f7b87..a5470a4 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -23,6 +23,8 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.client.Entity;
@@ -192,15 +194,23 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     @Override
     public void deleteNamespaceBundle(String namespace, String bundleRange) 
throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, bundleRange);
-            request(path).delete(ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            deleteNamespaceBundleAsync(namespace, bundleRange).get();
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> deleteNamespaceBundleAsync(String 
namespace, String bundleRange) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, bundleRange);
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public Map<String, Set<AuthAction>> getPermissions(String namespace) 
throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
@@ -496,15 +506,23 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     @Override
     public void unloadNamespaceBundle(String namespace, String bundle) throws 
PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, bundle, "unload");
-            request(path).put(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            unloadNamespaceBundleAsync(namespace, bundle).get();
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> unloadNamespaceBundleAsync(String 
namespace, String bundle) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, bundle, "unload");
+        return asyncPutRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void splitNamespaceBundle(String namespace, String bundle, boolean 
unloadSplitBundles)
             throws PulsarAdminException {
         try {
@@ -631,27 +649,44 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     @Override
     public void clearNamespaceBundleBacklog(String namespace, String bundle) 
throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, bundle, "clearBacklog");
-            request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            clearNamespaceBundleBacklogAsync(namespace, bundle).get();
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> clearNamespaceBundleBacklogAsync(String 
namespace, String bundle) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, bundle, "clearBacklog");
+        return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void clearNamespaceBundleBacklogForSubscription(String namespace, 
String bundle, String subscription)
             throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, bundle, "clearBacklog", 
subscription);
-            request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            clearNamespaceBundleBacklogForSubscriptionAsync(namespace, bundle, 
subscription).get();
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> 
clearNamespaceBundleBacklogForSubscriptionAsync(String namespace, String bundle,
+            String subscription) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, bundle, "clearBacklog", 
subscription);
+        return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void unsubscribeNamespace(String namespace, String subscription) 
throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
@@ -666,15 +701,24 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
     public void unsubscribeNamespaceBundle(String namespace, String bundle, 
String subscription)
             throws PulsarAdminException {
         try {
-            NamespaceName ns = NamespaceName.get(namespace);
-            WebTarget path = namespacePath(ns, bundle, "unsubscribe", 
subscription);
-            request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            unsubscribeNamespaceBundleAsync(namespace, bundle, 
subscription).get();
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> unsubscribeNamespaceBundleAsync(String 
namespace, String bundle,
+            String subscription) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, bundle, "unsubscribe", 
subscription);
+        return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode 
subscriptionAuthMode) throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);

Reply via email to