This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f47c705e3cc [improve][broker] Make splitNamespaceBundle and
getTopicHashPositions async (#16411)
f47c705e3cc is described below
commit f47c705e3cc1cb0c34e48ad1ca71060f1202c70a
Author: gaozhangmin <[email protected]>
AuthorDate: Mon Jul 25 17:15:25 2022 +0800
[improve][broker] Make splitNamespaceBundle and getTopicHashPositions async
(#16411)
* make splitNamespaceBundle and getTopicHashPositions async
* appli comments
* Update
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
Co-authored-by: Zixuan Liu <[email protected]>
* Update
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
Co-authored-by: Zixuan Liu <[email protected]>
* Update
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
Co-authored-by: Zixuan Liu <[email protected]>
* Update
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
Co-authored-by: Zixuan Liu <[email protected]>
* Update
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
Co-authored-by: Zixuan Liu <[email protected]>
* Update
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
Co-authored-by: Zixuan Liu <[email protected]>
* fix error
* fix error
* fix check style error
Co-authored-by: gavingaozhangmin <[email protected]>
Co-authored-by: Zixuan Liu <[email protected]>
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 189 ++++++++++-----------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 107 +++++++++---
.../apache/pulsar/broker/admin/v2/Namespaces.java | 65 ++++---
.../apache/pulsar/broker/admin/NamespacesTest.java | 9 +-
4 files changed, 214 insertions(+), 156 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index f5da9ff6aa3..5b069edf75b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1115,116 +1115,99 @@ public abstract class NamespacesBase extends
AdminResource {
}
@SuppressWarnings("deprecation")
- protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse,
String bundleName, boolean authoritative,
- boolean unload, String
splitAlgorithmName, List<Long> splitBoundaries) {
- validateSuperUserAccess();
- checkNotNull(bundleName, "BundleRange should not be null");
- log.info("[{}] Split namespace bundle {}/{}", clientAppId(),
namespaceName, bundleName);
-
- String bundleRange = getBundleRange(bundleName);
-
- Policies policies = getNamespacePolicies(namespaceName);
-
- if (namespaceName.isGlobal()) {
- // check cluster ownership for a given global namespace: redirect
if peer-cluster owns it
- validateGlobalNamespaceOwnership(namespaceName);
- } else {
- validateClusterOwnership(namespaceName.getCluster());
- validateClusterForTenant(namespaceName.getTenant(),
namespaceName.getCluster());
- }
-
- validatePoliciesReadOnlyAccess();
-
- List<String> supportedNamespaceBundleSplitAlgorithms =
-
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
- if (StringUtils.isNotBlank(splitAlgorithmName)) {
- if
(!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Unsupported namespace bundle split algorithm,
supported algorithms are "
- + supportedNamespaceBundleSplitAlgorithms));
- }
- if
(splitAlgorithmName.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
- && (splitBoundaries == null || splitBoundaries.size() ==
0)) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "With specified_positions_divide split algorithm,
splitBoundaries must not be emtpy"));
- }
- }
-
- NamespaceBundle nsBundle;
- try {
- nsBundle = validateNamespaceBundleOwnership(namespaceName,
policies.bundles, bundleRange,
- authoritative, false);
- } catch (Exception e) {
- asyncResponse.resume(e);
- return;
- }
+ protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String
bundleName,
+
boolean authoritative, boolean unload,
+ String
splitAlgorithmName,
+
List<Long> splitBoundaries) {
+ return validateSuperUserAccessAsync()
+ .thenAccept(__ -> {
+ checkNotNull(bundleName, "BundleRange should not be null");
+ log.info("[{}] Split namespace bundle {}/{}",
clientAppId(), namespaceName, bundleName);
+ List<String> supportedNamespaceBundleSplitAlgorithms =
+
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
+ if (StringUtils.isNotBlank(splitAlgorithmName)) {
+ if
(!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Unsupported namespace bundle split
algorithm, supported algorithms are "
+ +
supportedNamespaceBundleSplitAlgorithms);
+ }
+ if (splitAlgorithmName
+
.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
+ && (splitBoundaries == null ||
splitBoundaries.size() == 0)) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "With specified_positions_divide split
algorithm, splitBoundaries must not be "
+ + "emtpy");
+ }
+ }
+ })
+ .thenCompose(__ -> {
+ if (namespaceName.isGlobal()) {
+ // check cluster ownership for a given global
namespace: redirect if peer-cluster owns it
+ return
validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ return
validateClusterOwnershipAsync(namespaceName.getCluster())
+ .thenCompose(ignore ->
validateClusterForTenantAsync(namespaceName.getTenant(),
+ namespaceName.getCluster()));
+ }
+ })
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies->{
+ String bundleRange = getBundleRange(bundleName);
+ return
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
+ authoritative, false)
+ .thenCompose(nsBundle ->
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
+
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));
- pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
- getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
splitBoundaries)
- .thenRun(() -> {
- log.info("[{}] Successfully split namespace bundle {}",
clientAppId(), nsBundle.toString());
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- if (ex.getCause() instanceof IllegalArgumentException) {
- log.error("[{}] Failed to split namespace bundle {}/{} due to
{}", clientAppId(), namespaceName,
- bundleRange, ex.getMessage());
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Split bundle failed due to invalid request"));
- } else {
- log.error("[{}] Failed to split namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange, ex);
- asyncResponse.resume(new RestException(ex.getCause()));
- }
- return null;
- });
+ });
}
- protected void internalGetTopicHashPositions(AsyncResponse asyncResponse,
String bundleRange, List<String> topics) {
+ protected CompletableFuture<TopicHashPositions>
internalGetTopicHashPositionsAsync(String bundleRange,
+
List<String> topics) {
if (log.isDebugEnabled()) {
log.debug("[{}] Getting hash position for topic list {}, bundle
{}", clientAppId(), topics, bundleRange);
}
- validateNamespacePolicyOperation(namespaceName,
PolicyName.PERSISTENCE, PolicyOperation.READ);
- Policies policies = getNamespacePolicies(namespaceName);
- NamespaceBundle bundle =
validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
- false, true);
-
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete(
- (allTopicsInThisBundle, throwable) -> {
- if (throwable != null) {
- log.error("[{}] {} Failed to get topic list for bundle
{}.", clientAppId(),
- namespaceName, bundle);
- asyncResponse.resume(new RestException(throwable));
- }
- // if topics is empty, return all topics' hash position in
this bundle
- Map<String, Long> topicHashPositions = new HashMap<>();
- if (topics == null || topics.size() == 0) {
- allTopicsInThisBundle.forEach(t -> {
- topicHashPositions.put(t,
-
pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getLongHashCode(t));
- });
- } else {
- for (String topic :
topics.stream().map(Codec::decode).collect(Collectors.toList())) {
- TopicName topicName = TopicName.get(topic);
- // partitioned topic
- if (topicName.getPartitionIndex() == -1) {
- allTopicsInThisBundle.stream()
- .filter(t ->
TopicName.get(t).getPartitionedTopicName()
-
.equals(TopicName.get(topic).getPartitionedTopicName()))
- .forEach(partition -> {
- topicHashPositions.put(partition,
-
pulsar().getNamespaceService().getNamespaceBundleFactory()
-
.getLongHashCode(partition));
- });
- } else { // topic partition
- if
(allTopicsInThisBundle.contains(topicName.toString())) {
- topicHashPositions.put(topic,
-
pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getLongHashCode(topic));
+ return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.PERSISTENCE, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies -> {
+ return
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
bundleRange,
+ false, true)
+ .thenCompose(nsBundle ->
+
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle))
+ .thenApply(allTopicsInThisBundle -> {
+ Map<String, Long> topicHashPositions = new
HashMap<>();
+ if (topics == null || topics.size() == 0) {
+ allTopicsInThisBundle.forEach(t -> {
+ topicHashPositions.put(t,
+
pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getLongHashCode(t));
+ });
+ } else {
+ for (String topic :
topics.stream().map(Codec::decode).toList()) {
+ TopicName topicName =
TopicName.get(topic);
+ // partitioned topic
+ if (topicName.getPartitionIndex() ==
-1) {
+ allTopicsInThisBundle.stream()
+ .filter(t ->
TopicName.get(t).getPartitionedTopicName()
+
.equals(TopicName.get(topic).getPartitionedTopicName()))
+ .forEach(partition -> {
+
topicHashPositions.put(partition,
+
pulsar().getNamespaceService()
+
.getNamespaceBundleFactory()
+
.getLongHashCode(partition));
+ });
+ } else { // topic partition
+ if
(allTopicsInThisBundle.contains(topicName.toString())) {
+ topicHashPositions.put(topic,
+
pulsar().getNamespaceService().getNamespaceBundleFactory()
+
.getLongHashCode(topic));
+ }
+ }
+ }
}
- }
- }
- }
- asyncResponse.resume(
- new TopicHashPositions(namespaceName.toString(),
bundleRange, topicHashPositions));
+ return new
TopicHashPositions(namespaceName.toString(), bundleRange,
+ topicHashPositions);
+ });
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 4264e5669d0..d3f1fb9a9e4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -809,15 +809,22 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is not setup to
split in bundles") })
- public BundlesData getBundlesData(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
- validatePoliciesReadOnlyAccess();
- validateNamespaceName(property, cluster, namespace);
- validateNamespaceOperation(NamespaceName.get(property, namespace),
NamespaceOperation.GET_BUNDLE);
-
- Policies policies = getNamespacePolicies(namespaceName);
-
- return policies.bundles;
+ public void getBundlesData(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, cluster, namespace);
+ validatePoliciesReadOnlyAccessAsync()
+ .thenCompose(__ ->
validateNamespaceOperationAsync(NamespaceName.get(property, namespace),
+ NamespaceOperation.GET_BUNDLE))
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.bundles))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get bundle data for namespace {}
", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -899,15 +906,27 @@ public class Namespaces extends NamespacesBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitBoundaries") @DefaultValue("") List<Long>
splitBoundaries) {
- try {
- validateNamespaceName(property, cluster, namespace);
- internalSplitNamespaceBundle(asyncResponse, bundleRange,
- authoritative, unload,
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(property, cluster, namespace);
+ internalSplitNamespaceBundleAsync(bundleRange,
+ authoritative, unload,
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully split namespace bundle {}",
clientAppId(), bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to split namespace bundle
{}/{}",
+ clientAppId(), namespaceName, bundleRange, ex);
+ }
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof IllegalArgumentException) {
+ asyncResponse.resume(new
RestException(Response.Status.PRECONDITION_FAILED,
+ "Split bundle failed due to invalid request"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@GET
@@ -923,17 +942,32 @@ public class Namespaces extends NamespacesBase {
@QueryParam("topics") List<String> topics,
@Suspended AsyncResponse asyncResponse) {
validateNamespaceName(property, cluster, namespace);
- internalGetTopicHashPositions(asyncResponse, bundle, topics);
+ internalGetTopicHashPositionsAsync(bundle, topics)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] {} Failed to get topic list for bundle
{}.", clientAppId(),
+ namespaceName, bundle);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@Path("/{property}/{cluster}/{namespace}/publishRate")
@ApiOperation(hidden = true, value = "Set publish-rate throttling for all
topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
- public void setPublishRate(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, PublishRate publishRate)
{
+ public void setPublishRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
@PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
PublishRate publishRate) {
validateNamespaceName(property, cluster, namespace);
- internalSetPublishRate(publishRate);
+ internalSetPublishRateAsync(publishRate)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@@ -943,20 +977,37 @@ public class Namespaces extends NamespacesBase {
+ "-1 means msg-publish-rate or byte-publish-rate not
configured in publish-rate yet")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist")})
- public PublishRate getPublishRate(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
+ public void getPublishRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- return internalGetPublishRate();
+ internalGetPublishRateAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.error("Failed to get publish rate for namespace {}",
namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@Path("/{property}/{cluster}/{namespace}/dispatchRate")
@ApiOperation(hidden = true, value = "Set dispatch-rate throttling for all
topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
- public void setDispatchRate(@PathParam("property") String property,
@PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, DispatchRateImpl
dispatchRate) {
+ public void setDispatchRate(@Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
DispatchRateImpl dispatchRate) {
validateNamespaceName(property, cluster, namespace);
- internalSetTopicDispatchRate(dispatchRate);
+ internalSetTopicDispatchRateAsync(dispatchRate)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to update the dispatchRate for
cluster on namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index aa3698e1c7d..6d8fe5bc1d1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -58,7 +58,6 @@ import
org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
-import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -735,15 +734,21 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace
doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is not setup to
split in bundles") })
- public BundlesData getBundlesData(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
- validatePoliciesReadOnlyAccess();
+ public void getBundlesData(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String
namespace) {
validateNamespaceName(tenant, namespace);
- validateNamespaceOperation(NamespaceName.get(tenant, namespace),
NamespaceOperation.GET_BUNDLE);
-
- Policies policies = getNamespacePolicies(namespaceName);
-
- return policies.bundles;
+ validatePoliciesReadOnlyAccessAsync()
+ .thenCompose(__ ->
validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace),
+ NamespaceOperation.GET_BUNDLE))
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.bundles))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get bundle data for namespace
{}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -826,15 +831,26 @@ public class Namespaces extends NamespacesBase {
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitAlgorithmName") String splitAlgorithmName,
@ApiParam("splitBoundaries") List<Long> splitBoundaries) {
- try {
- validateNamespaceName(tenant, namespace);
- internalSplitNamespaceBundle(asyncResponse,
- bundleRange, authoritative, unload, splitAlgorithmName,
splitBoundaries);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalSplitNamespaceBundleAsync(bundleRange, authoritative, unload,
splitAlgorithmName, splitBoundaries)
+ .thenAccept(__ -> {
+ log.info("[{}] Successfully split namespace bundle {}",
clientAppId(), bundleRange);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to split namespace bundle {}/{}
due to {}",
+ clientAppId(), namespaceName, bundleRange,
ex.getMessage());
+ }
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ if (realCause instanceof IllegalArgumentException) {
+ asyncResponse.resume(new
RestException(Response.Status.PRECONDITION_FAILED,
+ "Split bundle failed due to invalid request"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ return null;
+ });
}
@GET
@@ -849,8 +865,17 @@ public class Namespaces extends NamespacesBase {
@PathParam("bundle") String bundleRange,
@QueryParam("topics") List<String> topics,
@Suspended AsyncResponse asyncResponse) {
- validateNamespaceName(tenant, namespace);
- internalGetTopicHashPositions(asyncResponse, bundleRange, topics);
+ validateNamespaceName(tenant, namespace);
+ internalGetTopicHashPositionsAsync(bundleRange, topics)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] {} Failed to get topic list for bundle
{}.", clientAppId(),
+ namespaceName, bundleRange);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 11a16d4d17f..2369a0af4bb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -644,10 +644,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
.numBundles(boundaries.size() - 1)
.build();
createBundledTestNamespaces(this.testTenant, this.testLocalCluster,
"test-bundled-namespace-1", bundle);
- BundlesData responseData = namespaces.getBundlesData(testTenant,
this.testLocalCluster,
- "test-bundled-namespace-1");
-
- assertEquals(responseData, bundle);
+ assertEquals(asyncRequests(ctx -> namespaces.getBundlesData(ctx,
testTenant, this.testLocalCluster,
+ "test-bundled-namespace-1")), bundle);
}
@Test
@@ -917,7 +915,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
ArgumentCaptor<Response> captor =
ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
// verify split bundles
- BundlesData bundlesData = namespaces.getBundlesData(testTenant,
testLocalCluster, bundledNsLocal);
+ BundlesData bundlesData = (BundlesData) asyncRequests(ctx ->
namespaces.getBundlesData(ctx, testTenant,
+ testLocalCluster, bundledNsLocal));
assertNotNull(bundlesData);
assertEquals(bundlesData.getBoundaries().size(), 3);
assertEquals(bundlesData.getBoundaries().get(0), "0x00000000");