This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e4f338e918 [improve] [broker] high CPU usage caused by list topics under namespace (#23049)
3e4f338e918 is described below

commit 3e4f338e91877fb2e4592aa9abc3aced6d4e50c7
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 22 08:45:23 2024 +0800

    [improve] [broker] high CPU usage caused by list topics under namespace (#23049)
---
 .../pulsar/broker/namespace/NamespaceService.java  | 23 ++++++++++++++++++++++
 .../apache/pulsar/broker/service/ServerCnx.java    |  4 ++--
 .../pulsar/broker/service/ServerCnxTest.java       |  2 ++
 3 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 2a1584df961..ec4c907234a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -40,6 +40,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -55,6 +56,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -104,6 +106,7 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.stats.MetricsUtil;
+import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.MetadataCache;
@@ -187,6 +190,9 @@ public class NamespaceService implements AutoCloseable {
             .register();
     private final DoubleHistogram lookupLatencyHistogram;
 
+    private ConcurrentHashMap<String, CompletableFuture<List<String>>> 
inProgressQueryUserTopics =
+            new ConcurrentHashMap<>();
+
     /**
      * Default constructor.
      */
@@ -1509,6 +1515,23 @@ public class NamespaceService implements AutoCloseable {
         }
     }
 
+    public CompletableFuture<List<String>> getListOfUserTopics(NamespaceName 
namespaceName, Mode mode) {
+        String key = String.format("%s://%s", mode, namespaceName);
+        final MutableBoolean initializedByCurrentThread = new MutableBoolean();
+        CompletableFuture<List<String>> queryRes = 
inProgressQueryUserTopics.computeIfAbsent(key, k -> {
+            initializedByCurrentThread.setTrue();
+            return getListOfTopics(namespaceName, mode).thenApplyAsync(list -> 
{
+                return TopicList.filterSystemTopic(list);
+            }, pulsar.getExecutor());
+        });
+        if (initializedByCurrentThread.getValue()) {
+            queryRes.whenComplete((ignore, ex) -> {
+                inProgressQueryUserTopics.remove(key, queryRes);
+            });
+        }
+        return queryRes;
+    }
+
     public CompletableFuture<List<String>> getAllPartitions(NamespaceName 
namespaceName) {
         return getPartitions(namespaceName, TopicDomain.persistent)
                 .thenCombine(getPartitions(namespaceName, 
TopicDomain.non_persistent),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4933aee974d..9bca80c41bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2459,11 +2459,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         if (lookupSemaphore.tryAcquire()) {
             isNamespaceOperationAllowed(namespaceName, 
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
                 if (isAuthorized) {
-                    
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName,
 mode)
+                    
getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName,
 mode)
                         .thenAccept(topics -> {
                             boolean filterTopics = false;
                             // filter system topic
-                            List<String> filteredTopics = 
TopicList.filterSystemTopic(topics);
+                            List<String> filteredTopics = topics;
 
                             if (enableSubscriptionPatternEvaluation && 
topicsPattern.isPresent()) {
                                 if (topicsPattern.get().length() <= 
maxSubscriptionPatternLength) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 27afedd6b10..58c6b96a0f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -229,6 +229,8 @@ public class ServerCnxTest {
         
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
         
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
                 NamespaceName.get("use", "ns-abc"), 
CommandGetTopicsOfNamespace.Mode.ALL);
+        
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics(
+                NamespaceName.get("use", "ns-abc"), 
CommandGetTopicsOfNamespace.Mode.ALL);
         
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics(
                 NamespaceName.get("use", "ns-abc"));
 

Reply via email to