This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 81aed6c75eb [improve] [broker] Improve CPU resources usage of
TopicName Cache (#23052)
81aed6c75eb is described below
commit 81aed6c75eba99fb62172b986b0c59e693e6f4b9
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 22 17:40:30 2024 +0800
[improve] [broker] Improve CPU resources usage of TopicName Cache (#23052)
Co-authored-by: Zixuan Liu <[email protected]>
---
conf/broker.conf | 8 ++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 15 ++++++++++
.../pulsar/broker/service/BrokerService.java | 10 +++++++
.../apache/pulsar/broker/PulsarServiceTest.java | 33 ++++++++++++++++++++++
.../pulsar/broker/service/StandaloneTest.java | 2 ++
.../common/naming/ServiceConfigurationTest.java | 13 +++++++++
.../configurations/pulsar_broker_test.conf | 2 ++
.../pulsar_broker_test_standalone.conf | 2 ++
.../org/apache/pulsar/common/naming/TopicName.java | 33 +++++++++++-----------
9 files changed, 101 insertions(+), 17 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index b715c4e515b..3c956bdd86d 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -159,6 +159,14 @@ skipBrokerShutdownOnOOM=false
# Factory class-name to create topic with custom workflow
topicFactoryClassName=
+# Max capacity of the topic name cache. -1 means the cache is unlimited; 0 means the broker clears the whole cache
+# every "maxSecondsToClearTopicNameCache" seconds (it does not mean the broker stops caching TopicName).
+topicNameCacheMaxCapacity=100000
+
+# Specifies the minimum number of seconds that a topic name stays in memory, to avoid clearing the cache too
+# frequently when there are too many topics in use.
+maxSecondsToClearTopicNameCache=7200
+
# Enable backlog quota check. Enforces action on topic when the quota is
reached
backlogQuotaCheckEnabled=true
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index aba3ad3a669..2d2765287c0 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -594,6 +594,21 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private boolean backlogQuotaCheckEnabled = true;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_POLICIES,
+ doc = "Max capacity of the topic name cache. -1 means unlimited cache;
0 means broker will clear all cache"
+ + " per maxSecondsToClearTopicNameCache, it does not mean
broker will not cache TopicName."
+ )
+ private int topicNameCacheMaxCapacity = 100_000;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "A Specifies the minimum number of seconds that the topic name
stays in memory, to avoid clear cache"
+ + " frequently when there are too many topics are in use."
+ )
+ private int maxSecondsToClearTopicNameCache = 3600 * 2;
+
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Whether to enable precise time based backlog quota check. "
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6ecd0a1ba60..c0f44838ac6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -625,6 +625,16 @@ public class BrokerService implements Closeable {
this.updateBrokerDispatchThrottlingMaxRate();
this.startCheckReplicationPolicies();
this.startDeduplicationSnapshotMonitor();
+ this.startClearInvalidateTopicNameCacheTask();
+ }
+
+ protected void startClearInvalidateTopicNameCacheTask() {
+ final int maxSecondsToClearTopicNameCache =
pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
+ inactivityMonitor.scheduleAtFixedRate(
+ () ->
TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()),
+ maxSecondsToClearTopicNameCache,
+ maxSecondsToClearTopicNameCache,
+ TimeUnit.SECONDS);
}
protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int
statsUpdateFrequencyInSecs) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index daa4393db55..3bbf423da6e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -24,11 +24,14 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertSame;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.testng.annotations.AfterMethod;
@@ -56,6 +59,8 @@ public class PulsarServiceTest extends
MockedPulsarServiceBaseTest {
super.doInitConf();
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
+ conf.setTopicNameCacheMaxCapacity(5000);
+ conf.setMaxSecondsToClearTopicNameCache(5);
if (useStaticPorts) {
conf.setBrokerServicePortTls(Optional.of(6651));
conf.setBrokerServicePort(Optional.of(6660));
@@ -187,6 +192,34 @@ public class PulsarServiceTest extends
MockedPulsarServiceBaseTest {
assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" +
pulsar.getWebService().getListenPortHTTPS().get());
}
+ @Test
+ public void testTopicCacheConfiguration() throws Exception {
+ cleanup();
+ setup();
+ assertEquals(conf.getTopicNameCacheMaxCapacity(), 5000);
+ assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 5);
+
+ List<TopicName> topicNameCached = new ArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ topicNameCached.add(TopicName.get("public/default/tp_" + i));
+ }
+
+ // Verify: the cache does not clear since it is not larger than max
capacity.
+ Thread.sleep(10 * 1000);
+ for (int i = 0; i < 20; i++) {
+ assertTrue(topicNameCached.get(i) ==
TopicName.get("public/default/tp_" + i));
+ }
+
+ // Update max capacity.
+
admin.brokers().updateDynamicConfiguration("topicNameCacheMaxCapacity", "10");
+
+ // Verify: the cache was cleared.
+ Thread.sleep(10 * 1000);
+ for (int i = 0; i < 20; i++) {
+ assertFalse(topicNameCached.get(i) ==
TopicName.get("public/default/tp_" + i));
+ }
+ }
+
@Test
public void testBacklogAndRetentionCheck() throws PulsarServerException {
ServiceConfiguration config = new ServiceConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
index b99f8d5338f..e95b9410f4d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
@@ -63,5 +63,7 @@ public class StandaloneTest {
assertEquals(standalone.getConfig().getAdvertisedListeners(),
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
assertEquals(standalone.getConfig().isDispatcherPauseOnAckStatePersistentEnabled(),
true);
+
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
+ assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(),
200);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index ebeaffc48e4..c64c54d2d19 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -74,6 +74,8 @@ public class ServiceConfigurationTest {
assertEquals(config.getBacklogQuotaDefaultLimitGB(), 0.05);
assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(),
true);
+ assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
+ assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(),
"bookkeeper-first");
}
@@ -375,4 +377,15 @@ public class ServiceConfigurationTest {
conf = PulsarConfigurationLoader.create(properties,
ServiceConfiguration.class);
assertEquals(conf.getAllowAutoTopicCreationType(),
TopicType.NON_PARTITIONED);
}
+
+ @Test
+ public void testTopicNameCacheConfiguration() throws Exception {
+ ServiceConfiguration conf;
+ final Properties properties = new Properties();
+ properties.setProperty("maxSecondsToClearTopicNameCache", "2");
+ properties.setProperty("topicNameCacheMaxCapacity", "100");
+ conf = PulsarConfigurationLoader.create(properties,
ServiceConfiguration.class);
+ assertEquals(conf.getMaxSecondsToClearTopicNameCache(), 2);
+ assertEquals(conf.getTopicNameCacheMaxCapacity(), 100);
+ }
}
diff --git
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index ddda30d0a4b..f344a3e3f63 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -104,3 +104,5 @@ transactionPendingAckBatchedWriteEnabled=true
transactionPendingAckBatchedWriteMaxRecords=44
transactionPendingAckBatchedWriteMaxSize=55
transactionPendingAckBatchedWriteMaxDelayInMillis=66
+topicNameCacheMaxCapacity=200
+maxSecondsToClearTopicNameCache=1
diff --git
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
index 812c8dc9748..c520512e77b 100644
---
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
+++
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
@@ -95,3 +95,5 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
dispatcherPauseOnAckStatePersistentEnabled=true
+topicNameCacheMaxCapacity=200
+maxSecondsToClearTopicNameCache=1
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index d264eab9574..dd24c9a9712 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -19,16 +19,11 @@
package org.apache.pulsar.common.naming;
import com.google.common.base.Splitter;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.re2j.Pattern;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.Codec;
@@ -54,13 +49,17 @@ public class TopicName implements ServiceUnitId {
private final int partitionIndex;
- private static final LoadingCache<String, TopicName> cache =
CacheBuilder.newBuilder().maximumSize(100000)
- .expireAfterAccess(30, TimeUnit.MINUTES).build(new
CacheLoader<String, TopicName>() {
- @Override
- public TopicName load(String name) throws Exception {
- return new TopicName(name);
- }
- });
+ private static final ConcurrentHashMap<String, TopicName> cache = new
ConcurrentHashMap<>();
+
+ public static void clearIfReachedMaxCapacity(int maxCapacity) {
+ if (maxCapacity < 0) {
+ // Unlimited cache.
+ return;
+ }
+ if (cache.size() > maxCapacity) {
+ cache.clear();
+ }
+ }
public static TopicName get(String domain, NamespaceName namespaceName,
String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
@@ -79,11 +78,11 @@ public class TopicName implements ServiceUnitId {
}
public static TopicName get(String topic) {
- try {
- return cache.get(topic);
- } catch (ExecutionException | UncheckedExecutionException e) {
- throw (RuntimeException) e.getCause();
+ TopicName tp = cache.get(topic);
+ if (tp != null) {
+ return tp;
}
+ return cache.computeIfAbsent(topic, k -> new TopicName(k));
}
public static TopicName getPartitionedTopicName(String topic) {