This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 7f736e21be5 [improve] [broker] Improve CPU resources usage of
TopicName Cache (#23052)
7f736e21be5 is described below
commit 7f736e21be500e3ee48b102231796923c6d2984c
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 22 17:40:30 2024 +0800
[improve] [broker] Improve CPU resources usage of TopicName Cache (#23052)
Co-authored-by: Zixuan Liu <[email protected]>
(cherry picked from commit 81aed6c75eba99fb62172b986b0c59e693e6f4b9)
---
conf/broker.conf | 8 ++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 15 ++++++++++
.../pulsar/broker/service/BrokerService.java | 10 +++++++
.../apache/pulsar/broker/PulsarServiceTest.java | 2 ++
.../pulsar/broker/service/StandaloneTest.java | 2 ++
.../common/naming/ServiceConfigurationTest.java | 13 +++++++++
.../configurations/pulsar_broker_test.conf | 2 ++
.../pulsar_broker_test_standalone.conf | 2 ++
.../org/apache/pulsar/common/naming/TopicName.java | 33 +++++++++++-----------
9 files changed, 70 insertions(+), 17 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index cab021246f5..5e85cf2c646 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -155,6 +155,14 @@ skipBrokerShutdownOnOOM=false
# Factory class-name to create topic with custom workflow
topicFactoryClassName=
+# Max capacity of the topic name cache. -1 means unlimited cache; 0 means
the broker will clear the whole cache
+# every "maxSecondsToClearTopicNameCache" seconds; it does not mean the broker will not
cache TopicName.
+topicNameCacheMaxCapacity=100000
+
+# Specifies the minimum number of seconds that a topic name stays in
memory, to avoid clearing the cache too frequently when
+# there are too many topics in use.
+maxSecondsToClearTopicNameCache=7200
+
# Enable backlog quota check. Enforces action on topic when the quota is
reached
backlogQuotaCheckEnabled=true
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9af3a070546..8fa2f4c47a8 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -547,6 +547,21 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private boolean backlogQuotaCheckEnabled = true;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Max capacity of the topic name cache. -1 means unlimited cache;
0 means broker will clear all cache"
+ + " per maxSecondsToClearTopicNameCache, it does not mean
broker will not cache TopicName."
+ )
+ private int topicNameCacheMaxCapacity = 100_000;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "A Specifies the minimum number of seconds that the topic name
stays in memory, to avoid clear cache"
+ + " frequently when there are too many topics are in use."
+ )
+ private int maxSecondsToClearTopicNameCache = 3600 * 2;
+
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Whether to enable precise time based backlog quota check. "
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c0da47755b6..ba774b4f305 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -539,6 +539,16 @@ public class BrokerService implements Closeable {
this.updateBrokerDispatchThrottlingMaxRate();
this.startCheckReplicationPolicies();
this.startDeduplicationSnapshotMonitor();
+ this.startClearInvalidateTopicNameCacheTask();
+ }
+
+ protected void startClearInvalidateTopicNameCacheTask() {
+ final int maxSecondsToClearTopicNameCache =
pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
+ inactivityMonitor.scheduleAtFixedRate(
+ () ->
TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()),
+ maxSecondsToClearTopicNameCache,
+ maxSecondsToClearTopicNameCache,
+ TimeUnit.SECONDS);
}
protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int
statsUpdateFrequencyInSecs) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index ddb6402ad57..1fc324cf6fd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -55,6 +55,8 @@ public class PulsarServiceTest extends
MockedPulsarServiceBaseTest {
@Override
protected void doInitConf() throws Exception {
super.doInitConf();
+ conf.setTopicNameCacheMaxCapacity(5000);
+ conf.setMaxSecondsToClearTopicNameCache(5);
if (useStaticPorts) {
conf.setBrokerServicePortTls(Optional.of(6651));
conf.setBrokerServicePort(Optional.of(6660));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
index 9b7b02407d8..fe611496ecb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
@@ -56,5 +56,7 @@ public class StandaloneTest extends
MockedPulsarServiceBaseTest {
assertNull(standalone.getConfig().getAdvertisedAddress());
assertEquals(standalone.getConfig().getAdvertisedListeners(),
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
+
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
+ assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(),
200);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 55d725b1810..f6f8a87094b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -71,6 +71,8 @@ public class ServiceConfigurationTest {
assertEquals(config.getMaxMessagePublishBufferSizeInMB(), -1);
assertEquals(config.getManagedLedgerDataReadPriority(),
"bookkeeper-first");
assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
+ assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
+ assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(),
"bookkeeper-first");
}
@@ -323,4 +325,15 @@ public class ServiceConfigurationTest {
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(),
20);
}
}
+
+ @Test
+ public void testTopicNameCacheConfiguration() throws Exception {
+ ServiceConfiguration conf;
+ final Properties properties = new Properties();
+ properties.setProperty("maxSecondsToClearTopicNameCache", "2");
+ properties.setProperty("topicNameCacheMaxCapacity", "100");
+ conf = PulsarConfigurationLoader.create(properties,
ServiceConfiguration.class);
+ assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2);
+ assertEquals(conf.getTopicNameCacheMaxCapacity(), 100);
+ }
}
diff --git
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index 226b2f31a73..a5b787f429f 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -102,3 +102,5 @@ transactionPendingAckBatchedWriteEnabled=true
transactionPendingAckBatchedWriteMaxRecords=44
transactionPendingAckBatchedWriteMaxSize=55
transactionPendingAckBatchedWriteMaxDelayInMillis=66
+topicNameCacheMaxCapacity=200
+maxSecondsToClearTopicNameCache=1
diff --git
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
index c733409fc00..65588b36f45 100644
---
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
+++
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
@@ -94,3 +94,5 @@
brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
+topicNameCacheMaxCapacity=200
+maxSecondsToClearTopicNameCache=1
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 94e93d39865..73672774946 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -19,15 +19,10 @@
package org.apache.pulsar.common.naming;
import com.google.common.base.Splitter;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.util.concurrent.UncheckedExecutionException;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.Codec;
@@ -54,13 +49,17 @@ public class TopicName implements ServiceUnitId {
private final int partitionIndex;
- private static final LoadingCache<String, TopicName> cache =
CacheBuilder.newBuilder().maximumSize(100000)
- .expireAfterAccess(30, TimeUnit.MINUTES).build(new
CacheLoader<String, TopicName>() {
- @Override
- public TopicName load(String name) throws Exception {
- return new TopicName(name);
- }
- });
+ private static final ConcurrentHashMap<String, TopicName> cache = new
ConcurrentHashMap<>();
+
+ public static void clearIfReachedMaxCapacity(int maxCapacity) {
+ if (maxCapacity < 0) {
+ // Unlimited cache.
+ return;
+ }
+ if (cache.size() > maxCapacity) {
+ cache.clear();
+ }
+ }
public static TopicName get(String domain, NamespaceName namespaceName,
String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
@@ -79,11 +78,11 @@ public class TopicName implements ServiceUnitId {
}
public static TopicName get(String topic) {
- try {
- return cache.get(topic);
- } catch (ExecutionException | UncheckedExecutionException e) {
- throw (RuntimeException) e.getCause();
+ TopicName tp = cache.get(topic);
+ if (tp != null) {
+ return tp;
}
+ return cache.computeIfAbsent(topic, k -> new TopicName(k));
}
public static TopicName getPartitionedTopicName(String topic) {