This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 81aed6c75eb [improve] [broker] Improve CPU resources usage of 
TopicName Cache (#23052)
81aed6c75eb is described below

commit 81aed6c75eba99fb62172b986b0c59e693e6f4b9
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 22 17:40:30 2024 +0800

    [improve] [broker] Improve CPU resources usage of TopicName Cache (#23052)
    
    Co-authored-by: Zixuan Liu <[email protected]>
---
 conf/broker.conf                                   |  8 ++++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 15 ++++++++++
 .../pulsar/broker/service/BrokerService.java       | 10 +++++++
 .../apache/pulsar/broker/PulsarServiceTest.java    | 33 ++++++++++++++++++++++
 .../pulsar/broker/service/StandaloneTest.java      |  2 ++
 .../common/naming/ServiceConfigurationTest.java    | 13 +++++++++
 .../configurations/pulsar_broker_test.conf         |  2 ++
 .../pulsar_broker_test_standalone.conf             |  2 ++
 .../org/apache/pulsar/common/naming/TopicName.java | 33 +++++++++++-----------
 9 files changed, 101 insertions(+), 17 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index b715c4e515b..3c956bdd86d 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -159,6 +159,14 @@ skipBrokerShutdownOnOOM=false
 # Factory class-name to create topic with custom workflow
 topicFactoryClassName=
 
+# Max capacity of the topic name cache. -1 means unlimited cache; 0 means 
the broker clears the whole cache
+# every "maxSecondsToClearTopicNameCache" seconds; it does not mean that the broker will not 
cache TopicName.
+topicNameCacheMaxCapacity=100000
+
+# Specifies the minimum number of seconds that a topic name stays in 
memory, to avoid clearing the cache too frequently when
+# too many topics are in use.
+maxSecondsToClearTopicNameCache=7200
+
 # Enable backlog quota check. Enforces action on topic when the quota is 
reached
 backlogQuotaCheckEnabled=true
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index aba3ad3a669..2d2765287c0 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -594,6 +594,21 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private boolean backlogQuotaCheckEnabled = true;
 
+    @FieldContext(
+        dynamic = true,
+        category = CATEGORY_POLICIES,
+        doc = "Max capacity of the topic name cache. -1 means unlimited cache; 
0 means broker will clear all cache"
+                + " per maxSecondsToClearTopicNameCache, it does not mean 
broker will not cache TopicName."
+    )
+    private int topicNameCacheMaxCapacity = 100_000;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "A Specifies the minimum number of seconds that the topic name 
stays in memory, to avoid clear cache"
+                + " frequently when there are too many topics are in use."
+    )
+    private int maxSecondsToClearTopicNameCache = 3600 * 2;
+
     @FieldContext(
             category = CATEGORY_POLICIES,
             doc = "Whether to enable precise time based backlog quota check. "
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6ecd0a1ba60..c0f44838ac6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -625,6 +625,16 @@ public class BrokerService implements Closeable {
         this.updateBrokerDispatchThrottlingMaxRate();
         this.startCheckReplicationPolicies();
         this.startDeduplicationSnapshotMonitor();
+        this.startClearInvalidateTopicNameCacheTask();
+    }
+
+    protected void startClearInvalidateTopicNameCacheTask() {
+        final int maxSecondsToClearTopicNameCache = 
pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
+        inactivityMonitor.scheduleAtFixedRate(
+            () -> 
TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()),
+            maxSecondsToClearTopicNameCache,
+            maxSecondsToClearTopicNameCache,
+            TimeUnit.SECONDS);
     }
 
     protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int 
statsUpdateFrequencyInSecs) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index daa4393db55..3bbf423da6e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -24,11 +24,14 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.AssertJUnit.assertSame;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.testng.annotations.AfterMethod;
@@ -56,6 +59,8 @@ public class PulsarServiceTest extends 
MockedPulsarServiceBaseTest {
         super.doInitConf();
         conf.setBrokerServicePortTls(Optional.of(0));
         conf.setWebServicePortTls(Optional.of(0));
+        conf.setTopicNameCacheMaxCapacity(5000);
+        conf.setMaxSecondsToClearTopicNameCache(5);
         if (useStaticPorts) {
             conf.setBrokerServicePortTls(Optional.of(6651));
             conf.setBrokerServicePort(Optional.of(6660));
@@ -187,6 +192,34 @@ public class PulsarServiceTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" + 
pulsar.getWebService().getListenPortHTTPS().get());
     }
 
+    @Test
+    public void testTopicCacheConfiguration() throws Exception {
+        cleanup();
+        setup();
+        assertEquals(conf.getTopicNameCacheMaxCapacity(), 5000);
+        assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5);
+
+        List<TopicName> topicNameCached = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            topicNameCached.add(TopicName.get("public/default/tp_" + i));
+        }
+
+        // Verify: the cache does not clear since it is not larger than max 
capacity.
+        Thread.sleep(10 * 1000);
+        for (int i = 0; i < 20; i++) {
+            assertTrue(topicNameCached.get(i) == 
TopicName.get("public/default/tp_" + i));
+        }
+
+        // Update max capacity.
+        
admin.brokers().updateDynamicConfiguration("topicNameCacheMaxCapacity", "10");
+
+        // Verify: the cache was cleared.
+        Thread.sleep(10 * 1000);
+        for (int i = 0; i < 20; i++) {
+            assertFalse(topicNameCached.get(i) == 
TopicName.get("public/default/tp_" + i));
+        }
+    }
+
     @Test
     public void testBacklogAndRetentionCheck() throws PulsarServerException {
         ServiceConfiguration config = new ServiceConfiguration();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
index b99f8d5338f..e95b9410f4d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
@@ -63,5 +63,7 @@ public class StandaloneTest {
         assertEquals(standalone.getConfig().getAdvertisedListeners(),
                 
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
         
assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(),
 true);
+        
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
+        assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 
200);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index ebeaffc48e4..c64c54d2d19 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -74,6 +74,8 @@ public class ServiceConfigurationTest {
         assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
         assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
         assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), 
true);
+        assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
+        assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
         OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.create(config.getProperties());
         
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(),
 "bookkeeper-first");
     }
@@ -375,4 +377,15 @@ public class ServiceConfigurationTest {
         conf = PulsarConfigurationLoader.create(properties, 
ServiceConfiguration.class);
         assertEquals(conf.getAllowAutoTopicCreationType(), 
TopicType.NON_PARTITIONED);
     }
+
+    @Test
+    public void testTopicNameCacheConfiguration() throws Exception {
+        ServiceConfiguration conf;
+        final Properties properties = new Properties();
+        properties.setProperty("maxSecondsToClearTopicNameCache", "2");
+        properties.setProperty("topicNameCacheMaxCapacity", "100");
+        conf = PulsarConfigurationLoader.create(properties, 
ServiceConfiguration.class);
+        assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2);
+        assertEquals(conf.getTopicNameCacheMaxCapacity(), 100);
+    }
 }
diff --git 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index ddda30d0a4b..f344a3e3f63 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -104,3 +104,5 @@ transactionPendingAckBatchedWriteEnabled=true
 transactionPendingAckBatchedWriteMaxRecords=44
 transactionPendingAckBatchedWriteMaxSize=55
 transactionPendingAckBatchedWriteMaxDelayInMillis=66
+topicNameCacheMaxCapacity=200
+maxSecondsToClearTopicNameCache=1
diff --git 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
index 812c8dc9748..c520512e77b 100644
--- 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
+++ 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
@@ -95,3 +95,5 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
 defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
 maxMessagePublishBufferSizeInMB=-1
 dispatcherPauseOnAckStatePersistentEnabled=true
+topicNameCacheMaxCapacity=200
+maxSecondsToClearTopicNameCache=1
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index d264eab9574..dd24c9a9712 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -19,16 +19,11 @@
 package org.apache.pulsar.common.naming;
 
 import com.google.common.base.Splitter;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.util.concurrent.UncheckedExecutionException;
 import com.google.re2j.Pattern;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.Codec;
 
@@ -54,13 +49,17 @@ public class TopicName implements ServiceUnitId {
 
     private final int partitionIndex;
 
-    private static final LoadingCache<String, TopicName> cache = 
CacheBuilder.newBuilder().maximumSize(100000)
-            .expireAfterAccess(30, TimeUnit.MINUTES).build(new 
CacheLoader<String, TopicName>() {
-                @Override
-                public TopicName load(String name) throws Exception {
-                    return new TopicName(name);
-                }
-            });
+    private static final ConcurrentHashMap<String, TopicName> cache = new 
ConcurrentHashMap<>();
+
+    public static void clearIfReachedMaxCapacity(int maxCapacity) {
+        if (maxCapacity < 0) {
+            // Unlimited cache.
+            return;
+        }
+        if (cache.size() > maxCapacity) {
+            cache.clear();
+        }
+    }
 
     public static TopicName get(String domain, NamespaceName namespaceName, 
String topic) {
         String name = domain + "://" + namespaceName.toString() + '/' + topic;
@@ -79,11 +78,11 @@ public class TopicName implements ServiceUnitId {
     }
 
     public static TopicName get(String topic) {
-        try {
-            return cache.get(topic);
-        } catch (ExecutionException | UncheckedExecutionException e) {
-            throw (RuntimeException) e.getCause();
+        TopicName tp = cache.get(topic);
+        if (tp != null) {
+            return tp;
         }
+        return cache.computeIfAbsent(topic, k -> new TopicName(k));
     }
 
     public static TopicName getPartitionedTopicName(String topic) {

Reply via email to