This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 138ea354f9d [improve][broker] Make some methods in NamespacesBase
async. (#15518)
138ea354f9d is described below
commit 138ea354f9de30e4b7bc9d4fe6ee16b84cd4a896
Author: Jiwei Guo <[email protected]>
AuthorDate: Sat May 14 09:59:16 2022 +0800
[improve][broker] Make some methods in NamespacesBase async. (#15518)
### Motivation
See PIP #14365 and change tracker #15043.
Make `NamespacesBase` `getTenantNamespaces / createNamespace / getTopics /
getPolicies / getPermissions` methods to pure async.
---
.../broker/resources/NamespaceResources.java | 4 +
.../pulsar/broker/resources/TenantResources.java | 38 +++-
.../apache/pulsar/broker/admin/AdminResource.java | 10 +
.../pulsar/broker/admin/impl/NamespacesBase.java | 74 ++++---
.../apache/pulsar/broker/admin/v1/Namespaces.java | 126 ++++++++----
.../apache/pulsar/broker/admin/v2/Namespaces.java | 96 ++++++---
.../pulsar/broker/web/PulsarWebResource.java | 15 ++
.../org/apache/pulsar/broker/admin/AdminTest.java | 221 ++++++---------------
.../apache/pulsar/broker/admin/NamespacesTest.java | 80 ++++----
.../broker/auth/MockedPulsarServiceBaseTest.java | 99 +++++++++
10 files changed, 453 insertions(+), 310 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index ce797a80c7c..c24df6c586f 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -93,6 +93,10 @@ public class NamespaceResources extends
BaseResources<Policies> {
create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}
+ public CompletableFuture<Void> createPoliciesAsync(NamespaceName ns,
Policies policies) {
+ return createAsync(joinPath(BASE_POLICIES_PATH, ns.toString()),
policies);
+ }
+
public boolean namespaceExists(NamespaceName ns) throws
MetadataStoreException {
String path = joinPath(BASE_POLICIES_PATH, ns.toString());
return super.exists(path) && super.getChildren(path).isEmpty();
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
index 3313b61c8a1..87f48f7e682 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.resources;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -78,7 +79,7 @@ public class TenantResources extends
BaseResources<TenantInfo> {
}
public CompletableFuture<Boolean> tenantExistsAsync(String tenantName) {
- return getCache().exists(joinPath(BASE_POLICIES_PATH, tenantName));
+ return existsAsync(joinPath(BASE_POLICIES_PATH, tenantName));
}
public List<String> getListOfNamespaces(String tenant) throws
MetadataStoreException {
@@ -110,6 +111,41 @@ public class TenantResources extends
BaseResources<TenantInfo> {
return namespaces;
}
+ public CompletableFuture<List<String>> getListOfNamespacesAsync(String
tenant) {
+ // this will return a cluster in v1 and a namespace in v2
+ return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
+ .thenCompose(clusterOrNamespaces ->
clusterOrNamespaces.stream().map(key ->
+ getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant,
key))
+ .thenCompose(children -> {
+ if (children == null ||
children.isEmpty()) {
+ String namespace =
NamespaceName.get(tenant, key).toString();
+ // if the length is 0 then this is
probably a leftover cluster from namespace
+ // created with the v1 admin format
(prop/cluster/ns) and then deleted, so no
+ // need to add it to the list
+ return
getAsync(joinPath(BASE_POLICIES_PATH, namespace))
+ .thenApply(opt -> opt.isPresent() ?
Collections.singletonList(namespace)
+ : new ArrayList<String>())
+ .exceptionally(ex -> {
+ Throwable cause =
FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof
MetadataStoreException
+
.ContentDeserializationException) {
+ return new ArrayList<>();
+ }
+ throw
FutureUtil.wrapToCompletionException(ex);
+ });
+ } else {
+ CompletableFuture<List<String>> ret =
new CompletableFuture();
+ ret.complete(children.stream().map(ns
-> NamespaceName.get(tenant, key, ns)
+
.toString()).collect(Collectors.toList()));
+ return ret;
+ }
+
})).reduce(CompletableFuture.completedFuture(new ArrayList<>()),
+ (accumulator, n) ->
accumulator.thenCompose(namespaces -> n.thenApply(m -> {
+ namespaces.addAll(m);
+ return namespaces;
+ }))));
+ }
+
public CompletableFuture<List<String>> getActiveNamespaces(String tenant,
String cluster) {
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index ec502e134f2..3a396411c63 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -323,6 +323,11 @@ public abstract class AdminResource extends
PulsarWebResource {
return FutureUtil.failedFuture(new RestException(e));
}
policies.get().bundles = bundleData != null ? bundleData :
policies.get().bundles;
+ if (policies.get().is_allow_auto_update_schema == null) {
+ // the type changed from boolean to Boolean. return
broker value here for keeping compatibility.
+ policies.get().is_allow_auto_update_schema =
pulsar().getConfig()
+ .isAllowAutoUpdateSchemaEnabled();
+ }
return CompletableFuture.completedFuture(policies.get());
});
} else {
@@ -534,6 +539,11 @@ public abstract class AdminResource extends
PulsarWebResource {
}
}
+ protected CompletableFuture<List<String>>
getPartitionedTopicListAsync(TopicDomain topicDomain) {
+ return namespaceResources().getPartitionedTopicResources()
+ .listPartitionedTopicsAsync(namespaceName, topicDomain);
+ }
+
protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
try {
return
getPulsarResources().getTopicResources().getExistingPartitions(topicName)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 49a221946ee..39a4eb13060 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -103,7 +103,6 @@ import
org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
-import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
@@ -111,52 +110,47 @@ import org.slf4j.LoggerFactory;
public abstract class NamespacesBase extends AdminResource {
- protected List<String> internalGetTenantNamespaces(String tenant) {
- checkNotNull(tenant, "Tenant should not be null");
+ protected CompletableFuture<List<String>>
internalGetTenantNamespaces(String tenant) {
+ if (tenant == null) {
+ return FutureUtil.failedFuture(new
RestException(Status.BAD_REQUEST, "Tenant should not be null"));
+ }
try {
NamedEntity.checkName(tenant);
} catch (IllegalArgumentException e) {
log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant,
e);
- throw new RestException(Status.PRECONDITION_FAILED, "Tenant name
is not valid");
- }
- validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);
-
- try {
- if (!tenantResources().tenantExists(tenant)) {
- throw new RestException(Status.NOT_FOUND, "Tenant not found");
- }
-
- return tenantResources().getListOfNamespaces(tenant);
- } catch (Exception e) {
- log.error("[{}] Failed to get namespaces list: {}", clientAppId(),
e);
- throw new RestException(e);
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
}
+ return validateTenantOperationAsync(tenant,
TenantOperation.LIST_NAMESPACES)
+ .thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
+ .thenCompose(existed -> {
+ if (!existed) {
+ throw new RestException(Status.NOT_FOUND, "Tenant not
found");
+ }
+ return tenantResources().getListOfNamespacesAsync(tenant);
+ });
}
- protected void internalCreateNamespace(Policies policies) {
- validateTenantOperation(namespaceName.getTenant(),
TenantOperation.CREATE_NAMESPACE);
- validatePoliciesReadOnlyAccess();
- validatePolicies(namespaceName, policies);
-
- try {
- int maxNamespacesPerTenant =
pulsar().getConfiguration().getMaxNamespacesPerTenant();
- // no distributed locks are added here.In a concurrent scenario,
the threshold will be exceeded.
- if (maxNamespacesPerTenant > 0) {
- List<String> namespaces =
tenantResources().getListOfNamespaces(namespaceName.getTenant());
- if (namespaces != null && namespaces.size() >
maxNamespacesPerTenant) {
- throw new RestException(Status.PRECONDITION_FAILED,
- "Exceed the maximum number of namespace in tenant
:" + namespaceName.getTenant());
- }
- }
- namespaceResources().createPolicies(namespaceName, policies);
- log.info("[{}] Created namespace {}", clientAppId(),
namespaceName);
- } catch (AlreadyExistsException e) {
- log.warn("[{}] Failed to create namespace {} - already exists",
clientAppId(), namespaceName);
- throw new RestException(Status.CONFLICT, "Namespace already
exists");
- } catch (Exception e) {
- log.error("[{}] Failed to create namespace {}", clientAppId(),
namespaceName, e);
- throw new RestException(e);
- }
+ protected CompletableFuture<Void> internalCreateNamespace(Policies
policies) {
+ return validateTenantOperationAsync(namespaceName.getTenant(),
TenantOperation.CREATE_NAMESPACE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenAccept(__ -> validatePolicies(namespaceName, policies))
+ .thenCompose(__ -> {
+ int maxNamespacesPerTenant =
pulsar().getConfiguration().getMaxNamespacesPerTenant();
+ // no distributed locks are added here.In a concurrent
scenario, the threshold will be exceeded.
+ if (maxNamespacesPerTenant > 0) {
+ return
tenantResources().getListOfNamespacesAsync(namespaceName.getTenant())
+ .thenAccept(namespaces -> {
+ if (namespaces != null &&
namespaces.size() > maxNamespacesPerTenant) {
+ throw new
RestException(Status.PRECONDITION_FAILED,
+ "Exceed the maximum number of
namespace in tenant :"
+ +
namespaceName.getTenant());
+ }
+ });
+ }
+ return CompletableFuture.completedFuture(null);
+ })
+ .thenCompose(__ ->
namespaceResources().createPoliciesAsync(namespaceName, policies))
+ .thenAccept(__ -> log.info("[{}] Created namespace {}",
clientAppId(), namespaceName));
}
protected void internalDeleteNamespace(AsyncResponse asyncResponse,
boolean authoritative, boolean force) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index c21c8b8910f..0ccc505d5e8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -28,6 +28,7 @@ import io.swagger.annotations.ApiResponses;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -42,6 +43,7 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
@@ -67,6 +69,8 @@ import
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,8 +88,15 @@ public class Namespaces extends NamespacesBase {
response = String.class, responseContainer = "Set")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property doesn't exist")})
- public List<String> getTenantNamespaces(@PathParam("property") String
property) {
- return internalGetTenantNamespaces(property);
+ public void getTenantNamespaces(@Suspended AsyncResponse response,
+ @PathParam("property") String property) {
+ internalGetTenantNamespaces(property)
+ .thenAccept(response::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get namespaces list: {}",
clientAppId(), ex);
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@GET
@@ -125,21 +136,20 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin or operate
permission on the namespace"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist")})
- public void getTopics(@PathParam("property") String property,
- @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
- @QueryParam("mode")
@DefaultValue("PERSISTENT") Mode mode,
- @Suspended AsyncResponse asyncResponse) {
+ public void getTopics(@Suspended AsyncResponse response,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
+ @QueryParam("mode") @DefaultValue("PERSISTENT") Mode
mode) {
validateNamespaceName(property, cluster, namespace);
- validateNamespaceOperation(NamespaceName.get(property, namespace),
NamespaceOperation.GET_TOPICS);
-
- // Validate that namespace exists, throws 404 if it doesn't exist
- getNamespacePolicies(namespaceName);
-
- pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
- .thenAccept(asyncResponse::resume)
+ validateNamespaceOperationAsync(NamespaceName.get(property,
namespace), NamespaceOperation.GET_TOPICS)
+ // Validate that namespace exists, throws 404 if it doesn't
exist
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(__ ->
pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+ .thenAccept(response::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}",
namespaceName, ex);
- asyncResponse.resume(ex);
+ resumeAsyncResponseExceptionally(response, ex);
return null;
});
}
@@ -150,11 +160,20 @@ public class Namespaces extends NamespacesBase {
response = Policies.class)
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist")})
- public Policies getPolicies(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
+ public void getPolicies(@Suspended AsyncResponse response,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- validateNamespacePolicyOperation(NamespaceName.get(property,
namespace), PolicyName.ALL, PolicyOperation.READ);
- return getNamespacePolicies(namespaceName);
+ validateNamespacePolicyOperationAsync(NamespaceName.get(property,
namespace), PolicyName.ALL,
+ PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(response::resume)
+ .exceptionally(ex -> {
+ log.error("Failed to get policies for namespace {}",
namespaceName, ex);
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@SuppressWarnings("deprecation")
@@ -165,29 +184,46 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace already exists"),
@ApiResponse(code = 412, message = "Namespace name is not valid")
})
- public void createNamespace(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, BundlesData
initialBundles) {
+ public void createNamespace(@Suspended AsyncResponse response,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
+ BundlesData initialBundles) {
validateNamespaceName(property, cluster, namespace);
+ CompletableFuture<Void> ret;
if (!namespaceName.isGlobal()) {
// If the namespace is non global, make sure property has the
access on the cluster. For global namespace,
// same check is made at the time of setting replication.
- validateClusterForTenant(namespaceName.getTenant(),
namespaceName.getCluster());
+ ret = validateClusterForTenantAsync(namespaceName.getTenant(),
namespaceName.getCluster());
+ } else {
+ ret = CompletableFuture.completedFuture(null);
}
-
- Policies policies = new Policies();
- if (initialBundles != null && initialBundles.getNumBundles() > 0) {
- if (initialBundles.getBoundaries() == null ||
initialBundles.getBoundaries().size() == 0) {
- policies.bundles = getBundles(initialBundles.getNumBundles());
+ ret.thenApply(__ -> {
+ Policies policies = new Policies();
+ if (initialBundles != null && initialBundles.getNumBundles() > 0) {
+ if (initialBundles.getBoundaries() == null ||
initialBundles.getBoundaries().size() == 0) {
+ policies.bundles =
getBundles(initialBundles.getNumBundles());
+ } else {
+ policies.bundles = validateBundlesData(initialBundles);
+ }
} else {
- policies.bundles = validateBundlesData(initialBundles);
+ int defaultNumberOfBundles =
config().getDefaultNumberOfNamespaceBundles();
+ policies.bundles = getBundles(defaultNumberOfBundles);
}
- } else {
- int defaultNumberOfBundles =
config().getDefaultNumberOfNamespaceBundles();
- policies.bundles = getBundles(defaultNumberOfBundles);
- }
-
- internalCreateNamespace(policies);
+ return policies;
+ }).thenCompose(this::internalCreateNamespace)
+ .thenAccept(__ -> response.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable root = FutureUtil.unwrapCompletionException(ex);
+ if (root instanceof
MetadataStoreException.AlreadyExistsException) {
+ response.resume(new RestException(Status.CONFLICT,
"Namespace already exists"));
+ } else {
+ log.error("[{}] Failed to create namespace {}",
clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(response, ex);
+ }
+ return null;
+ });
}
@DELETE
@@ -200,9 +236,9 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 405, message = "Broker doesn't allow forced
deletion of namespaces"),
@ApiResponse(code = 409, message = "Namespace is not empty") })
public void deleteNamespace(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace,
- @QueryParam("force") @DefaultValue("false") boolean force,
- @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
+ @QueryParam("force") @DefaultValue("false")
boolean force,
+ @QueryParam("authoritative")
@DefaultValue("false") boolean authoritative) {
try {
validateNamespaceName(property, cluster, namespace);
internalDeleteNamespace(asyncResponse, authoritative, force);
@@ -236,13 +272,19 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty") })
- public Map<String, Set<AuthAction>> getPermissions(@PathParam("property")
String property,
- @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace) {
+ public void getPermissions(@Suspended AsyncResponse response,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- validateNamespaceOperation(NamespaceName.get(property, namespace),
NamespaceOperation.GET_PERMISSION);
-
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.auth_policies.getNamespaceAuthentication();
+ validateNamespaceOperationAsync(NamespaceName.get(property,
namespace), NamespaceOperation.GET_PERMISSION)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies ->
response.resume(policies.auth_policies.getNamespaceAuthentication()))
+ .exceptionally(ex -> {
+ log.error("Failed to get permissions for namespace {}",
namespaceName, ex);
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e3f82b72d2a..2769b5b3bc3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -75,7 +75,9 @@ import
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,8 +93,15 @@ public class Namespaces extends NamespacesBase {
response = String.class, responseContainer = "Set")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant doesn't exist")})
- public List<String> getTenantNamespaces(@PathParam("tenant") String
tenant) {
- return internalGetTenantNamespaces(tenant);
+ public void getTenantNamespaces(@Suspended final AsyncResponse response,
+ @PathParam("tenant") String tenant) {
+ internalGetTenantNamespaces(tenant)
+ .thenAccept(response::resume)
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get namespaces list: {}",
clientAppId(), ex);
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@GET
@@ -102,21 +111,19 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin or operate
permission on the namespace"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist")})
- public void getTopics(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace,
- @QueryParam("mode")
@DefaultValue("PERSISTENT") Mode mode,
- @Suspended AsyncResponse asyncResponse) {
- validateNamespaceName(tenant, namespace);
- validateNamespaceOperation(NamespaceName.get(tenant, namespace),
NamespaceOperation.GET_TOPICS);
-
- // Validate that namespace exists, throws 404 if it doesn't exist
- getNamespacePolicies(namespaceName);
-
- pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
- .thenAccept(asyncResponse::resume)
+ public void getTopics(@Suspended AsyncResponse response,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @QueryParam("mode") @DefaultValue("PERSISTENT") Mode
mode) {
+ validateNamespaceName(tenant, namespace);
+ validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace),
NamespaceOperation.GET_TOPICS)
+ // Validate that namespace exists, throws 404 if it doesn't
exist
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(__ ->
pulsar().getNamespaceService().getListOfTopics(namespaceName, mode))
+ .thenAccept(response::resume)
.exceptionally(ex -> {
log.error("Failed to get topics list for namespace {}",
namespaceName, ex);
- asyncResponse.resume(ex);
+ resumeAsyncResponseExceptionally(response, ex);
return null;
});
}
@@ -126,10 +133,19 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get the dump all the policies specified for a
namespace.", response = Policies.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist") })
- public Policies getPolicies(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
- validateNamespaceName(tenant, namespace);
- validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace),
PolicyName.ALL, PolicyOperation.READ);
- return getNamespacePolicies(namespaceName);
+ public void getPolicies(@Suspended AsyncResponse response,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ validateNamespacePolicyOperationAsync(NamespaceName.get(tenant,
namespace), PolicyName.ALL,
+ PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(response::resume)
+ .exceptionally(ex -> {
+ log.error("Failed to get policies for namespace {}",
namespaceName, ex);
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@PUT
@@ -139,11 +155,24 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 404, message = "Tenant or cluster doesn't
exist"),
@ApiResponse(code = 409, message = "Namespace already exists"),
@ApiResponse(code = 412, message = "Namespace name is not valid")
})
- public void createNamespace(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
- @ApiParam(value = "Policies for the namespace") Policies policies)
{
+ public void createNamespace(@Suspended AsyncResponse response,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Policies for the
namespace") Policies policies) {
validateNamespaceName(tenant, namespace);
policies = getDefaultPolicesIfNull(policies);
- internalCreateNamespace(policies);
+ internalCreateNamespace(policies)
+ .thenAccept(__ ->
response.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable root = FutureUtil.unwrapCompletionException(ex);
+ if (root instanceof
MetadataStoreException.AlreadyExistsException) {
+ response.resume(new
RestException(Response.Status.CONFLICT, "Namespace already exists"));
+ } else {
+ log.error("[{}] Failed to create namespace {}",
clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(response, ex);
+ }
+ return null;
+ });
}
@DELETE
@@ -156,9 +185,9 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 405, message = "Broker doesn't allow forced
deletion of namespaces"),
@ApiResponse(code = 409, message = "Namespace is not empty") })
public void deleteNamespace(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace,
- @QueryParam("force") @DefaultValue("false") boolean force,
- @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ @PathParam("namespace") String namespace,
+ @QueryParam("force") @DefaultValue("false")
boolean force,
+ @QueryParam("authoritative")
@DefaultValue("false") boolean authoritative) {
try {
validateNamespaceName(tenant, namespace);
internalDeleteNamespace(asyncResponse, authoritative, force);
@@ -191,13 +220,18 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty") })
- public Map<String, Set<AuthAction>> getPermissions(@PathParam("tenant")
String tenant,
- @PathParam("namespace") String namespace) {
+ public void getPermissions(@Suspended AsyncResponse response,
+ @PathParam("tenant")
String tenant,
+ @PathParam("namespace")
String namespace) {
validateNamespaceName(tenant, namespace);
- validateNamespaceOperation(NamespaceName.get(tenant, namespace),
NamespaceOperation.GET_PERMISSION);
-
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.auth_policies.getNamespaceAuthentication();
+ validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace),
NamespaceOperation.GET_PERMISSION)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies ->
response.resume(policies.auth_policies.getNamespaceAuthentication()))
+ .exceptionally(ex -> {
+ log.error("Failed to get permissions for namespace {}",
namespaceName, ex);
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 1690210a80a..38c61d3688d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -415,6 +415,21 @@ public abstract class PulsarWebResource {
log.info("Successfully validated clusters on tenant [{}]", tenant);
}
+ protected CompletableFuture<Void> validateClusterForTenantAsync(String
tenant, String cluster) {
+ return
pulsar().getPulsarResources().getTenantResources().getTenantAsync(tenant)
+ .thenAccept(tenantInfo -> {
+ if (!tenantInfo.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Tenant does
not exist");
+ }
+ if
(!tenantInfo.get().getAllowedClusters().contains(cluster)) {
+ String msg = String.format("Cluster [%s] is not in the
list of allowed clusters list"
+ + " for tenant [%s]", cluster, tenant);
+ log.info(msg);
+ throw new RestException(Status.FORBIDDEN, msg);
+ }
+ });
+ }
+
protected CompletableFuture<Void> validateClusterOwnershipAsync(String
cluster) {
return getClusterDataIfDifferentCluster(pulsar(), cluster,
clientAppId())
.thenAccept(differentClusterData -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index bfc29250028..583a87275bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -38,17 +38,12 @@ import java.lang.reflect.Field;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.TimeoutHandler;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
@@ -196,7 +191,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
new ClientConfiguration().getZkLedgersRootPath(),
conf.isBookkeeperMetadataStoreSeparated() ?
conf.getBookkeeperMetadataStoreUrl() : null,
pulsar.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null));
- Object response = asynRequests(ctx ->
brokers.getInternalConfigurationData(ctx));
+ Object response = asyncRequests(ctx ->
brokers.getInternalConfigurationData(ctx));
assertTrue(response instanceof InternalConfigurationData);
assertEquals(response, expectedData);
}
@@ -204,18 +199,18 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
@Test
@SuppressWarnings("unchecked")
public void clusters() throws Exception {
- assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)),
Sets.newHashSet());
+ assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)),
Sets.newHashSet());
verify(clusters, never()).validateSuperUserAccessAsync();
- asynRequests(ctx -> clusters.createCluster(ctx,
+ asyncRequests(ctx -> clusters.createCluster(ctx,
"use",
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build()));
// ensure to read from ZooKeeper directly
//clusters.clustersListCache().clear();
- assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)),
Sets.newHashSet("use"));
+ assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)),
Sets.newHashSet("use"));
// Check creating existing cluster
try {
- asynRequests(ctx -> clusters.createCluster(ctx, "use",
+ asyncRequests(ctx -> clusters.createCluster(ctx, "use",
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build()));
fail("should have failed");
} catch (RestException e) {
@@ -224,23 +219,23 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
// Check deleting non-existing cluster
try {
- asynRequests(ctx -> clusters.deleteCluster(ctx, "usc"));
+ asyncRequests(ctx -> clusters.deleteCluster(ctx, "usc"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
}
- assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")),
+ assertEquals(asyncRequests(ctx -> clusters.getCluster(ctx, "use")),
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build());
- asynRequests(ctx -> clusters.updateCluster(ctx, "use",
+ asyncRequests(ctx -> clusters.updateCluster(ctx, "use",
ClusterDataImpl.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build()));
- assertEquals(asynRequests(ctx -> clusters.getCluster(ctx, "use")),
+ assertEquals(asyncRequests(ctx -> clusters.getCluster(ctx, "use")),
ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build());
try {
- asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx,
"use"));
+ asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx,
"use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
@@ -260,38 +255,38 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
.build();
AsyncResponse response = mock(AsyncResponse.class);
clusters.setNamespaceIsolationPolicy(response,"use", "policy1",
policyData);
- asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx,
"use"));
+ asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx,
"use"));
try {
- asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
+ asyncRequests(ctx -> clusters.deleteCluster(ctx, "use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 412);
}
clusters.deleteNamespaceIsolationPolicy("use", "policy1");
- assertTrue(((Map<String, NamespaceIsolationDataImpl>) asynRequests(ctx
->
+ assertTrue(((Map<String, NamespaceIsolationDataImpl>)
asyncRequests(ctx ->
clusters.getNamespaceIsolationPolicies(ctx,
"use"))).isEmpty());
- asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
- assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)),
Sets.newHashSet());
+ asyncRequests(ctx -> clusters.deleteCluster(ctx, "use"));
+ assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)),
Sets.newHashSet());
try {
- asynRequests(ctx -> clusters.getCluster(ctx, "use"));
+ asyncRequests(ctx -> clusters.getCluster(ctx, "use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
}
try {
- asynRequests(ctx -> clusters.updateCluster(ctx, "use",
ClusterDataImpl.builder().build()));
+ asyncRequests(ctx -> clusters.updateCluster(ctx, "use",
ClusterDataImpl.builder().build()));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
}
try {
- asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx,
"use"));
+ asyncRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx,
"use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
@@ -311,7 +306,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
clusterCache.invalidateAll();
store.invalidateAll();
try {
- asynRequests(ctx -> clusters.getClusters(ctx));
+ asyncRequests(ctx -> clusters.getClusters(ctx));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -322,7 +317,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
&& path.equals("/admin/clusters/test");
});
try {
- asynRequests(ctx -> clusters.createCluster(ctx, "test",
+ asyncRequests(ctx -> clusters.createCluster(ctx, "test",
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com:8080").build()));
fail("should have failed");
} catch (RestException e) {
@@ -336,7 +331,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
clusterCache.invalidateAll();
store.invalidateAll();
try {
- asynRequests(ctx -> clusters.updateCluster(ctx, "test",
+ asyncRequests(ctx -> clusters.updateCluster(ctx, "test",
ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com").build()));
fail("should have failed");
} catch (RestException e) {
@@ -349,7 +344,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
});
try {
- asynRequests(ctx -> clusters.getCluster(ctx, "test"));
+ asyncRequests(ctx -> clusters.getCluster(ctx, "test"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -361,7 +356,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
});
try {
- asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
+ asyncRequests(ctx -> clusters.deleteCluster(ctx, "use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -375,7 +370,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
isolationPolicyCache.invalidateAll();
store.invalidateAll();
try {
- asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
+ asyncRequests(ctx -> clusters.deleteCluster(ctx, "use"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -383,7 +378,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
// Check name validations
try {
- asynRequests(ctx -> clusters.createCluster(ctx, "bf@",
+ asyncRequests(ctx -> clusters.createCluster(ctx, "bf@",
ClusterDataImpl.builder().serviceUrl("http://dummy.messaging.example.com").build()));
fail("should have filed");
} catch (RestException e) {
@@ -392,7 +387,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
// Check authentication and listener name
try {
- asynRequests(ctx -> clusters.createCluster(ctx, "auth",
ClusterDataImpl.builder()
+ asyncRequests(ctx -> clusters.createCluster(ctx, "auth",
ClusterDataImpl.builder()
.serviceUrl("http://dummy.web.example.com")
.serviceUrlTls("")
.brokerServiceUrl("http://dummy.messaging.example.com")
@@ -401,7 +396,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
.authenticationParameters("authenticationParameters")
.listenerName("listenerName")
.build()));
- ClusterData cluster = (ClusterData) asynRequests(ctx ->
clusters.getCluster(ctx, "auth"));
+ ClusterData cluster = (ClusterData) asyncRequests(ctx ->
clusters.getCluster(ctx, "auth"));
assertEquals(cluster.getAuthenticationPlugin(),
"authenticationPlugin");
assertEquals(cluster.getAuthenticationParameters(),
"authenticationParameters");
assertEquals(cluster.getListenerName(), "listenerName");
@@ -412,23 +407,14 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
verify(clusters, times(2)).validateSuperUserAccess();
}
- Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception
{
- TestAsyncResponse ctx = new TestAsyncResponse();
- function.accept(ctx);
- ctx.latch.await();
- if (ctx.e != null) {
- throw (Exception) ctx.e;
- }
- return ctx.response;
- }
@Test
public void properties() throws Throwable {
- Object response = asynRequests(ctx -> properties.getTenants(ctx));
+ Object response = asyncRequests(ctx -> properties.getTenants(ctx));
assertEquals(response, Lists.newArrayList());
verify(properties, times(1)).validateSuperUserAccess();
// create local cluster
- asynRequests(ctx -> clusters.createCluster(ctx, configClusterName,
ClusterDataImpl.builder().build()));
+ asyncRequests(ctx -> clusters.createCluster(ctx, configClusterName,
ClusterDataImpl.builder().build()));
Set<String> allowedClusters = Sets.newHashSet();
allowedClusters.add(configClusterName);
@@ -436,14 +422,14 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
.adminRoles(Sets.newHashSet("role1", "role2"))
.allowedClusters(allowedClusters)
.build();
- response = asynRequests(ctx -> properties.createTenant(ctx,
"test-property", tenantInfo));
+ response = asyncRequests(ctx -> properties.createTenant(ctx,
"test-property", tenantInfo));
verify(properties, times(2)).validateSuperUserAccess();
- response = asynRequests(ctx -> properties.getTenants(ctx));
+ response = asyncRequests(ctx -> properties.getTenants(ctx));
assertEquals(response, Lists.newArrayList("test-property"));
verify(properties, times(3)).validateSuperUserAccess();
- response = asynRequests(ctx -> properties.getTenantAdmin(ctx,
"test-property"));
+ response = asyncRequests(ctx -> properties.getTenantAdmin(ctx,
"test-property"));
assertEquals(response, tenantInfo);
verify(properties, times(4)).validateSuperUserAccess();
@@ -451,21 +437,21 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
.adminRoles(Sets.newHashSet("role1", "other-role"))
.allowedClusters(allowedClusters)
.build();
- response = asynRequests(ctx -> properties.updateTenant(ctx,
"test-property", newPropertyAdmin));
+ response = asyncRequests(ctx -> properties.updateTenant(ctx,
"test-property", newPropertyAdmin));
verify(properties, times(5)).validateSuperUserAccess();
// Wait for updateTenant to take effect
Thread.sleep(100);
- response = asynRequests(ctx -> properties.getTenantAdmin(ctx,
"test-property"));
+ response = asyncRequests(ctx -> properties.getTenantAdmin(ctx,
"test-property"));
assertEquals(response, newPropertyAdmin);
- response = asynRequests(ctx -> properties.getTenantAdmin(ctx,
"test-property"));
+ response = asyncRequests(ctx -> properties.getTenantAdmin(ctx,
"test-property"));
assertNotSame(response, tenantInfo);
verify(properties, times(7)).validateSuperUserAccess();
// Check creating existing property
try {
- response = asynRequests(ctx -> properties.createTenant(ctx,
"test-property", tenantInfo));
+ response = asyncRequests(ctx -> properties.createTenant(ctx,
"test-property", tenantInfo));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.CONFLICT.getStatusCode());
@@ -473,14 +459,14 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
// Check non-existing property
try {
- response = asynRequests(ctx -> properties.getTenantAdmin(ctx,
"non-existing"));
+ response = asyncRequests(ctx -> properties.getTenantAdmin(ctx,
"non-existing"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
}
try {
- response = asynRequests(ctx -> properties.updateTenant(ctx,
"xxx-non-existing", newPropertyAdmin));
+ response = asyncRequests(ctx -> properties.updateTenant(ctx,
"xxx-non-existing", newPropertyAdmin));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
@@ -488,7 +474,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
// Check deleting non-existing property
try {
- response = asynRequests(ctx -> properties.deleteTenant(ctx,
"non-existing", false));
+ response = asyncRequests(ctx -> properties.deleteTenant(ctx,
"non-existing", false));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
@@ -505,7 +491,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
return op == MockZooKeeper.Op.GET_CHILDREN &&
path.equals("/admin/policies");
});
try {
- response = asynRequests(ctx -> properties.getTenants(ctx));
+ response = asyncRequests(ctx -> properties.getTenants(ctx));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -515,7 +501,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
return op == MockZooKeeper.Op.GET &&
path.equals("/admin/policies/my-tenant");
});
try {
- response = asynRequests(ctx -> properties.getTenantAdmin(ctx,
"my-tenant"));
+ response = asyncRequests(ctx -> properties.getTenantAdmin(ctx,
"my-tenant"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -525,7 +511,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
return op == MockZooKeeper.Op.GET &&
path.equals("/admin/policies/my-tenant");
});
try {
- response = asynRequests(ctx -> properties.updateTenant(ctx,
"my-tenant", newPropertyAdmin));
+ response = asyncRequests(ctx -> properties.updateTenant(ctx,
"my-tenant", newPropertyAdmin));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -535,7 +521,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
return op == MockZooKeeper.Op.CREATE &&
path.equals("/admin/policies/test");
});
try {
- response = asynRequests(ctx -> properties.createTenant(ctx,
"test", tenantInfo));
+ response = asyncRequests(ctx -> properties.createTenant(ctx,
"test", tenantInfo));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -547,28 +533,28 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
try {
cache.invalidateAll();
store.invalidateAll();
- response = asynRequests(ctx -> properties.deleteTenant(ctx,
"test-property", false));
+ response = asyncRequests(ctx -> properties.deleteTenant(ctx,
"test-property", false));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
- response = asynRequests(ctx -> properties.createTenant(ctx,
"error-property", tenantInfo));
+ response = asyncRequests(ctx -> properties.createTenant(ctx,
"error-property", tenantInfo));
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) ->
{
return op == MockZooKeeper.Op.DELETE &&
path.equals("/admin/policies/error-property");
});
try {
- response = asynRequests(ctx -> properties.deleteTenant(ctx,
"error-property", false));
+ response = asyncRequests(ctx -> properties.deleteTenant(ctx,
"error-property", false));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
- response = asynRequests(ctx -> properties.deleteTenant(ctx,
"test-property", false));
- response = asynRequests(ctx -> properties.deleteTenant(ctx,
"error-property", false));
+ response = asyncRequests(ctx -> properties.deleteTenant(ctx,
"test-property", false));
+ response = asyncRequests(ctx -> properties.deleteTenant(ctx,
"error-property", false));
response = Lists.newArrayList();
- response = asynRequests(ctx -> properties.getTenants(ctx));
+ response = asyncRequests(ctx -> properties.getTenants(ctx));
assertEquals(response, Lists.newArrayList());
// Create a namespace to test deleting a non-empty property
@@ -576,12 +562,12 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
.adminRoles(Sets.newHashSet("role1", "other-role"))
.allowedClusters(Sets.newHashSet("use"))
.build();
- response = asynRequests(ctx -> properties.createTenant(ctx,
"my-tenant", newPropertyAdmin2));
+ response = asyncRequests(ctx -> properties.createTenant(ctx,
"my-tenant", newPropertyAdmin2));
- namespaces.createNamespace("my-tenant", "use", "my-namespace",
BundlesData.builder().build());
+ response = asyncRequests(ctx ->
namespaces.createNamespace(ctx,"my-tenant", "use", "my-namespace",
BundlesData.builder().build()));
try {
- response = asynRequests(ctx -> properties.deleteTenant(ctx,
"my-tenant", false));
+ response = asyncRequests(ctx -> properties.deleteTenant(ctx,
"my-tenant", false));
fail("should have failed");
} catch (RestException e) {
// Ok
@@ -589,7 +575,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
// Check name validation
try {
- response = asynRequests(ctx -> properties.createTenant(ctx,
"test&", tenantInfo));
+ response = asyncRequests(ctx -> properties.createTenant(ctx,
"test&", tenantInfo));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
@@ -597,7 +583,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
// Check tenantInfo is null
try {
- response = asynRequests(ctx -> properties.createTenant(ctx,
"tenant-config-is-null", null));
+ response = asyncRequests(ctx -> properties.createTenant(ctx,
"tenant-config-is-null", null));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
@@ -611,7 +597,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
.allowedClusters(blankClusters)
.build();
try {
- response = asynRequests(ctx -> properties.createTenant(ctx,
"tenant-config-is-empty", tenantWithEmptyCluster));
+ response = asyncRequests(ctx -> properties.createTenant(ctx,
"tenant-config-is-empty", tenantWithEmptyCluster));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
@@ -625,7 +611,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
.allowedClusters(containBlankClusters)
.build();
try {
- response = asynRequests(ctx -> properties.createTenant(ctx,
"tenant-config-contain-empty", tenantContainEmptyCluster));
+ response = asyncRequests(ctx -> properties.createTenant(ctx,
"tenant-config-contain-empty", tenantContainEmptyCluster));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
@@ -636,13 +622,13 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
ArgumentCaptor<Response> captor =
ArgumentCaptor.forClass(Response.class);
verify(response2, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getStatus(),
Status.NO_CONTENT.getStatusCode());
- response = asynRequests(ctx -> properties.deleteTenant(ctx,
"my-tenant", false));
+ response = asyncRequests(ctx -> properties.deleteTenant(ctx,
"my-tenant", false));
}
@Test
@SuppressWarnings("unchecked")
public void brokers() throws Exception {
- asynRequests(ctx -> clusters.createCluster(ctx, "use",
ClusterDataImpl.builder()
+ asyncRequests(ctx -> clusters.createCluster(ctx, "use",
ClusterDataImpl.builder()
.serviceUrl("http://broker.messaging.use.example.com")
.serviceUrlTls("https://broker.messaging.use.example.com:4443")
.build()));
@@ -654,12 +640,12 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
Field uriField = PulsarWebResource.class.getDeclaredField("uri");
uriField.setAccessible(true);
uriField.set(brokers, mockUri);
- Object res = asynRequests(ctx -> brokers.getActiveBrokers(ctx, "use"));
+ Object res = asyncRequests(ctx -> brokers.getActiveBrokers(ctx,
"use"));
assertTrue(res instanceof Set);
Set<String> activeBrokers = (Set<String>) res;
assertEquals(activeBrokers.size(), 1);
assertEquals(activeBrokers,
Sets.newHashSet(pulsar.getAdvertisedAddress() + ":" +
pulsar.getListenPortHTTP().get()));
- Object leaderBrokerRes = asynRequests(ctx ->
brokers.getLeaderBroker(ctx));
+ Object leaderBrokerRes = asyncRequests(ctx ->
brokers.getLeaderBroker(ctx));
assertTrue(leaderBrokerRes instanceof BrokerInfo);
BrokerInfo leaderBroker = (BrokerInfo)leaderBrokerRes;
assertEquals(leaderBroker.getServiceUrl(),
pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get());
@@ -707,8 +693,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
.allowedClusters(Collections.singleton(cluster))
.build();
ClusterDataImpl clusterData =
ClusterDataImpl.builder().serviceUrl(cluster).build();
- asynRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData
));
- asynRequests(ctx -> properties.createTenant(ctx, property, admin));
+ asyncRequests(ctx -> clusters.createCluster(ctx, cluster, clusterData
));
+ asyncRequests(ctx -> properties.createTenant(ctx, property, admin));
// customized bandwidth for this namespace
double customizeBandwidth = 3000;
@@ -876,87 +862,4 @@ public class AdminTest extends MockedPulsarServiceBaseTest
{
Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(),
Status.INTERNAL_SERVER_ERROR.getStatusCode());
Assert.assertTrue(((ErrorData)responseCaptor.getValue().getResponse().getEntity()).reason.contains("500
error contains error message"));
}
-
-
- static class TestAsyncResponse implements AsyncResponse {
-
- Object response;
- Throwable e;
- CountDownLatch latch = new CountDownLatch(1);
-
- @Override
- public boolean resume(Object response) {
- this.response = response;
- latch.countDown();
- return true;
- }
-
- @Override
- public boolean resume(Throwable response) {
- this.e = response;
- latch.countDown();
- return true;
- }
-
- @Override
- public boolean cancel() {
- return false;
- }
-
- @Override
- public boolean cancel(int retryAfter) {
- return false;
- }
-
- @Override
- public boolean cancel(Date retryAfter) {
- return false;
- }
-
- @Override
- public boolean isSuspended() {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return false;
- }
-
- @Override
- public boolean setTimeout(long time, TimeUnit unit) {
- return false;
- }
-
- @Override
- public void setTimeoutHandler(TimeoutHandler handler) {
-
- }
-
- @Override
- public Collection<Class<?>> register(Class<?> callback) {
- return null;
- }
-
- @Override
- public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback,
Class<?>... callbacks) {
- return null;
- }
-
- @Override
- public Collection<Class<?>> register(Object callback) {
- return null;
- }
-
- @Override
- public Map<Class<?>, Collection<Class<?>>> register(Object callback,
Object... callbacks) {
- return null;
- }
-
- }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 0eb87491a52..a02f7d6c934 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -203,8 +203,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
@Test
public void testCreateNamespaces() throws Exception {
try {
- namespaces.createNamespace(this.testTenant, "other-colo",
"my-namespace",
- BundlesData.builder().build());
+ asyncRequests(response -> namespaces.createNamespace(response,
this.testTenant, "other-colo", "my-namespace",
+ BundlesData.builder().build()));
fail("should have failed");
} catch (RestException e) {
// Ok, cluster doesn't exist
@@ -217,24 +217,24 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
createTestNamespaces(nsnames, BundlesData.builder().build());
try {
- namespaces.createNamespace(this.testTenant, "use",
"create-namespace-1",
- BundlesData.builder().build());
+ asyncRequests(response -> namespaces.createNamespace(response,
this.testTenant, "use", "create-namespace-1",
+ BundlesData.builder().build()));
fail("should have failed");
} catch (RestException e) {
// Ok, namespace already exists
}
try {
- namespaces.createNamespace("non-existing-tenant", "use",
"create-namespace-1",
- BundlesData.builder().build());
+ asyncRequests(response ->
namespaces.createNamespace(response,"non-existing-tenant", "use",
"create-namespace-1",
+ BundlesData.builder().build()));
fail("should have failed");
} catch (RestException e) {
// Ok, tenant doesn't exist
}
try {
- namespaces.createNamespace(this.testTenant, "use",
"create-namespace-#",
- BundlesData.builder().build());
+ asyncRequests(response -> namespaces.createNamespace(response,
this.testTenant, "use", "create-namespace-#",
+ BundlesData.builder().build()));
fail("should have failed");
} catch (RestException e) {
// Ok, invalid namespace name
@@ -246,7 +246,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
&&
path.equals("/admin/policies/my-tenant/use/my-namespace-3");
});
try {
- namespaces.createNamespace(this.testTenant, "use",
"my-namespace-3", BundlesData.builder().build());
+ asyncRequests(response -> namespaces.createNamespace(response,
this.testTenant, "use", "my-namespace-3", BundlesData.builder().build()));
fail("should have failed");
} catch (RestException e) {
// Ok
@@ -263,18 +263,24 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
this.testLocalNamespaces.get(1).toString(),
this.testLocalNamespaces.get(2).toString(),
this.testGlobalNamespaces.get(0).toString());
expectedList.sort(null);
- assertEquals(namespaces.getTenantNamespaces(this.testTenant),
expectedList);
+ AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.getTenantNamespaces(response, this.testTenant);
+ ArgumentCaptor<Response> captor =
ArgumentCaptor.forClass(Response.class);
+ verify(response, timeout(5000).times(1)).resume(captor.capture());
+ List<String> namespacesList = (List<String>) captor.getValue();
+ namespacesList.sort(null);
+ assertEquals(namespacesList, expectedList);
try {
// check the tenant name is valid
- namespaces.getTenantNamespaces(this.testTenant + "/default");
+ asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx,
this.testTenant + "/default"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.PRECONDITION_FAILED.getStatusCode());
}
try {
- namespaces.getTenantNamespaces("non-existing-tenant");
+ asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx,
"non-existing-tenant"));
fail("should have failed");
} catch (RestException e) {
// Ok, does not exist
@@ -299,7 +305,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
tenantCache.invalidateAll();
store.invalidateAll();
try {
- namespaces.getTenantNamespaces(this.testTenant);
+ asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx,
this.testTenant));
fail("should have failed");
} catch (RestException e) {
// Ok
@@ -321,46 +327,46 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
@Test(enabled = false)
public void testGrantAndRevokePermissions() throws Exception {
Policies expectedPolicies = new Policies();
- assertEquals(namespaces.getPolicies(this.testTenant,
this.testLocalCluster,
- this.testLocalNamespaces.get(0).getLocalName()),
expectedPolicies);
- assertEquals(namespaces.getPermissions(this.testTenant,
this.testLocalCluster,
- this.testLocalNamespaces.get(0).getLocalName()),
expectedPolicies.auth_policies.getNamespaceAuthentication());
+ assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx,
this.testTenant, this.testLocalCluster,
+ this.testLocalNamespaces.get(0).getLocalName())),
expectedPolicies);
+ assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx,
this.testTenant, this.testLocalCluster,
+ this.testLocalNamespaces.get(0).getLocalName())),
expectedPolicies.auth_policies.getNamespaceAuthentication());
namespaces.grantPermissionOnNamespace(this.testTenant,
this.testLocalCluster,
this.testLocalNamespaces.get(0).getLocalName(), "my-role",
EnumSet.of(AuthAction.produce));
expectedPolicies.auth_policies.getNamespaceAuthentication().put("my-role",
EnumSet.of(AuthAction.produce));
- assertEquals(namespaces.getPolicies(this.testTenant,
this.testLocalCluster,
- this.testLocalNamespaces.get(0).getLocalName()),
expectedPolicies);
- assertEquals(namespaces.getPermissions(this.testTenant,
this.testLocalCluster,
- this.testLocalNamespaces.get(0).getLocalName()),
expectedPolicies.auth_policies.getNamespaceAuthentication());
+ assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx,
this.testTenant, this.testLocalCluster,
+ this.testLocalNamespaces.get(0).getLocalName())),
expectedPolicies);
+ assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx,
this.testTenant, this.testLocalCluster,
+ this.testLocalNamespaces.get(0).getLocalName())),
expectedPolicies.auth_policies.getNamespaceAuthentication());
namespaces.grantPermissionOnNamespace(this.testTenant,
this.testLocalCluster,
this.testLocalNamespaces.get(0).getLocalName(), "other-role",
EnumSet.of(AuthAction.consume));
expectedPolicies.auth_policies.getNamespaceAuthentication().put("other-role",
EnumSet.of(AuthAction.consume));
- assertEquals(namespaces.getPolicies(this.testTenant,
this.testLocalCluster,
- this.testLocalNamespaces.get(0).getLocalName()),
expectedPolicies);
- assertEquals(namespaces.getPermissions(this.testTenant,
this.testLocalCluster,
- this.testLocalNamespaces.get(0).getLocalName()),
expectedPolicies.auth_policies.getNamespaceAuthentication());
+ assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx,
this.testTenant, this.testLocalCluster,
+ this.testLocalNamespaces.get(0).getLocalName())),
expectedPolicies);
+ assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx,
this.testTenant, this.testLocalCluster,
+ this.testLocalNamespaces.get(0).getLocalName())),
expectedPolicies.auth_policies.getNamespaceAuthentication());
namespaces.revokePermissionsOnNamespace(this.testTenant,
this.testLocalCluster,
this.testLocalNamespaces.get(0).getLocalName(), "my-role");
expectedPolicies.auth_policies.getNamespaceAuthentication().remove("my-role");
- assertEquals(namespaces.getPolicies(this.testTenant,
this.testLocalCluster,
- this.testLocalNamespaces.get(0).getLocalName()),
expectedPolicies);
- assertEquals(namespaces.getPermissions(this.testTenant,
this.testLocalCluster,
- this.testLocalNamespaces.get(0).getLocalName()),
expectedPolicies.auth_policies.getNamespaceAuthentication());
+ assertEquals(asyncRequests(ctx -> namespaces.getPolicies(ctx,
this.testTenant, this.testLocalCluster,
+ this.testLocalNamespaces.get(0).getLocalName())),
expectedPolicies);
+ assertEquals(asyncRequests(ctx -> namespaces.getPermissions(ctx,
this.testTenant, this.testLocalCluster,
+ this.testLocalNamespaces.get(0).getLocalName())),
expectedPolicies.auth_policies.getNamespaceAuthentication());
// Non-existing namespaces
try {
- namespaces.getPolicies(this.testTenant, this.testLocalCluster,
"non-existing-namespace-1");
+ asyncRequests(ctx -> namespaces.getPolicies(ctx, this.testTenant,
this.testLocalCluster, "non-existing-namespace-1"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
}
try {
- namespaces.getPermissions(this.testTenant, this.testLocalCluster,
"non-existing-namespace-1");
+ asyncRequests(ctx -> namespaces.getPermissions(ctx,
this.testTenant, this.testLocalCluster, "non-existing-namespace-1"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(),
Status.NOT_FOUND.getStatusCode());
@@ -393,7 +399,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
});
try {
- namespaces.getPolicies(testNs.getTenant(), testNs.getCluster(),
testNs.getLocalName());
+ asyncRequests(ctx -> namespaces.getPolicies(ctx,
testNs.getTenant(), testNs.getCluster(), testNs.getLocalName()));
fail("should have failed");
} catch (RestException e) {
// Ok
@@ -407,7 +413,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
return true;
});
try {
- namespaces.getPermissions(testNs.getTenant(), testNs.getCluster(),
testNs.getLocalName());
+ asyncRequests(ctx -> namespaces.getPermissions(ctx,
testNs.getTenant(), testNs.getCluster(), testNs.getLocalName()));
fail("should have failed");
} catch (RestException e) {
// Ok
@@ -758,7 +764,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
List<String> nsList =
Lists.newArrayList(this.testLocalNamespaces.get(1).toString(),
this.testLocalNamespaces.get(2).toString());
nsList.sort(null);
- assertEquals(namespaces.getTenantNamespaces(this.testTenant), nsList);
+ assertEquals(asyncRequests(ctx -> namespaces.getTenantNamespaces(ctx,
this.testTenant)), nsList);
testNs = this.testLocalNamespaces.get(1);
// setup ownership to localhost
@@ -978,16 +984,16 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
private void createBundledTestNamespaces(String property, String cluster,
String namespace, BundlesData bundle)
throws Exception {
- namespaces.createNamespace(property, cluster, namespace, bundle);
+ asyncRequests(ctx -> namespaces.createNamespace(ctx, property,
cluster, namespace, bundle));
}
private void createGlobalTestNamespaces(String property, String namespace,
BundlesData bundle) throws Exception {
- namespaces.createNamespace(property, "global", namespace, bundle);
+ asyncRequests(ctx -> namespaces.createNamespace(ctx, property,
"global", namespace, bundle));
}
private void createTestNamespaces(List<NamespaceName> nsnames, BundlesData
bundle) throws Exception {
for (NamespaceName nsName : nsnames) {
- namespaces.createNamespace(nsName.getTenant(),
nsName.getCluster(), nsName.getLocalName(), bundle);
+ asyncRequests(ctx -> namespaces.createNamespace(ctx,
nsName.getTenant(), nsName.getCluster(), nsName.getLocalName(), bundle));
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 55631aea465..88fe6ece0bb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -33,11 +33,15 @@ import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.bookkeeper.client.BookKeeper;
@@ -70,6 +74,9 @@ import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.TimeoutHandler;
+
/**
* Base class for all tests that need a Pulsar instance without a ZK and BK
cluster.
*/
@@ -499,5 +506,97 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
}
}
+ protected Object asyncRequests(Consumer<TestAsyncResponse> function)
throws Exception {
+ TestAsyncResponse ctx = new TestAsyncResponse();
+ function.accept(ctx);
+ ctx.latch.await();
+ if (ctx.e != null) {
+ throw (Exception) ctx.e;
+ }
+ return ctx.response;
+ }
+
+ public static class TestAsyncResponse implements AsyncResponse {
+
+ Object response;
+ Throwable e;
+ CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public boolean resume(Object response) {
+ this.response = response;
+ latch.countDown();
+ return true;
+ }
+
+ @Override
+ public boolean resume(Throwable response) {
+ this.e = response;
+ latch.countDown();
+ return true;
+ }
+
+ @Override
+ public boolean cancel() {
+ return false;
+ }
+
+ @Override
+ public boolean cancel(int retryAfter) {
+ return false;
+ }
+
+ @Override
+ public boolean cancel(Date retryAfter) {
+ return false;
+ }
+
+ @Override
+ public boolean isSuspended() {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return false;
+ }
+
+ @Override
+ public boolean setTimeout(long time, TimeUnit unit) {
+ return false;
+ }
+
+ @Override
+ public void setTimeoutHandler(TimeoutHandler handler) {
+
+ }
+
+ @Override
+ public Collection<Class<?>> register(Class<?> callback) {
+ return null;
+ }
+
+ @Override
+ public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback,
Class<?>... callbacks) {
+ return null;
+ }
+
+ @Override
+ public Collection<Class<?>> register(Object callback) {
+ return null;
+ }
+
+ @Override
+ public Map<Class<?>, Collection<Class<?>>> register(Object callback,
Object... callbacks) {
+ return null;
+ }
+
+ }
+
private static final Logger log =
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}