This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3fcaa0a3044e3861b70d22141dd6dc3f6c178abd Author: Kai Wang <[email protected]> AuthorDate: Fri Jun 30 09:23:31 2023 +0800 [improve][broker] Make get list from bundle Admin API async (#20652) (cherry picked from commit 4958f4578967a6960cd6ac4c0c0759c4bd903b94) --- .../broker/admin/v2/NonPersistentTopics.java | 60 +++++++++++----------- .../pulsar/broker/web/PulsarWebResource.java | 9 ++-- 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 0c4ddd46c09..7cb18264ea1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -25,6 +25,7 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -51,7 +52,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.web.RestException; -import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.Policies; @@ -442,35 +442,35 @@ public class NonPersistentTopics extends PersistentTopics { bundleRange); asyncResponse.resume(Response.noContent().build()); } else { - NamespaceBundle nsBundle; - try { - nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, - bundleRange, true, true); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - return; - } - try { - ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundleTopics = - pulsar().getBrokerService().getMultiLayerTopicsMap().get(namespaceName.toString()); - if (bundleTopics == null || bundleTopics.isEmpty()) { - asyncResponse.resume(Collections.emptyList()); - return; - } - final List<String> topicList = Lists.newArrayList(); - String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange(); - ConcurrentOpenHashMap<String, Topic> topicMap = bundleTopics.get(bundleKey); - if (topicMap != null) { - topicList.addAll(topicMap.keys().stream() - .filter(name -> !TopicName.get(name).isPersistent()) - .collect(Collectors.toList())); - } - asyncResponse.resume(topicList); - } catch (Exception e) { - log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), - namespaceName, bundleRange, e); - asyncResponse.resume(new RestException(e)); - } + validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, true, true) + .thenAccept(nsBundle -> { + ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundleTopics = + pulsar().getBrokerService() + .getMultiLayerTopicsMap().get(namespaceName.toString()); + if (bundleTopics == null || bundleTopics.isEmpty()) { + asyncResponse.resume(Collections.emptyList()); + return; + } + final List<String> topicList = new ArrayList<>(); + String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange(); + ConcurrentOpenHashMap<String, Topic> topicMap = bundleTopics.get(bundleKey); + if (topicMap != null) { + topicList.addAll(topicMap.keys().stream() + .filter(name -> !TopicName.get(name).isPersistent()) + .collect(Collectors.toList())); + } + asyncResponse.resume(topicList); + }).exceptionally(ex -> { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), + namespaceName, bundleRange, realCause); + if (realCause instanceof WebApplicationException) { + asyncResponse.resume(realCause); + } else { + asyncResponse.resume(new RestException(realCause)); + } + return null; + }); } }).exceptionally(ex -> { log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 66d81827f71..021b80330ae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -723,10 +723,11 @@ public abstract class PulsarWebResource { if (!owned) { boolean newAuthoritative = this.isLeaderBroker(); // Replace the host and port of the current request and redirect - URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost()) - .port(webUrl.get().getPort()).replaceQueryParam("authoritative", - newAuthoritative).build(); - + UriBuilder uriBuilder = UriBuilder.fromUri(uri.getRequestUri()) + .host(webUrl.get().getHost()) + .port(webUrl.get().getPort()) + .replaceQueryParam("authoritative", newAuthoritative); + URI redirect = uriBuilder.build(); log.debug("{} is not a service unit owned", bundle); // Redirect log.debug("Redirecting the rest call to {}", redirect);
