This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5a839582ac0 [improve] [broker] Improve CPU resources usege of 
TopicName Cache (#23052)
5a839582ac0 is described below

commit 5a839582ac08eb3be110de9b81f1cc7d245d4018
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 22 17:40:30 2024 +0800

    [improve] [broker] Improve CPU resources usege of TopicName Cache (#23052)
    
    Co-authored-by: Zixuan Liu <[email protected]>
    (cherry picked from commit 81aed6c75eba99fb62172b986b0c59e693e6f4b9)
---
 conf/broker.conf                                   |  8 ++++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 15 ++++++++++
 .../pulsar/broker/service/BrokerService.java       | 10 +++++++
 .../apache/pulsar/broker/PulsarServiceTest.java    |  2 ++
 .../pulsar/broker/service/StandaloneTest.java      |  2 ++
 .../common/naming/ServiceConfigurationTest.java    | 13 +++++++++
 .../configurations/pulsar_broker_test.conf         |  2 ++
 .../pulsar_broker_test_standalone.conf             |  2 ++
 .../org/apache/pulsar/common/naming/TopicName.java | 33 +++++++++++-----------
 9 files changed, 70 insertions(+), 17 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index fd1eecf27d3..bc026fea9c9 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -158,6 +158,14 @@ skipBrokerShutdownOnOOM=false
 # Factory class-name to create topic with custom workflow
 topicFactoryClassName=
 
+# Max capacity of the topic name cache. -1 means unlimited cache; 0 means 
broker will clear all cache
+# per "maxSecondsToClearTopicNameCache", it does not mean broker will not 
cache TopicName.
+topicNameCacheMaxCapacity=100000
+
+# A Specifies the minimum number of seconds that the topic name stays in 
memory, to avoid clear cache frequently when
+# there are too many topics are in use.
+maxSecondsToClearTopicNameCache=7200
+
 # Enable backlog quota check. Enforces action on topic when the quota is 
reached
 backlogQuotaCheckEnabled=true
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5087e5b51ac..a992170f130 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -586,6 +586,21 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private boolean backlogQuotaCheckEnabled = true;
 
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_POLICIES,
+        doc = "Max capacity of the topic name cache. -1 means unlimited cache; 
0 means broker will clear all cache"
+                + " per maxSecondsToClearTopicNameCache, it does not mean 
broker will not cache TopicName."
+    )
+    private int topicNameCacheMaxCapacity = 100_000;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "A Specifies the minimum number of seconds that the topic name 
stays in memory, to avoid clear cache"
+                + " frequently when there are too many topics are in use."
+    )
+    private int maxSecondsToClearTopicNameCache = 3600 * 2;
+
     @FieldContext(
             category = CATEGORY_POLICIES,
             doc = "Whether to enable precise time based backlog quota check. "
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index cee53e90678..de0750b695a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -582,6 +582,16 @@ public class BrokerService implements Closeable {
         this.updateBrokerDispatchThrottlingMaxRate();
         this.startCheckReplicationPolicies();
         this.startDeduplicationSnapshotMonitor();
+        this.startClearInvalidateTopicNameCacheTask();
+    }
+
+    protected void startClearInvalidateTopicNameCacheTask() {
+        final int maxSecondsToClearTopicNameCache = 
pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
+        inactivityMonitor.scheduleAtFixedRate(
+            () -> 
TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()),
+            maxSecondsToClearTopicNameCache,
+            maxSecondsToClearTopicNameCache,
+            TimeUnit.SECONDS);
     }
 
     protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int 
statsUpdateFrequencyInSecs) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 3e0887646e1..a515890dd30 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -56,6 +56,8 @@ public class PulsarServiceTest extends 
MockedPulsarServiceBaseTest {
         super.doInitConf();
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
+        conf.setTopicNameCacheMaxCapacity(5000);
+        conf.setMaxSecondsToClearTopicNameCache(5);
         if (useStaticPorts) {
             conf.setBrokerServicePortTls(Optional.of(6651));
             conf.setBrokerServicePort(Optional.of(6660));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
index 5307e1a9ee8..67d188efd25 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
@@ -56,5 +56,7 @@ public class StandaloneTest extends 
MockedPulsarServiceBaseTest {
         assertNull(standalone.getConfig().getAdvertisedAddress());
         assertEquals(standalone.getConfig().getAdvertisedListeners(),
                 
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
+        
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
+        assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 
200);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 55971c15adf..ae13afb1934 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -73,6 +73,8 @@ public class ServiceConfigurationTest {
         assertEquals(config.getManagedLedgerDataReadPriority(), 
"bookkeeper-first");
         assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
         assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
+        assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
+        assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
         OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.create(config.getProperties());
         
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(),
 "bookkeeper-first");
     }
@@ -370,4 +372,15 @@ public class ServiceConfigurationTest {
         conf = PulsarConfigurationLoader.create(properties, 
ServiceConfiguration.class);
         assertEquals(conf.getAllowAutoTopicCreationType(), 
TopicType.NON_PARTITIONED);
     }
+
+    @Test
+    public void testTopicNameCacheConfiguration() throws Exception {
+        ServiceConfiguration conf;
+        final Properties properties = new Properties();
+        properties.setProperty("maxSecondsToClearTopicNameCache", "2");
+        properties.setProperty("topicNameCacheMaxCapacity", "100");
+        conf = PulsarConfigurationLoader.create(properties, 
ServiceConfiguration.class);
+        assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2);
+        assertEquals(conf.getTopicNameCacheMaxCapacity(), 100);
+    }
 }
diff --git 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index 36f5869d73d..551a9c88757 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -103,3 +103,5 @@ transactionPendingAckBatchedWriteEnabled=true
 transactionPendingAckBatchedWriteMaxRecords=44
 transactionPendingAckBatchedWriteMaxSize=55
 transactionPendingAckBatchedWriteMaxDelayInMillis=66
+topicNameCacheMaxCapacity=200
+maxSecondsToClearTopicNameCache=1
diff --git 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
index 0748418be63..e9aeed1a34d 100644
--- 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
+++ 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
@@ -94,3 +94,5 @@ 
brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
 supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
 defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
 maxMessagePublishBufferSizeInMB=-1
+topicNameCacheMaxCapacity=200
+maxSecondsToClearTopicNameCache=1
\ No newline at end of file
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index dd3307b4fa1..e52b21aa355 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -19,16 +19,11 @@
 package org.apache.pulsar.common.naming;
 
 import com.google.common.base.Splitter;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.util.concurrent.UncheckedExecutionException;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.Codec;
 
@@ -54,13 +49,17 @@ public class TopicName implements ServiceUnitId {
 
     private final int partitionIndex;
 
-    private static final LoadingCache<String, TopicName> cache = 
CacheBuilder.newBuilder().maximumSize(100000)
-            .expireAfterAccess(30, TimeUnit.MINUTES).build(new 
CacheLoader<String, TopicName>() {
-                @Override
-                public TopicName load(String name) throws Exception {
-                    return new TopicName(name);
-                }
-            });
+    private static final ConcurrentHashMap<String, TopicName> cache = new 
ConcurrentHashMap<>();
+
+    public static void clearIfReachedMaxCapacity(int maxCapacity) {
+        if (maxCapacity < 0) {
+            // Unlimited cache.
+            return;
+        }
+        if (cache.size() > maxCapacity) {
+            cache.clear();
+        }
+    }
 
     public static TopicName get(String domain, NamespaceName namespaceName, 
String topic) {
         String name = domain + "://" + namespaceName.toString() + '/' + topic;
@@ -79,11 +78,11 @@ public class TopicName implements ServiceUnitId {
     }
 
     public static TopicName get(String topic) {
-        try {
-            return cache.get(topic);
-        } catch (ExecutionException | UncheckedExecutionException e) {
-            throw (RuntimeException) e.getCause();
+        TopicName tp = cache.get(topic);
+        if (tp != null) {
+            return tp;
         }
+        return cache.computeIfAbsent(topic, k -> new TopicName(k));
     }
 
     public static TopicName getPartitionedTopicName(String topic) {

Reply via email to