This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3fcaa0a3044e3861b70d22141dd6dc3f6c178abd
Author: Kai Wang <[email protected]>
AuthorDate: Fri Jun 30 09:23:31 2023 +0800

    [improve][broker] Make get list from bundle Admin API async (#20652)
    
    (cherry picked from commit 4958f4578967a6960cd6ac4c0c0759c4bd903b94)
---
 .../broker/admin/v2/NonPersistentTopics.java       | 60 +++++++++++-----------
 .../pulsar/broker/web/PulsarWebResource.java       |  9 ++--
 2 files changed, 35 insertions(+), 34 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 0c4ddd46c09..7cb18264ea1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -25,6 +25,7 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -51,7 +52,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -442,35 +442,35 @@ public class NonPersistentTopics extends PersistentTopics 
{
                         bundleRange);
                 asyncResponse.resume(Response.noContent().build());
             } else {
-                NamespaceBundle nsBundle;
-                try {
-                    nsBundle = validateNamespaceBundleOwnership(namespaceName, 
policies.bundles,
-                        bundleRange, true, true);
-                } catch (WebApplicationException wae) {
-                    asyncResponse.resume(wae);
-                    return;
-                }
-                try {
-                    ConcurrentOpenHashMap<String, 
ConcurrentOpenHashMap<String, Topic>> bundleTopics =
-                            
pulsar().getBrokerService().getMultiLayerTopicsMap().get(namespaceName.toString());
-                    if (bundleTopics == null || bundleTopics.isEmpty()) {
-                        asyncResponse.resume(Collections.emptyList());
-                        return;
-                    }
-                    final List<String> topicList = Lists.newArrayList();
-                    String bundleKey = namespaceName.toString() + "/" + 
nsBundle.getBundleRange();
-                    ConcurrentOpenHashMap<String, Topic> topicMap = 
bundleTopics.get(bundleKey);
-                    if (topicMap != null) {
-                        topicList.addAll(topicMap.keys().stream()
-                                .filter(name -> 
!TopicName.get(name).isPersistent())
-                                .collect(Collectors.toList()));
-                    }
-                    asyncResponse.resume(topicList);
-                } catch (Exception e) {
-                    log.error("[{}] Failed to list topics on namespace bundle 
{}/{}", clientAppId(),
-                            namespaceName, bundleRange, e);
-                    asyncResponse.resume(new RestException(e));
-                }
+                validateNamespaceBundleOwnershipAsync(namespaceName, 
policies.bundles, bundleRange, true, true)
+                        .thenAccept(nsBundle -> {
+                            ConcurrentOpenHashMap<String, 
ConcurrentOpenHashMap<String, Topic>> bundleTopics =
+                                    pulsar().getBrokerService()
+                                            
.getMultiLayerTopicsMap().get(namespaceName.toString());
+                            if (bundleTopics == null || 
bundleTopics.isEmpty()) {
+                                asyncResponse.resume(Collections.emptyList());
+                                return;
+                            }
+                            final List<String> topicList = new ArrayList<>();
+                            String bundleKey = namespaceName.toString() + "/" 
+ nsBundle.getBundleRange();
+                            ConcurrentOpenHashMap<String, Topic> topicMap = 
bundleTopics.get(bundleKey);
+                            if (topicMap != null) {
+                                topicList.addAll(topicMap.keys().stream()
+                                        .filter(name -> 
!TopicName.get(name).isPersistent())
+                                        .collect(Collectors.toList()));
+                            }
+                            asyncResponse.resume(topicList);
+                        }).exceptionally(ex -> {
+                            Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                            log.error("[{}] Failed to list topics on namespace 
bundle {}/{}", clientAppId(),
+                                    namespaceName, bundleRange, realCause);
+                            if (realCause instanceof WebApplicationException) {
+                                asyncResponse.resume(realCause);
+                            } else {
+                                asyncResponse.resume(new 
RestException(realCause));
+                            }
+                            return null;
+                        });
             }
         }).exceptionally(ex -> {
             log.error("[{}] Failed to list topics on namespace bundle {}/{}", 
clientAppId(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 66d81827f71..021b80330ae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -723,10 +723,11 @@ public abstract class PulsarWebResource {
                                 if (!owned) {
                                     boolean newAuthoritative = 
this.isLeaderBroker();
                                     // Replace the host and port of the 
current request and redirect
-                                    URI redirect = 
UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
-                                            
.port(webUrl.get().getPort()).replaceQueryParam("authoritative",
-                                                    newAuthoritative).build();
-
+                                    UriBuilder uriBuilder = 
UriBuilder.fromUri(uri.getRequestUri())
+                                            .host(webUrl.get().getHost())
+                                            .port(webUrl.get().getPort())
+                                            
.replaceQueryParam("authoritative", newAuthoritative);
+                                    URI redirect = uriBuilder.build();
                                     log.debug("{} is not a service unit 
owned", bundle);
                                     // Redirect
                                     log.debug("Redirecting the rest call to 
{}", redirect);

Reply via email to