This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6cf8cb0729a8228f443ad9cf30b3085eabe4db03
Author: feynmanlin <[email protected]>
AuthorDate: Tue Jun 29 20:36:17 2021 +0800

    Fix GetListInBundle return all Topics in bundle (#11110)
    
    
https://github.com/apache/pulsar/blob/7824325efc822307f60b38e9d24bdb189e415a06/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java#L310-L313
    
    1.
    `getListFromBundle` in NonPersistentTopics should only return the list of 
non-persistent topics under a namespace bundle.
    But now it returns all the topics.
    
    2.
    Now it will traverse all topics on Broker. When there are a lot of topics, 
timeouts often occur.
    
![image](https://user-images.githubusercontent.com/9758905/123505745-67815d80-d693-11eb-87bb-ef3287668155.png)
    
    I will support getting all topics or only persistent topics in another PR
    
    1) Filter out persistent topics
    2) Only traverse the topics under the corresponding bundle
    
    (cherry picked from commit a6ab3af45bf225c3adf9bdf711c637cde703c0b9)
---
 .../broker/admin/v2/NonPersistentTopics.java       | 21 ++++++++----
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 39 +++++++++++++++++++---
 2 files changed, 50 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 9d29967..9755b93 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -25,6 +25,7 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -57,6 +58,7 @@ import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -352,13 +354,20 @@ public class NonPersistentTopics extends PersistentTopics 
{
                     return;
                 }
                 try {
+                    ConcurrentOpenHashMap<String, 
ConcurrentOpenHashMap<String, Topic>> bundleTopics =
+                            
pulsar().getBrokerService().getMultiLayerTopicsMap().get(namespaceName.toString());
+                    if (bundleTopics == null || bundleTopics.isEmpty()) {
+                        asyncResponse.resume(Collections.emptyList());
+                        return;
+                    }
                     final List<String> topicList = Lists.newArrayList();
-                    pulsar().getBrokerService().forEachTopic(topic -> {
-                        TopicName topicName = TopicName.get(topic.getName());
-                        if (nsBundle.includes(topicName)) {
-                            topicList.add(topic.getName());
-                        }
-                    });
+                    String bundleKey = namespaceName.toString() + "/" + 
nsBundle.getBundleRange();
+                    ConcurrentOpenHashMap<String, Topic> topicMap = 
bundleTopics.get(bundleKey);
+                    if (topicMap != null) {
+                        topicList.addAll(topicMap.keys().stream()
+                                .filter(name -> 
!TopicName.get(name).isPersistent())
+                                .collect(Collectors.toList()));
+                    }
                     asyncResponse.resume(topicList);
                 } catch (Exception e) {
                     log.error("[{}] Failed to list topics on namespace bundle 
{}/{}", clientAppId(),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index f337966..830c8f1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -80,17 +80,14 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
-import org.apache.pulsar.common.policies.data.AutoFailoverPolicyDataImpl;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
-import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -2018,6 +2015,40 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testGetListInBundle() throws Exception {
+        final String namespace = "prop-xyz/ns11";
+        admin.namespaces().createNamespace(namespace, 3);
+
+        final String persistentTopicName = TopicName.get(
+                "persistent", NamespaceName.get(namespace),
+                "get_topics_mode_" + UUID.randomUUID()).toString();
+
+        final String nonPersistentTopicName = TopicName.get(
+                "non-persistent", NamespaceName.get(namespace),
+                "get_topics_mode_" + UUID.randomUUID()).toString();
+        admin.topics().createPartitionedTopic(persistentTopicName, 3);
+        admin.topics().createPartitionedTopic(nonPersistentTopicName, 3);
+        pulsarClient.newProducer().topic(persistentTopicName).create().close();
+        
pulsarClient.newProducer().topic(nonPersistentTopicName).create().close();
+
+        BundlesData bundlesData = admin.namespaces().getBundles(namespace);
+        List<String> boundaries = bundlesData.getBoundaries();
+        int topicNum = 0;
+        for (int i = 0; i < boundaries.size() - 1; i++) {
+            String bundle = String.format("%s_%s", boundaries.get(i), 
boundaries.get(i + 1));
+            List<String> topic = admin.topics().getListInBundle(namespace, 
bundle);
+            if (topic == null) {
+                continue;
+            }
+            topicNum += topic.size();
+            for (String s : topic) {
+                assertFalse(TopicName.get(s).isPersistent());
+            }
+        }
+        assertEquals(topicNum, 3);
+    }
+
+    @Test
     public void testGetTopicsWithDifferentMode() throws Exception {
         final String namespace = "prop-xyz/ns1";
 

Reply via email to