This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6cf8cb0729a8228f443ad9cf30b3085eabe4db03 Author: feynmanlin <[email protected]> AuthorDate: Tue Jun 29 20:36:17 2021 +0800 Fix GetListInBundle return all Topics in bundle (#11110) https://github.com/apache/pulsar/blob/7824325efc822307f60b38e9d24bdb189e415a06/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java#L310-L313 1. `getListFromBundle` in NonPersistentTopics should only return the list of non-persistent topics under a namespace bundle, but it currently returns every topic in the bundle, persistent ones included. 2. It currently traverses every topic on the broker, so requests often time out when there are many topics. Support for getting all topics, or only persistent topics, will be added in another PR. Changes in this commit: 1) Filter out persistent topics. 2) Only traverse the topics under the corresponding bundle. (cherry picked from commit a6ab3af45bf225c3adf9bdf711c637cde703c0b9) --- .../broker/admin/v2/NonPersistentTopics.java | 21 ++++++++---- .../apache/pulsar/broker/admin/AdminApiTest2.java | 39 +++++++++++++++++++--- 2 files changed, 50 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 9d29967..9755b93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -25,6 +25,7 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -57,6 +58,7 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; import 
org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -352,13 +354,20 @@ public class NonPersistentTopics extends PersistentTopics { return; } try { + ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundleTopics = + pulsar().getBrokerService().getMultiLayerTopicsMap().get(namespaceName.toString()); + if (bundleTopics == null || bundleTopics.isEmpty()) { + asyncResponse.resume(Collections.emptyList()); + return; + } final List<String> topicList = Lists.newArrayList(); - pulsar().getBrokerService().forEachTopic(topic -> { - TopicName topicName = TopicName.get(topic.getName()); - if (nsBundle.includes(topicName)) { - topicList.add(topic.getName()); - } - }); + String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange(); + ConcurrentOpenHashMap<String, Topic> topicMap = bundleTopics.get(bundleKey); + if (topicMap != null) { + topicList.addAll(topicMap.keys().stream() + .filter(name -> !TopicName.get(name).isPersistent()) + .collect(Collectors.toList())); + } asyncResponse.resume(topicList); } catch (Exception e) { log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index f337966..830c8f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -80,17 +80,14 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; -import 
org.apache.pulsar.common.policies.data.AutoFailoverPolicyDataImpl; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; +import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.ConsumerStats; -import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; -import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -2018,6 +2015,40 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { } @Test + public void testGetListInBundle() throws Exception { + final String namespace = "prop-xyz/ns11"; + admin.namespaces().createNamespace(namespace, 3); + + final String persistentTopicName = TopicName.get( + "persistent", NamespaceName.get(namespace), + "get_topics_mode_" + UUID.randomUUID()).toString(); + + final String nonPersistentTopicName = TopicName.get( + "non-persistent", NamespaceName.get(namespace), + "get_topics_mode_" + UUID.randomUUID()).toString(); + admin.topics().createPartitionedTopic(persistentTopicName, 3); + admin.topics().createPartitionedTopic(nonPersistentTopicName, 3); + pulsarClient.newProducer().topic(persistentTopicName).create().close(); + pulsarClient.newProducer().topic(nonPersistentTopicName).create().close(); + + BundlesData bundlesData = admin.namespaces().getBundles(namespace); + List<String> boundaries = bundlesData.getBoundaries(); + 
int topicNum = 0; + for (int i = 0; i < boundaries.size() - 1; i++) { + String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); + List<String> topic = admin.topics().getListInBundle(namespace, bundle); + if (topic == null) { + continue; + } + topicNum += topic.size(); + for (String s : topic) { + assertFalse(TopicName.get(s).isPersistent()); + } + } + assertEquals(topicNum, 3); + } + + @Test public void testGetTopicsWithDifferentMode() throws Exception { final String namespace = "prop-xyz/ns1";
