This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2cc34af Process requests asynchronously on some REST APIs (2) (#4778)
2cc34af is described below
commit 2cc34afc0320e66a996b9517d6729a495563fc54
Author: massakam <[email protected]>
AuthorDate: Tue Jul 23 14:40:48 2019 +0900
Process requests asynchronously on some REST APIs (2) (#4778)
Master Issue: #4756
### Motivation
This is a continuation of https://github.com/apache/pulsar/pull/4765.
### Modifications
Added asynchronous REST handlers to the following APIs:
```
DELETE /admin/namespaces/{tenant}/{cluster}/{namespace}
PUT /admin/namespaces/{tenant}/{cluster}/{namespace}/unload
POST /admin/namespaces/{tenant}/{cluster}/{namespace}/clearBacklog
POST /admin/namespaces/{tenant}/{cluster}/{namespace}/clearBacklog/{subscription}
POST /admin/namespaces/{tenant}/{cluster}/{namespace}/unsubscribe/{subscription}
DELETE /admin/v2/namespaces/{tenant}/{namespace}
PUT /admin/v2/namespaces/{tenant}/{namespace}/unload
POST /admin/v2/namespaces/{tenant}/{namespace}/clearBacklog
POST /admin/v2/namespaces/{tenant}/{namespace}/clearBacklog/{subscription}
POST /admin/v2/namespaces/{tenant}/{namespace}/unsubscribe/{subscription}
```
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 260 +++++++++++++--------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 73 ++++--
.../broker/admin/v1/NonPersistentTopics.java | 4 +
.../pulsar/broker/admin/v1/PersistentTopics.java | 9 +-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 72 ++++--
.../broker/admin/v2/NonPersistentTopics.java | 4 +
.../pulsar/broker/admin/v2/PersistentTopics.java | 9 +-
.../org/apache/pulsar/broker/admin/AdminTest.java | 10 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 139 ++++++-----
.../org/apache/pulsar/client/admin/Namespaces.java | 62 ++++-
.../client/admin/internal/NamespacesImpl.java | 94 ++++++--
11 files changed, 511 insertions(+), 225 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c6b7806..fac78a0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -47,6 +47,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
@@ -129,7 +130,7 @@ public abstract class NamespacesBase extends AdminResource {
}
@SuppressWarnings("deprecation")
- protected void internalDeleteNamespace(boolean authoritative) {
+ protected void internalDeleteNamespace(AsyncResponse asyncResponse,
boolean authoritative) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
@@ -179,9 +180,11 @@ public abstract class NamespacesBase extends AdminResource
{
}
}
} catch (WebApplicationException wae) {
- throw wae;
+ asyncResponse.resume(wae);
+ return;
} catch (Exception e) {
- throw new RestException(e);
+ asyncResponse.resume(new RestException(e));
+ return;
}
boolean isEmpty;
@@ -190,12 +193,16 @@ public abstract class NamespacesBase extends
AdminResource {
&&
getPartitionedTopicList(TopicDomain.persistent).isEmpty()
&&
getPartitionedTopicList(TopicDomain.non_persistent).isEmpty();
} catch (Exception e) {
- throw new RestException(e);
+ asyncResponse.resume(new RestException(e));
+ return;
}
if (!isEmpty) {
- log.debug("Found topics on namespace {}", namespaceName);
- throw new RestException(Status.CONFLICT, "Cannot delete non empty
namespace");
+ if (log.isDebugEnabled()) {
+ log.debug("Found topics on namespace {}", namespaceName);
+ }
+ asyncResponse.resume(new RestException(Status.CONFLICT, "Cannot
delete non empty namespace"));
+ return;
}
// set the policies to deleted so that somebody else cannot acquire
this namespace
@@ -206,35 +213,58 @@ public abstract class NamespacesBase extends
AdminResource {
policiesCache().invalidate(path(POLICIES,
namespaceName.toString()));
} catch (Exception e) {
log.error("[{}] Failed to delete namespace on global ZK {}",
clientAppId(), namespaceName, e);
- throw new RestException(e);
+ asyncResponse.resume(new RestException(e));
+ return;
}
// remove from owned namespace map and ephemeral node from ZK
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
NamespaceBundles bundles =
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle bundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then we
do not need to delete the bundle
if
(pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
-
pulsar().getAdminClient().namespaces().deleteNamespaceBundle(namespaceName.toString(),
- bundle.getBundleRange());
+ futures.add(pulsar().getAdminClient().namespaces()
+
.deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange()));
}
}
-
- // we have successfully removed all the ownership for the
namespace, the policies znode can be deleted now
- final String globalZkPolicyPath = path(POLICIES,
namespaceName.toString());
- final String lcaolZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT,
namespaceName.toString());
- globalZk().delete(globalZkPolicyPath, -1);
- localZk().delete(lcaolZkPolicyPath, -1);
- policiesCache().invalidate(globalZkPolicyPath);
- localCacheService().policiesCache().invalidate(lcaolZkPolicyPath);
- } catch (PulsarAdminException cae) {
- throw new RestException(cae);
} catch (Exception e) {
log.error("[{}] Failed to remove owned namespace {}",
clientAppId(), namespaceName, e);
- // avoid throwing exception in case of the second failure
+ asyncResponse.resume(new RestException(e));
+ return;
}
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ if (exception.getCause() instanceof PulsarAdminException) {
+ asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
+ return null;
+ } else {
+ log.error("[{}] Failed to remove owned namespace {}",
clientAppId(), namespaceName, exception);
+ asyncResponse.resume(new
RestException(exception.getCause()));
+ return null;
+ }
+ }
+
+ try {
+ // we have successfully removed all the ownership for the
namespace, the policies znode can be deleted
+ // now
+ final String globalZkPolicyPath = path(POLICIES,
namespaceName.toString());
+ final String lcaolZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT,
namespaceName.toString());
+ globalZk().delete(globalZkPolicyPath, -1);
+ localZk().delete(lcaolZkPolicyPath, -1);
+ policiesCache().invalidate(globalZkPolicyPath);
+
localCacheService().policiesCache().invalidate(lcaolZkPolicyPath);
+ } catch (Exception e) {
+ log.error("[{}] Failed to remove owned namespace {} from ZK",
clientAppId(), namespaceName, e);
+ asyncResponse.resume(new RestException(e));
+ return null;
+ }
+
+ asyncResponse.resume(Response.ok().build());
+ return null;
+ });
}
@SuppressWarnings("deprecation")
@@ -274,7 +304,9 @@ public abstract class NamespacesBase extends AdminResource {
}
URI redirect =
UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
.port(replClusterUrl.getPort()).replaceQueryParam("authoritative",
false).build();
- log.debug("[{}] Redirecting the rest call to {}:
cluster={}", clientAppId(), redirect, replCluster);
+ if(log.isDebugEnabled()) {
+ log.debug("[{}] Redirecting the rest call to {}:
cluster={}", clientAppId(), redirect, replCluster);
+ }
throw new
WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
@@ -554,7 +586,7 @@ public abstract class NamespacesBase extends AdminResource {
}
@SuppressWarnings("deprecation")
- protected void internalUnloadNamespace() {
+ protected void internalUnloadNamespace(AsyncResponse asyncResponse) {
log.info("[{}] Unloading namespace {}", clientAppId(), namespaceName);
validateSuperUserAccess();
@@ -569,18 +601,35 @@ public abstract class NamespacesBase extends
AdminResource {
Policies policies = getNamespacePolicies(namespaceName);
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
List<String> boundaries = policies.bundles.getBoundaries();
for (int i = 0; i < boundaries.size() - 1; i++) {
String bundle = String.format("%s_%s", boundaries.get(i),
boundaries.get(i + 1));
try {
-
pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespaceName.toString(),
bundle);
- } catch (PulsarServerException | PulsarAdminException e) {
- log.error(String.format("[%s] Failed to unload namespace %s",
clientAppId(), namespaceName), e);
- throw new RestException(e);
+
futures.add(pulsar().getAdminClient().namespaces().unloadNamespaceBundleAsync(namespaceName.toString(),
+ bundle));
+ } catch (PulsarServerException e) {
+ log.error("[{}] Failed to unload namespace {}", clientAppId(),
namespaceName, e);
+ asyncResponse.resume(new RestException(e));
+ return;
}
}
- log.info("[{}] Successfully unloaded all the bundles in namespace {}",
clientAppId(), namespaceName);
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ log.error("[{}] Failed to unload namespace {}", clientAppId(),
namespaceName, exception);
+ if (exception.getCause() instanceof PulsarAdminException) {
+ asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
+ return null;
+ } else {
+ asyncResponse.resume(new
RestException(exception.getCause()));
+ return null;
+ }
+ }
+ log.info("[{}] Successfully unloaded all the bundles in namespace
{}", clientAppId(), namespaceName);
+ asyncResponse.resume(Response.ok().build());
+ return null;
+ });
}
@@ -1114,41 +1163,45 @@ public abstract class NamespacesBase extends
AdminResource {
}
}
- protected void internalClearNamespaceBacklog(boolean authoritative) {
+ protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse,
boolean authoritative) {
validateAdminAccessForTenant(namespaceName.getTenant());
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
NamespaceBundles bundles =
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
- Exception exception = null;
for (NamespaceBundle nsBundle : bundles.getBundles()) {
- try {
- // check if the bundle is owned by any broker, if not then
there is no backlog on this bundle to
- // clear
- if
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
- // TODO: make this admin call asynchronous
-
pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklog(namespaceName.toString(),
- nsBundle.getBundleRange());
- }
- } catch (Exception e) {
- if (exception == null) {
- exception = e;
- }
+ // check if the bundle is owned by any broker, if not then
there is no backlog on this bundle to clear
+ if
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+ futures.add(pulsar().getAdminClient().namespaces()
+
.clearNamespaceBundleBacklogAsync(namespaceName.toString(),
nsBundle.getBundleRange()));
}
}
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ return;
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
- if (exception instanceof PulsarAdminException) {
- throw new RestException((PulsarAdminException) exception);
+ log.warn("[{}] Failed to clear backlog on the bundles for
namespace {}: {}", clientAppId(),
+ namespaceName, exception.getCause().getMessage());
+ if (exception.getCause() instanceof PulsarAdminException) {
+ asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
+ return null;
} else {
- throw new RestException(exception.getCause());
+ asyncResponse.resume(new
RestException(exception.getCause()));
+ return null;
}
}
- } catch (WebApplicationException wae) {
- throw wae;
- } catch (Exception e) {
- throw new RestException(e);
- }
- log.info("[{}] Successfully cleared backlog on all the bundles for
namespace {}", clientAppId(), namespaceName);
+ log.info("[{}] Successfully cleared backlog on all the bundles for
namespace {}", clientAppId(),
+ namespaceName);
+ asyncResponse.resume(Response.ok().build());
+ return null;
+ });
}
@SuppressWarnings("deprecation")
@@ -1172,42 +1225,46 @@ public abstract class NamespacesBase extends
AdminResource {
bundleRange);
}
- protected void internalClearNamespaceBacklogForSubscription(String
subscription, boolean authoritative) {
+ protected void internalClearNamespaceBacklogForSubscription(AsyncResponse
asyncResponse, String subscription,
+ boolean authoritative) {
validateAdminAccessForTenant(namespaceName.getTenant());
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
NamespaceBundles bundles =
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
- Exception exception = null;
for (NamespaceBundle nsBundle : bundles.getBundles()) {
- try {
- // check if the bundle is owned by any broker, if not then
there is no backlog on this bundle to
- // clear
- if
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
- // TODO: make this admin call asynchronous
-
pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscription(
- namespaceName.toString(),
nsBundle.getBundleRange(), subscription);
- }
- } catch (Exception e) {
- if (exception == null) {
- exception = e;
- }
+ // check if the bundle is owned by any broker, if not then
there is no backlog on this bundle to clear
+ if
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+
futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync(
+ namespaceName.toString(),
nsBundle.getBundleRange(), subscription));
}
}
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ return;
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
- if (exception instanceof PulsarAdminException) {
- throw new RestException((PulsarAdminException) exception);
+ log.warn("[{}] Failed to clear backlog for subscription {} on
the bundles for namespace {}: {}",
+ clientAppId(), subscription, namespaceName,
exception.getCause().getMessage());
+ if (exception.getCause() instanceof PulsarAdminException) {
+ asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
+ return null;
} else {
- throw new RestException(exception.getCause());
+ asyncResponse.resume(new
RestException(exception.getCause()));
+ return null;
}
}
- } catch (WebApplicationException wae) {
- throw wae;
- } catch (Exception e) {
- throw new RestException(e);
- }
- log.info("[{}] Successfully cleared backlog for subscription {} on all
the bundles for namespace {}",
- clientAppId(), subscription, namespaceName);
+ log.info("[{}] Successfully cleared backlog for subscription {} on
all the bundles for namespace {}",
+ clientAppId(), subscription, namespaceName);
+ asyncResponse.resume(Response.ok().build());
+ return null;
+ });
}
@SuppressWarnings("deprecation")
@@ -1232,41 +1289,46 @@ public abstract class NamespacesBase extends
AdminResource {
subscription, namespaceName, bundleRange);
}
- protected void internalUnsubscribeNamespace(String subscription, boolean
authoritative) {
+ protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse,
String subscription,
+ boolean authoritative) {
validateAdminAccessForTenant(namespaceName.getTenant());
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
NamespaceBundles bundles =
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
- Exception exception = null;
for (NamespaceBundle nsBundle : bundles.getBundles()) {
- try {
- // check if the bundle is owned by any broker, if not then
there are no subscriptions
- if
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
- // TODO: make this admin call asynchronous
-
pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundle(namespaceName.toString(),
- nsBundle.getBundleRange(), subscription);
- }
- } catch (Exception e) {
- if (exception == null) {
- exception = e;
- }
+ // check if the bundle is owned by any broker, if not then
there are no subscriptions
+ if
(pulsar().getNamespaceService().getOwner(nsBundle).isPresent()) {
+
futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync(
+ namespaceName.toString(),
nsBundle.getBundleRange(), subscription));
}
}
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ return;
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
- if (exception instanceof PulsarAdminException) {
- throw new RestException((PulsarAdminException) exception);
+ log.warn("[{}] Failed to unsubscribe {} on the bundles for
namespace {}: {}", clientAppId(),
+ subscription, namespaceName,
exception.getCause().getMessage());
+ if (exception.getCause() instanceof PulsarAdminException) {
+ asyncResponse.resume(new
RestException((PulsarAdminException) exception.getCause()));
+ return null;
} else {
- throw new RestException(exception.getCause());
+ asyncResponse.resume(new
RestException(exception.getCause()));
+ return null;
}
}
- } catch (WebApplicationException wae) {
- throw wae;
- } catch (Exception e) {
- throw new RestException(e);
- }
- log.info("[{}] Successfully unsubscribed {} on all the bundles for
namespace {}", clientAppId(), subscription,
- namespaceName);
+ log.info("[{}] Successfully unsubscribed {} on all the bundles for
namespace {}", clientAppId(),
+ subscription, namespaceName);
+ asyncResponse.resume(Response.ok().build());
+ return null;
+ });
}
@SuppressWarnings("deprecation")
@@ -1619,7 +1681,9 @@ public abstract class NamespacesBase extends
AdminResource {
partitions.add(String.format("0x%08x", partBoundary));
}
if (partitions.size() != initialBundles.getBoundaries().size()) {
- log.debug("Input bundles included repeated partition points.
Ignored.");
+ if (log.isDebugEnabled()) {
+ log.debug("Input bundles included repeated partition points.
Ignored.");
+ }
}
try {
NamespaceBundleFactory.validateFullRange(partitions);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 0fec21d..a47bc23 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -51,6 +51,9 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import java.util.List;
@@ -178,11 +181,17 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty") })
- public void deleteNamespace(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace,
+ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
+ @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateNamespaceName(property, cluster, namespace);
- internalDeleteNamespace(authoritative);
+ try {
+ validateNamespaceName(property, cluster, namespace);
+ internalDeleteNamespace(asyncResponse, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@DELETE
@@ -400,10 +409,16 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is already unloaded
or Namespace has bundles activated") })
- public void unloadNamespace(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
- validateNamespaceName(property, cluster, namespace);
- internalUnloadNamespace();
+ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
+ @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace) {
+ try {
+ validateNamespaceName(property, cluster, namespace);
+ internalUnloadNamespace(asyncResponse);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@PUT
@@ -605,11 +620,18 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(hidden = true, value = "Clear backlog for all topics on a
namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public void clearNamespaceBacklog(@PathParam("property") String property,
@PathParam("cluster") String cluster,
+ public void clearNamespaceBacklog(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster")
String cluster,
@PathParam("namespace") String namespace,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateNamespaceName(property, cluster, namespace);
- internalClearNamespaceBacklog(authoritative);
+ try {
+ validateNamespaceName(property, cluster, namespace);
+ internalClearNamespaceBacklog(asyncResponse, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@POST
@@ -630,12 +652,18 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(hidden = true, value = "Clear backlog for a given
subscription on all topics on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public void clearNamespaceBacklogForSubscription(@PathParam("property")
String property,
- @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace,
- @PathParam("subscription") String subscription,
+ public void clearNamespaceBacklogForSubscription(@Suspended final
AsyncResponse asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster")
String cluster,
+ @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateNamespaceName(property, cluster, namespace);
- internalClearNamespaceBacklogForSubscription(subscription,
authoritative);
+ try {
+ validateNamespaceName(property, cluster, namespace);
+ internalClearNamespaceBacklogForSubscription(asyncResponse,
subscription, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@POST
@@ -656,11 +684,18 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(hidden = true, value = "Unsubscribes the given subscription
on all topics on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public void unsubscribeNamespace(@PathParam("property") String property,
@PathParam("cluster") String cluster,
+ public void unsubscribeNamespace(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster")
String cluster,
@PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateNamespaceName(property, cluster, namespace);
- internalUnsubscribeNamespace(subscription, authoritative);
+ try {
+ validateNamespaceName(property, cluster, namespace);
+ internalUnsubscribeNamespace(asyncResponse, subscription,
authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index ced620a..f1347e3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -37,6 +37,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
@@ -180,6 +181,9 @@ public class NonPersistentTopics extends PersistentTopics {
// check cluster ownership for a given global namespace:
redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(nsName);
}
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ return;
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 78efca2..87df589 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -32,6 +32,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
@@ -76,8 +77,10 @@ public class PersistentTopics extends PersistentTopicsBase {
try {
validateNamespaceName(property, cluster, namespace);
asyncResponse.resume(internalGetList());
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
} catch (Exception e) {
- asyncResponse.resume(e instanceof RestException ? e : new
RestException(e));
+ asyncResponse.resume(new RestException(e));
}
}
@@ -286,6 +289,8 @@ public class PersistentTopics extends PersistentTopicsBase {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative,
perPartition);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
@@ -306,6 +311,8 @@ public class PersistentTopics extends PersistentTopicsBase {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetPartitionedStatsInternal(asyncResponse, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index d29972e..435892e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -32,6 +32,9 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
@@ -126,10 +129,17 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty") })
- public void deleteNamespace(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
+ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateNamespaceName(tenant, namespace);
- internalDeleteNamespace(authoritative);
+ try {
+ validateNamespaceName(tenant, namespace);
+ internalDeleteNamespace(asyncResponse, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@DELETE
@@ -302,9 +312,16 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't
exist"),
@ApiResponse(code = 412, message = "Namespace is already unloaded
or Namespace has bundles activated") })
- public void unloadNamespace(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- validateNamespaceName(tenant, namespace);
- internalUnloadNamespace();
+ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ try {
+ validateNamespaceName(tenant, namespace);
+ internalUnloadNamespace(asyncResponse);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@PUT
@@ -545,10 +562,17 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Clear backlog for all topics on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public void clearNamespaceBacklog(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
+ public void clearNamespaceBacklog(@Suspended final AsyncResponse
asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateNamespaceName(tenant, namespace);
- internalClearNamespaceBacklog(authoritative);
+ try {
+ validateNamespaceName(tenant, namespace);
+ internalClearNamespaceBacklog(asyncResponse, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@POST
@@ -568,11 +592,18 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Clear backlog for a given subscription on all
topics on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public void clearNamespaceBacklogForSubscription(@PathParam("tenant")
String tenant,
- @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
+ public void clearNamespaceBacklogForSubscription(@Suspended final
AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant, @PathParam("namespace") String
namespace,
+ @PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateNamespaceName(tenant, namespace);
- internalClearNamespaceBacklogForSubscription(subscription,
authoritative);
+ try {
+ validateNamespaceName(tenant, namespace);
+ internalClearNamespaceBacklogForSubscription(asyncResponse,
subscription, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@POST
@@ -593,11 +624,18 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Unsubscribes the given subscription on all topics
on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public void unsubscribeNamespace(@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
+ public void unsubscribeNamespace(@Suspended final AsyncResponse
asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace,
+ @PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateNamespaceName(tenant, namespace);
- internalUnsubscribeNamespace(subscription, authoritative);
+ try {
+ validateNamespaceName(tenant, namespace);
+ internalUnsubscribeNamespace(asyncResponse, subscription,
authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 9807c9f..8125f5b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -38,6 +38,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
@@ -240,6 +241,9 @@ public class NonPersistentTopics extends PersistentTopics {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ return;
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 04aa79f..96a5ca9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -32,6 +32,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
@@ -83,8 +84,10 @@ public class PersistentTopics extends PersistentTopicsBase {
try {
validateNamespaceName(tenant, namespace);
asyncResponse.resume(internalGetList());
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
} catch (Exception e) {
- asyncResponse.resume(e instanceof RestException ? e : new
RestException(e));
+ asyncResponse.resume(new RestException(e));
}
}
@@ -482,6 +485,8 @@ public class PersistentTopics extends PersistentTopicsBase {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative,
perPartition);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
@@ -510,6 +515,8 @@ public class PersistentTopics extends PersistentTopicsBase {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetPartitionedStatsInternal(asyncResponse, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 643020d..b277ffe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
@@ -47,6 +48,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
@@ -84,6 +86,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.mockito.ArgumentCaptor;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -505,8 +508,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest
{
properties.createTenant("tenant-config-is-null", null);
assertEquals(properties.getTenantAdmin("tenant-config-is-null"),
nullTenantInfo);
-
- namespaces.deleteNamespace("my-tenant", "use", "my-namespace", false);
+ AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response, "my-tenant", "use",
"my-namespace", false);
+ ArgumentCaptor<Response> captor =
ArgumentCaptor.forClass(Response.class);
+ verify(response, timeout(5000).times(1)).resume(captor.capture());
+ assertEquals(captor.getValue().getStatus(), Status.OK.getStatusCode());
properties.deleteTenant("my-tenant");
properties.deleteTenant("tenant-config-is-null");
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 138404e..763a42c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
@@ -43,9 +44,12 @@ import java.net.URL;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
@@ -79,6 +83,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs;
+import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
@@ -525,16 +530,14 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
// Trick to force redirection
conf.setAuthorizationEnabled(true);
- try {
- namespaces.deleteNamespace(this.testTenant, this.testOtherCluster,
- this.testLocalNamespaces.get(2).getLocalName(), false);
- fail("Should have raised exception to redirect request");
- } catch (WebApplicationException wae) {
- // OK
- assertEquals(wae.getResponse().getStatus(),
Status.TEMPORARY_REDIRECT.getStatusCode());
- assertEquals(wae.getResponse().getLocation().toString(),
-
UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString());
- }
+ AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response, this.testTenant,
this.testOtherCluster,
+ this.testLocalNamespaces.get(2).getLocalName(), false);
+ ArgumentCaptor<WebApplicationException> captor =
ArgumentCaptor.forClass(WebApplicationException.class);
+ verify(response, timeout(5000).times(1)).resume(captor.capture());
+ assertEquals(captor.getValue().getResponse().getStatus(),
Status.TEMPORARY_REDIRECT.getStatusCode());
+ assertEquals(captor.getValue().getResponse().getLocation().toString(),
+
UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString());
uri = URI.create("http://localhost" + ":" + BROKER_WEBSERVICE_PORT +
"/admin/namespace/"
+ this.testLocalNamespaces.get(2).toString() + "/unload");
@@ -572,27 +575,23 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
doReturn(uri).when(uriInfo).getRequestUri();
doReturn(true).when(namespaces).isLeaderBroker();
- try {
-
namespaces.deleteNamespace(this.testLocalNamespaces.get(2).getTenant(),
- this.testLocalNamespaces.get(2).getCluster(),
this.testLocalNamespaces.get(2).getLocalName(),
- false);
- fail("Should have raised exception to redirect request");
- } catch (WebApplicationException wae) {
- // OK
- assertEquals(wae.getResponse().getStatus(),
Status.TEMPORARY_REDIRECT.getStatusCode());
- assertEquals(wae.getResponse().getLocation().toString(),
-
UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString());
- }
+ response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response,
this.testLocalNamespaces.get(2).getTenant(),
+ this.testLocalNamespaces.get(2).getCluster(),
this.testLocalNamespaces.get(2).getLocalName(), false);
+ captor = ArgumentCaptor.forClass(WebApplicationException.class);
+ verify(response, timeout(5000).times(1)).resume(captor.capture());
+ assertEquals(captor.getValue().getResponse().getStatus(),
Status.TEMPORARY_REDIRECT.getStatusCode());
+ assertEquals(captor.getValue().getResponse().getLocation().toString(),
+
UriBuilder.fromUri(uri).host("broker-usc.com").port(BROKER_WEBSERVICE_PORT).toString());
}
@Test
public void testDeleteNamespaces() throws Exception {
- try {
- namespaces.deleteNamespace(this.testTenant, this.testLocalCluster,
"non-existing-namespace-1", false);
- fail("should have failed");
- } catch (RestException e) {
- assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
- }
+ AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response, this.testTenant,
this.testLocalCluster, "non-existing-namespace-1", false);
+ ArgumentCaptor<RestException> errorCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
+ assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
NamespaceName testNs = this.testLocalNamespaces.get(1);
TopicName topicName =
TopicName.get(testNs.getPersistentTopicName("my-topic"));
@@ -603,39 +602,49 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
false, false, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
- try {
- namespaces.deleteNamespace(testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), false);
- fail("should have failed");
- } catch (RestException e) {
- // Ok, namespace not empty
- assertEquals(e.getResponse().getStatus(),
Status.CONFLICT.getStatusCode());
- }
+
+ response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), false);
+ errorCaptor = ArgumentCaptor.forClass(RestException.class);
+ // Ok, namespace not empty
+ verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
+ assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Status.CONFLICT.getStatusCode());
+
// delete the topic from ZK
mockZookKeeper.delete("/managed-ledgers/" +
topicName.getPersistenceNamingEncoding(), -1);
ZkUtils.createFullPathOptimistic(mockZookKeeper,
"/admin/partitioned-topics/" +
topicName.getPersistenceNamingEncoding(),
new byte[0], null, null);
- try {
- namespaces.deleteNamespace(testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), false);
- fail("should have failed");
- } catch (RestException e) {
- // Ok, namespace not empty
- assertEquals(e.getResponse().getStatus(),
Status.CONFLICT.getStatusCode());
- }
+
+ response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), false);
+ errorCaptor = ArgumentCaptor.forClass(RestException.class);
+ // Ok, namespace not empty
+ verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
+ assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Status.CONFLICT.getStatusCode());
+
mockZookKeeper.delete("/admin/partitioned-topics/" +
topicName.getPersistenceNamingEncoding(), -1);
testNs = this.testGlobalNamespaces.get(0);
// setup ownership to localhost
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
false, false, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
- namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(),
testNs.getLocalName(), false);
+ response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), false);
+ ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ assertEquals(responseCaptor.getValue().getStatus(),
Status.OK.getStatusCode());
testNs = this.testLocalNamespaces.get(0);
// setup ownership to localhost
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
false, false, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
- namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(),
testNs.getLocalName(), false);
+ response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), false);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ assertEquals(responseCaptor.getValue().getStatus(),
Status.OK.getStatusCode());
List<String> nsList =
Lists.newArrayList(this.testLocalNamespaces.get(1).toString(),
this.testLocalNamespaces.get(2).toString());
nsList.sort(null);
@@ -647,7 +656,11 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
// setup ownership to localhost
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
false, false, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
- namespaces.deleteNamespace(testNs.getTenant(), testNs.getCluster(),
testNs.getLocalName(), false);
+ response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), false);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ assertEquals(responseCaptor.getValue().getStatus(),
Status.OK.getStatusCode());
}
@Test
@@ -682,9 +695,11 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
}
}));
- doThrow(new PulsarAdminException.PreconditionFailedException(
- new
ClientErrorException(Status.PRECONDITION_FAILED))).when(namespacesAdmin)
- .deleteNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
+ CompletableFuture<Void> preconditionFailed = new CompletableFuture<>();
+ preconditionFailed.completeExceptionally(new
PulsarAdminException.PreconditionFailedException(
+ new ClientErrorException(Status.PRECONDITION_FAILED)));
+ doReturn(preconditionFailed).when(namespacesAdmin)
+ .deleteNamespaceBundleAsync(Mockito.anyString(),
Mockito.anyString());
try {
namespaces.deleteNamespaceBundle(testTenant, testLocalCluster,
bundledNsLocal, "0x00000000_0x80000000",
@@ -694,19 +709,18 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
assertEquals(re.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
}
- try {
- namespaces.deleteNamespace(testTenant, testLocalCluster,
bundledNsLocal, false);
- fail("Should have failed");
- } catch (RestException re) {
- assertEquals(re.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
- }
+ AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response, testTenant, testLocalCluster,
bundledNsLocal, false);
+ ArgumentCaptor<RestException> captor =
ArgumentCaptor.forClass(RestException.class);
+ verify(response, timeout(5000).times(1)).resume(captor.capture());
+ assertEquals(captor.getValue().getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
NamespaceBundles nsBundles =
nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
// make one bundle owned
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0),
false,
true, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
- doNothing().when(namespacesAdmin).deleteNamespaceBundle(
+
doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync(
testTenant + "/" + testLocalCluster + "/" + bundledNsLocal,
"0x00000000_0x80000000");
try {
@@ -717,12 +731,11 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
assertEquals(re.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
}
- try {
- namespaces.deleteNamespace(testTenant, testLocalCluster,
bundledNsLocal, false);
- fail("should have failed");
- } catch (RestException re) {
- assertEquals(re.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
- }
+ response = mock(AsyncResponse.class);
+ namespaces.deleteNamespace(response, testTenant, testLocalCluster,
bundledNsLocal, false);
+ captor = ArgumentCaptor.forClass(RestException.class);
+ verify(response, timeout(5000).times(1)).resume(captor.capture());
+ assertEquals(captor.getValue().getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
// ensure all three bundles are owned by the local broker
for (NamespaceBundle bundle : nsBundles.getBundles()) {
@@ -744,7 +757,11 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
doNothing().when(namespaces).validateBundleOwnership(bundle, false,
true);
// The namespace unload should succeed on all the bundles
- namespaces.unloadNamespace(testNs.getTenant(), testNs.getCluster(),
testNs.getLocalName());
+ AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.unloadNamespace(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName());
+ ArgumentCaptor<Response> captor =
ArgumentCaptor.forClass(Response.class);
+ verify(response, timeout(5000).times(1)).resume(captor.capture());
+ assertEquals(captor.getValue().getStatus(), Status.OK.getStatusCode());
}
@Test
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index a828993..c69a4a0 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.admin;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -304,6 +305,20 @@ public interface Namespaces {
void deleteNamespaceBundle(String namespace, String bundleRange) throws
PulsarAdminException;
/**
+ * Delete an existing bundle in a namespace asynchronously.
+ * <p>
+ * The bundle needs to be empty.
+ *
+ * @param namespace
+ * Namespace name
+ * @param bundleRange
+ * range of the bundle
+ *
+ * @return a future that can be used to track when the bundle is deleted
+ */
+ CompletableFuture<Void> deleteNamespaceBundleAsync(String namespace,
String bundleRange);
+
+ /**
* Get permissions on a namespace.
* <p>
* Retrieve the permissions for a namespace.
@@ -884,13 +899,25 @@ public interface Namespaces {
* Unload namespace bundle
*
* @param namespace
- * @bundle range of bundle to unload
+ * @param bundle
+ * range of bundle to unload
* @throws PulsarAdminException
* Unexpected error
*/
void unloadNamespaceBundle(String namespace, String bundle) throws
PulsarAdminException;
/**
+ * Unload namespace bundle asynchronously
+ *
+ * @param namespace
+ * @param bundle
+ * range of bundle to unload
+ *
+ * @return a future that can be used to track when the bundle is unloaded
+ */
+ CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace,
String bundle);
+
+ /**
* Split namespace bundle
*
* @param namespace
@@ -1014,6 +1041,16 @@ public interface Namespaces {
void clearNamespaceBundleBacklog(String namespace, String bundle) throws
PulsarAdminException;
/**
+ * Clear backlog for all topics on a namespace bundle asynchronously
+ *
+ * @param namespace
+ * @param bundle
+ *
+ * @return a future that can be used to track when the bundle is cleared
+ */
+ CompletableFuture<Void> clearNamespaceBundleBacklogAsync(String namespace,
String bundle);
+
+ /**
* Clear backlog for a given subscription on all topics on a namespace bundle
*
* @param namespace
@@ -1026,6 +1063,18 @@ public interface Namespaces {
throws PulsarAdminException;
/**
+ * Clear backlog for a given subscription on all topics on a namespace bundle asynchronously
+ *
+ * @param namespace
+ * @param bundle
+ * @param subscription
+ *
+ * @return a future that can be used to track when the bundle is cleared
+ */
+ CompletableFuture<Void>
clearNamespaceBundleBacklogForSubscriptionAsync(String namespace, String bundle,
+ String subscription);
+
+ /**
* Unsubscribes the given subscription on all topics on a namespace
*
* @param namespace
@@ -1045,6 +1094,17 @@ public interface Namespaces {
void unsubscribeNamespaceBundle(String namespace, String bundle, String
subscription) throws PulsarAdminException;
/**
+ * Unsubscribes the given subscription on all topics on a namespace bundle asynchronously
+ *
+ * @param namespace
+ * @param bundle
+ * @param subscription
+ *
+ * @return a future that can be used to track when the subscription is unsubscribed
+ */
+ CompletableFuture<Void> unsubscribeNamespaceBundleAsync(String namespace,
String bundle, String subscription);
+
+ /**
* Set the encryption required status for all topics within a namespace.
* <p>
* When encryption required is true, the broker will prevent to store unencrypted messages.
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 08f7b87..a5470a4 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -23,6 +23,8 @@ import static
com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.Entity;
@@ -192,15 +194,23 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
@Override
public void deleteNamespaceBundle(String namespace, String bundleRange)
throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, bundleRange);
- request(path).delete(ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ deleteNamespaceBundleAsync(namespace, bundleRange).get();
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
}
}
@Override
+ public CompletableFuture<Void> deleteNamespaceBundleAsync(String
namespace, String bundleRange) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, bundleRange);
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public Map<String, Set<AuthAction>> getPermissions(String namespace)
throws PulsarAdminException {
try {
NamespaceName ns = NamespaceName.get(namespace);
@@ -496,15 +506,23 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
@Override
public void unloadNamespaceBundle(String namespace, String bundle) throws
PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, bundle, "unload");
- request(path).put(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ unloadNamespaceBundleAsync(namespace, bundle).get();
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
}
}
@Override
+ public CompletableFuture<Void> unloadNamespaceBundleAsync(String
namespace, String bundle) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, bundle, "unload");
+ return asyncPutRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void splitNamespaceBundle(String namespace, String bundle, boolean
unloadSplitBundles)
throws PulsarAdminException {
try {
@@ -631,27 +649,44 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
@Override
public void clearNamespaceBundleBacklog(String namespace, String bundle)
throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, bundle, "clearBacklog");
- request(path).post(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ clearNamespaceBundleBacklogAsync(namespace, bundle).get();
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
}
}
@Override
+ public CompletableFuture<Void> clearNamespaceBundleBacklogAsync(String
namespace, String bundle) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, bundle, "clearBacklog");
+ return asyncPostRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void clearNamespaceBundleBacklogForSubscription(String namespace,
String bundle, String subscription)
throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, bundle, "clearBacklog",
subscription);
- request(path).post(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ clearNamespaceBundleBacklogForSubscriptionAsync(namespace, bundle,
subscription).get();
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
}
}
@Override
+ public CompletableFuture<Void>
clearNamespaceBundleBacklogForSubscriptionAsync(String namespace, String bundle,
+ String subscription) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, bundle, "clearBacklog",
subscription);
+ return asyncPostRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void unsubscribeNamespace(String namespace, String subscription)
throws PulsarAdminException {
try {
NamespaceName ns = NamespaceName.get(namespace);
@@ -666,15 +701,24 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
public void unsubscribeNamespaceBundle(String namespace, String bundle,
String subscription)
throws PulsarAdminException {
try {
- NamespaceName ns = NamespaceName.get(namespace);
- WebTarget path = namespacePath(ns, bundle, "unsubscribe",
subscription);
- request(path).post(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ unsubscribeNamespaceBundleAsync(namespace, bundle,
subscription).get();
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
}
}
@Override
+ public CompletableFuture<Void> unsubscribeNamespaceBundleAsync(String
namespace, String bundle,
+ String subscription) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, bundle, "unsubscribe",
subscription);
+ return asyncPostRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void setSubscriptionAuthMode(String namespace, SubscriptionAuthMode
subscriptionAuthMode) throws PulsarAdminException {
try {
NamespaceName ns = NamespaceName.get(namespace);