This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 340a19655ab [improve] [broker] high CPU usage caused by list topics
under namespace (#23049)
340a19655ab is described below
commit 340a19655ab949c39d1d888bc782fe5ece566b84
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 22 08:45:23 2024 +0800
[improve] [broker] high CPU usage caused by list topics under namespace
(#23049)
(cherry picked from commit 3e4f338e91877fb2e4592aa9abc3aced6d4e50c7)
---
.../pulsar/broker/namespace/NamespaceService.java | 23 ++++++++++++++++++++++
.../apache/pulsar/broker/service/ServerCnx.java | 4 ++--
.../pulsar/broker/service/ServerCnxTest.java | 2 ++
3 files changed, 27 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e9eacf570e3..8c97555205d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -38,6 +38,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -50,6 +51,7 @@ import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -92,6 +94,7 @@ import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
+import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
@@ -164,6 +167,9 @@ public class NamespaceService implements AutoCloseable {
.register();
+ private ConcurrentHashMap<String, CompletableFuture<List<String>>>
inProgressQueryUserTopics =
+ new ConcurrentHashMap<>(); // In-flight getListOfUserTopics queries keyed by a mode/namespace string; an entry is removed again once its future completes.
+
/**
* Default constructor.
*
@@ -1206,6 +1212,23 @@ public class NamespaceService implements AutoCloseable {
}
}
+ public CompletableFuture<List<String>> getListOfUserTopics(NamespaceName
namespaceName, Mode mode) { // Lists the namespace's topics with system topics filtered out; calls for the same mode and namespace that overlap in time share one future.
+ String key = String.format("%s://%s", mode, namespaceName); // De-duplication key: one map entry per (mode, namespace) pair.
+ final MutableBoolean initializedByCurrentThread = new MutableBoolean(); // Flipped to true only if the mapping function below runs, i.e. this call created the map entry.
+ CompletableFuture<List<String>> queryRes =
inProgressQueryUserTopics.computeIfAbsent(key, k -> { // Reuse the in-progress query for this key if present; otherwise start a new one.
+ initializedByCurrentThread.setTrue();
+ return getListOfTopics(namespaceName, mode).thenApplyAsync(list ->
{ // The filtering step is submitted to pulsar.getExecutor() instead of running inline on the completing thread.
+ return TopicList.filterSystemTopic(list);
+ }, pulsar.getExecutor());
+ });
+ if (initializedByCurrentThread.getValue()) { // Only the call that created the entry registers the cleanup callback.
+ queryRes.whenComplete((ignore, ex) -> { // Runs on both normal and exceptional completion.
+ inProgressQueryUserTopics.remove(key, queryRes); // Two-argument remove: drops the entry only if the key is still mapped to this exact future.
+ });
+ }
+ return queryRes;
+ }
+
public CompletableFuture<List<String>> getAllPartitions(NamespaceName
namespaceName) {
return getPartitions(namespaceName, TopicDomain.persistent)
.thenCombine(getPartitions(namespaceName,
TopicDomain.non_persistent),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 473dbecd0c7..1d8dbe6ae90 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2125,11 +2125,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (lookupSemaphore.tryAcquire()) {
isNamespaceOperationAllowed(namespaceName,
NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
if (isAuthorized) {
-
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName,
mode)
+
getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName,
mode)
.thenAccept(topics -> {
boolean filterTopics = false;
// filter system topic
- List<String> filteredTopics =
TopicList.filterSystemTopic(topics);
+ List<String> filteredTopics = topics;
if (enableSubscriptionPatternEvaluation &&
topicsPattern.isPresent()) {
if (topicsPattern.get().length() <=
maxSubscriptionPatternLength) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 3b4f6a6f2e4..525f375db8e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -264,6 +264,8 @@ public class ServerCnxTest {
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
NamespaceName.get("use", "ns-abc"),
CommandGetTopicsOfNamespace.Mode.ALL);
+
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics(
+ NamespaceName.get("use", "ns-abc"),
CommandGetTopicsOfNamespace.Mode.ALL);
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics(
NamespaceName.get("use", "ns-abc"));