This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f47c705e3cc [improve][broker] Make splitNamespaceBundle and 
getTopicHashPositions async  (#16411)
f47c705e3cc is described below

commit f47c705e3cc1cb0c34e48ad1ca71060f1202c70a
Author: gaozhangmin <[email protected]>
AuthorDate: Mon Jul 25 17:15:25 2022 +0800

    [improve][broker] Make splitNamespaceBundle and getTopicHashPositions async 
 (#16411)
    
    * make splitNamespaceBundle and getTopicHashPositions async
    
    * appli comments
    
    * Update 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
    
    Co-authored-by: Zixuan Liu <[email protected]>
    
    * Update 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
    
    Co-authored-by: Zixuan Liu <[email protected]>
    
    * Update 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
    
    Co-authored-by: Zixuan Liu <[email protected]>
    
    * Update 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
    
    Co-authored-by: Zixuan Liu <[email protected]>
    
    * Update 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
    
    Co-authored-by: Zixuan Liu <[email protected]>
    
    * Update 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
    
    Co-authored-by: Zixuan Liu <[email protected]>
    
    * fix error
    
    * fix error
    
    * fix check style error
    
    Co-authored-by: gavingaozhangmin <[email protected]>
    Co-authored-by: Zixuan Liu <[email protected]>
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 189 ++++++++++-----------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 107 +++++++++---
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  65 ++++---
 .../apache/pulsar/broker/admin/NamespacesTest.java |   9 +-
 4 files changed, 214 insertions(+), 156 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index f5da9ff6aa3..5b069edf75b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1115,116 +1115,99 @@ public abstract class NamespacesBase extends 
AdminResource {
     }
 
     @SuppressWarnings("deprecation")
-    protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, 
String bundleName, boolean authoritative,
-                                                boolean unload, String 
splitAlgorithmName, List<Long> splitBoundaries) {
-        validateSuperUserAccess();
-        checkNotNull(bundleName, "BundleRange should not be null");
-        log.info("[{}] Split namespace bundle {}/{}", clientAppId(), 
namespaceName, bundleName);
-
-        String bundleRange = getBundleRange(bundleName);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        if (namespaceName.isGlobal()) {
-            // check cluster ownership for a given global namespace: redirect 
if peer-cluster owns it
-            validateGlobalNamespaceOwnership(namespaceName);
-        } else {
-            validateClusterOwnership(namespaceName.getCluster());
-            validateClusterForTenant(namespaceName.getTenant(), 
namespaceName.getCluster());
-        }
-
-        validatePoliciesReadOnlyAccess();
-
-        List<String> supportedNamespaceBundleSplitAlgorithms =
-                
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
-        if (StringUtils.isNotBlank(splitAlgorithmName)) {
-            if 
(!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
-                asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                        "Unsupported namespace bundle split algorithm, 
supported algorithms are "
-                                + supportedNamespaceBundleSplitAlgorithms));
-            }
-            if 
(splitAlgorithmName.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
-                    && (splitBoundaries == null || splitBoundaries.size() == 
0)) {
-                asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                        "With specified_positions_divide split algorithm, 
splitBoundaries must not be emtpy"));
-            }
-        }
-
-        NamespaceBundle nsBundle;
-        try {
-            nsBundle = validateNamespaceBundleOwnership(namespaceName, 
policies.bundles, bundleRange,
-                    authoritative, false);
-        } catch (Exception e) {
-            asyncResponse.resume(e);
-            return;
-        }
+    protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String 
bundleName,
+                                                                        
boolean authoritative, boolean unload,
+                                                                        String 
splitAlgorithmName,
+                                                                        
List<Long> splitBoundaries) {
+        return validateSuperUserAccessAsync()
+                .thenAccept(__ -> {
+                    checkNotNull(bundleName, "BundleRange should not be null");
+                    log.info("[{}] Split namespace bundle {}/{}", 
clientAppId(), namespaceName, bundleName);
+                    List<String> supportedNamespaceBundleSplitAlgorithms =
+                            
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
+                    if (StringUtils.isNotBlank(splitAlgorithmName)) {
+                        if 
(!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
+                            throw new RestException(Status.PRECONDITION_FAILED,
+                                    "Unsupported namespace bundle split 
algorithm, supported algorithms are "
+                                            + 
supportedNamespaceBundleSplitAlgorithms);
+                        }
+                        if (splitAlgorithmName
+                                
.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
+                                && (splitBoundaries == null || 
splitBoundaries.size() == 0)) {
+                            throw new RestException(Status.PRECONDITION_FAILED,
+                                    "With specified_positions_divide split 
algorithm, splitBoundaries must not be "
+                                            + "emtpy");
+                        }
+                    }
+                })
+                .thenCompose(__ -> {
+                    if (namespaceName.isGlobal()) {
+                        // check cluster ownership for a given global 
namespace: redirect if peer-cluster owns it
+                        return 
validateGlobalNamespaceOwnershipAsync(namespaceName);
+                    } else {
+                        return 
validateClusterOwnershipAsync(namespaceName.getCluster())
+                                .thenCompose(ignore -> 
validateClusterForTenantAsync(namespaceName.getTenant(),
+                                        namespaceName.getCluster()));
+                    }
+                })
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies->{
+                    String bundleRange = getBundleRange(bundleName);
+                    return 
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, 
bundleRange,
+                            authoritative, false)
+                            .thenCompose(nsBundle -> 
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
+                                    
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));
 
-        pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
-                getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), 
splitBoundaries)
-                .thenRun(() -> {
-                    log.info("[{}] Successfully split namespace bundle {}", 
clientAppId(), nsBundle.toString());
-                    asyncResponse.resume(Response.noContent().build());
-                }).exceptionally(ex -> {
-            if (ex.getCause() instanceof IllegalArgumentException) {
-                log.error("[{}] Failed to split namespace bundle {}/{} due to 
{}", clientAppId(), namespaceName,
-                        bundleRange, ex.getMessage());
-                asyncResponse.resume(new 
RestException(Status.PRECONDITION_FAILED,
-                        "Split bundle failed due to invalid request"));
-            } else {
-                log.error("[{}] Failed to split namespace bundle {}/{}", 
clientAppId(), namespaceName, bundleRange, ex);
-                asyncResponse.resume(new RestException(ex.getCause()));
-            }
-            return null;
-        });
+                });
     }
 
-    protected void internalGetTopicHashPositions(AsyncResponse asyncResponse, 
String bundleRange, List<String> topics) {
+    protected CompletableFuture<TopicHashPositions> 
internalGetTopicHashPositionsAsync(String bundleRange,
+                                                                               
        List<String> topics) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Getting hash position for topic list {}, bundle 
{}", clientAppId(), topics, bundleRange);
         }
-        validateNamespacePolicyOperation(namespaceName, 
PolicyName.PERSISTENCE, PolicyOperation.READ);
-        Policies policies = getNamespacePolicies(namespaceName);
-        NamespaceBundle bundle = 
validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
-                false, true);
-        
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete(
-                (allTopicsInThisBundle, throwable) -> {
-                    if (throwable != null) {
-                        log.error("[{}] {} Failed to get topic list for bundle 
{}.", clientAppId(),
-                                namespaceName, bundle);
-                        asyncResponse.resume(new RestException(throwable));
-                    }
-                    // if topics is empty, return all topics' hash position in 
this bundle
-                    Map<String, Long> topicHashPositions = new HashMap<>();
-                    if (topics == null || topics.size() == 0) {
-                        allTopicsInThisBundle.forEach(t -> {
-                            topicHashPositions.put(t,
-                                    
pulsar().getNamespaceService().getNamespaceBundleFactory()
-                                            .getLongHashCode(t));
-                        });
-                    } else {
-                        for (String topic : 
topics.stream().map(Codec::decode).collect(Collectors.toList())) {
-                            TopicName topicName = TopicName.get(topic);
-                            // partitioned topic
-                            if (topicName.getPartitionIndex() == -1) {
-                                allTopicsInThisBundle.stream()
-                                        .filter(t -> 
TopicName.get(t).getPartitionedTopicName()
-                                                
.equals(TopicName.get(topic).getPartitionedTopicName()))
-                                        .forEach(partition -> {
-                                            topicHashPositions.put(partition,
-                                                    
pulsar().getNamespaceService().getNamespaceBundleFactory()
-                                                            
.getLongHashCode(partition));
-                                        });
-                            } else { // topic partition
-                                if 
(allTopicsInThisBundle.contains(topicName.toString())) {
-                                    topicHashPositions.put(topic,
-                                            
pulsar().getNamespaceService().getNamespaceBundleFactory()
-                                                    .getLongHashCode(topic));
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.PERSISTENCE, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies -> {
+                    return 
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, 
bundleRange,
+                            false, true)
+                            .thenCompose(nsBundle ->
+                                    
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle))
+                            .thenApply(allTopicsInThisBundle -> {
+                                Map<String, Long> topicHashPositions = new 
HashMap<>();
+                                if (topics == null || topics.size() == 0) {
+                                    allTopicsInThisBundle.forEach(t -> {
+                                        topicHashPositions.put(t,
+                                                
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                                        .getLongHashCode(t));
+                                    });
+                                } else {
+                                    for (String topic : 
topics.stream().map(Codec::decode).toList()) {
+                                        TopicName topicName = 
TopicName.get(topic);
+                                        // partitioned topic
+                                        if (topicName.getPartitionIndex() == 
-1) {
+                                            allTopicsInThisBundle.stream()
+                                                    .filter(t -> 
TopicName.get(t).getPartitionedTopicName()
+                                                            
.equals(TopicName.get(topic).getPartitionedTopicName()))
+                                                    .forEach(partition -> {
+                                                        
topicHashPositions.put(partition,
+                                                                
pulsar().getNamespaceService()
+                                                                        
.getNamespaceBundleFactory()
+                                                                        
.getLongHashCode(partition));
+                                                    });
+                                        } else { // topic partition
+                                            if 
(allTopicsInThisBundle.contains(topicName.toString())) {
+                                                topicHashPositions.put(topic,
+                                                        
pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                                                
.getLongHashCode(topic));
+                                            }
+                                        }
+                                    }
                                 }
-                            }
-                        }
-                    }
-                    asyncResponse.resume(
-                            new TopicHashPositions(namespaceName.toString(), 
bundleRange, topicHashPositions));
+                                return new 
TopicHashPositions(namespaceName.toString(), bundleRange,
+                                        topicHashPositions);
+                            });
                 });
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 4264e5669d0..d3f1fb9a9e4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -809,15 +809,22 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Namespace is not setup to 
split in bundles") })
-    public BundlesData getBundlesData(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace) {
-        validatePoliciesReadOnlyAccess();
-        validateNamespaceName(property, cluster, namespace);
-        validateNamespaceOperation(NamespaceName.get(property, namespace), 
NamespaceOperation.GET_BUNDLE);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        return policies.bundles;
+    public void getBundlesData(@Suspended final AsyncResponse asyncResponse,
+                              @PathParam("property") String property,
+                              @PathParam("cluster") String cluster,
+                              @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        validatePoliciesReadOnlyAccessAsync()
+                .thenCompose(__ -> 
validateNamespaceOperationAsync(NamespaceName.get(property, namespace),
+                        NamespaceOperation.GET_BUNDLE))
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.bundles))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get bundle data for namespace {} 
", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -899,15 +906,27 @@ public class Namespaces extends NamespacesBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @QueryParam("unload") @DefaultValue("false") boolean unload,
             @QueryParam("splitBoundaries") @DefaultValue("") List<Long> 
splitBoundaries) {
-        try {
-            validateNamespaceName(property, cluster, namespace);
-            internalSplitNamespaceBundle(asyncResponse, bundleRange,
-                    authoritative, unload, 
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(property, cluster, namespace);
+        internalSplitNamespaceBundleAsync(bundleRange,
+                authoritative, unload, 
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully split namespace bundle {}", 
clientAppId(), bundleRange);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to split namespace bundle 
{}/{}",
+                                clientAppId(), namespaceName, bundleRange, ex);
+                    }
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof IllegalArgumentException) {
+                        asyncResponse.resume(new 
RestException(Response.Status.PRECONDITION_FAILED,
+                                "Split bundle failed due to invalid request"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @GET
@@ -923,17 +942,32 @@ public class Namespaces extends NamespacesBase {
             @QueryParam("topics") List<String> topics,
             @Suspended AsyncResponse asyncResponse) {
         validateNamespaceName(property, cluster, namespace);
-        internalGetTopicHashPositions(asyncResponse, bundle, topics);
+        internalGetTopicHashPositionsAsync(bundle, topics)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] {} Failed to get topic list for bundle 
{}.", clientAppId(),
+                                namespaceName, bundle);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{property}/{cluster}/{namespace}/publishRate")
     @ApiOperation(hidden = true, value = "Set publish-rate throttling for all 
topics of the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission") })
-    public void setPublishRate(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, PublishRate publishRate) 
{
+    public void setPublishRate(@Suspended AsyncResponse asyncResponse,
+                               @PathParam("property") String property, 
@PathParam("cluster") String cluster,
+                               @PathParam("namespace") String namespace, 
PublishRate publishRate) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetPublishRate(publishRate);
+        internalSetPublishRateAsync(publishRate)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -943,20 +977,37 @@ public class Namespaces extends NamespacesBase {
                     + "-1 means msg-publish-rate or byte-publish-rate not 
configured in publish-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
-    public PublishRate getPublishRate(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace) {
+    public void getPublishRate(@Suspended AsyncResponse asyncResponse,
+                               @PathParam("property") String property,
+                               @PathParam("cluster") String cluster,
+                               @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetPublishRate();
+        internalGetPublishRateAsync()
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("Failed to get publish rate for namespace {}", 
namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
     @Path("/{property}/{cluster}/{namespace}/dispatchRate")
     @ApiOperation(hidden = true, value = "Set dispatch-rate throttling for all 
topics of the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission") })
-    public void setDispatchRate(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, DispatchRateImpl 
dispatchRate) {
+    public void setDispatchRate(@Suspended AsyncResponse asyncResponse,
+                                @PathParam("property") String property,
+                                @PathParam("cluster") String cluster,
+                                @PathParam("namespace") String namespace, 
DispatchRateImpl dispatchRate) {
         validateNamespaceName(property, cluster, namespace);
-        internalSetTopicDispatchRate(dispatchRate);
+        internalSetTopicDispatchRateAsync(dispatchRate)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to update the dispatchRate for 
cluster on namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index aa3698e1c7d..6d8fe5bc1d1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -58,7 +58,6 @@ import 
org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
-import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -735,15 +734,21 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist"),
             @ApiResponse(code = 412, message = "Namespace is not setup to 
split in bundles") })
-    public BundlesData getBundlesData(@PathParam("tenant") String tenant,
-            @PathParam("namespace") String namespace) {
-        validatePoliciesReadOnlyAccess();
+    public void getBundlesData(@Suspended final AsyncResponse asyncResponse,
+                                      @PathParam("tenant") String tenant,
+                                      @PathParam("namespace") String 
namespace) {
         validateNamespaceName(tenant, namespace);
-        validateNamespaceOperation(NamespaceName.get(tenant, namespace), 
NamespaceOperation.GET_BUNDLE);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        return policies.bundles;
+        validatePoliciesReadOnlyAccessAsync()
+                .thenCompose(__ -> 
validateNamespaceOperationAsync(NamespaceName.get(tenant, namespace),
+                        NamespaceOperation.GET_BUNDLE))
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.bundles))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get bundle data for namespace 
{}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -826,15 +831,26 @@ public class Namespaces extends NamespacesBase {
             @QueryParam("unload") @DefaultValue("false") boolean unload,
             @QueryParam("splitAlgorithmName") String splitAlgorithmName,
             @ApiParam("splitBoundaries") List<Long> splitBoundaries) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            internalSplitNamespaceBundle(asyncResponse,
-                    bundleRange, authoritative, unload, splitAlgorithmName, 
splitBoundaries);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalSplitNamespaceBundleAsync(bundleRange, authoritative, unload, 
splitAlgorithmName, splitBoundaries)
+                .thenAccept(__ -> {
+                    log.info("[{}] Successfully split namespace bundle {}", 
clientAppId(), bundleRange);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to split namespace bundle {}/{} 
due to {}",
+                                clientAppId(), namespaceName, bundleRange, 
ex.getMessage());
+                    }
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof IllegalArgumentException) {
+                        asyncResponse.resume(new 
RestException(Response.Status.PRECONDITION_FAILED,
+                                "Split bundle failed due to invalid request"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    }
+                    return null;
+                });
     }
 
     @GET
@@ -849,8 +865,17 @@ public class Namespaces extends NamespacesBase {
             @PathParam("bundle") String bundleRange,
             @QueryParam("topics") List<String> topics,
             @Suspended AsyncResponse asyncResponse) {
-            validateNamespaceName(tenant, namespace);
-            internalGetTopicHashPositions(asyncResponse, bundleRange, topics);
+        validateNamespaceName(tenant, namespace);
+        internalGetTopicHashPositionsAsync(bundleRange, topics)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] {} Failed to get topic list for bundle 
{}.", clientAppId(),
+                                namespaceName, bundleRange);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 11a16d4d17f..2369a0af4bb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -644,10 +644,8 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                 .numBundles(boundaries.size() - 1)
                 .build();
         createBundledTestNamespaces(this.testTenant, this.testLocalCluster, 
"test-bundled-namespace-1", bundle);
-        BundlesData responseData = namespaces.getBundlesData(testTenant, 
this.testLocalCluster,
-                "test-bundled-namespace-1");
-
-        assertEquals(responseData, bundle);
+        assertEquals(asyncRequests(ctx -> namespaces.getBundlesData(ctx, 
testTenant, this.testLocalCluster,
+                "test-bundled-namespace-1")), bundle);
     }
 
     @Test
@@ -917,7 +915,8 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
             ArgumentCaptor<Response> captor = 
ArgumentCaptor.forClass(Response.class);
             verify(response, timeout(5000).times(1)).resume(captor.capture());
             // verify split bundles
-            BundlesData bundlesData = namespaces.getBundlesData(testTenant, 
testLocalCluster, bundledNsLocal);
+            BundlesData bundlesData = (BundlesData) asyncRequests(ctx -> 
namespaces.getBundlesData(ctx, testTenant,
+                    testLocalCluster, bundledNsLocal));
             assertNotNull(bundlesData);
             assertEquals(bundlesData.getBoundaries().size(), 3);
             assertEquals(bundlesData.getBoundaries().get(0), "0x00000000");

Reply via email to