This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8798392 [broker] Optimize TopicPolicies#compactionThreshold with
HierarchyTopicPolicies (#13710)
8798392 is described below
commit 8798392c1b5978d82bb9a8ea9b3027cb570568f9
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Fri Jan 14 11:50:33 2022 +0800
[broker] Optimize TopicPolicies#compactionThreshold with
HierarchyTopicPolicies (#13710)
---
.../org/apache/pulsar/broker/service/AbstractTopic.java | 3 +++
.../pulsar/broker/service/persistent/PersistentTopic.java | 15 +--------------
.../apache/pulsar/broker/service/PersistentTopicTest.java | 9 ++++++---
.../common/policies/data/HierarchyTopicPolicies.java | 2 ++
4 files changed, 12 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 08f0388..95f661a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -173,6 +173,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
+
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
}
protected void updateTopicPolicyByNamespacePolicy(Policies
namespacePolicies) {
@@ -182,6 +183,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
if (namespacePolicies.deleted) {
return;
}
+
topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
topicPolicies.getReplicationClusters().updateNamespaceValue(
Lists.newArrayList(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
@@ -230,6 +232,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
+
topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
}
private EnumSet<SubType> subTypeStringsToEnumSet(Set<String>
getSubscriptionTypesEnabled) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 46edd82..d7924f6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1457,20 +1457,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
public void checkCompaction() {
TopicName name = TopicName.get(topic);
try {
- Long compactionThreshold = getTopicPolicies()
- .map(TopicPolicies::getCompactionThreshold)
- .orElse(null);
- if (compactionThreshold == null) {
- Policies policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources()
- .getPolicies(name.getNamespaceObject())
- .orElseThrow(() -> new
MetadataStoreException.NotFoundException());
- compactionThreshold = policies.compaction_threshold;
- }
- if (compactionThreshold == null) {
- compactionThreshold = brokerService.pulsar().getConfiguration()
- .getBrokerServiceCompactionThresholdInBytes();
- }
-
+ long compactionThreshold =
topicPolicies.getCompactionThreshold().get();
if (isSystemTopic() || compactionThreshold != 0
&& currentCompaction.isDone()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 95e9741..c25c4dc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1873,9 +1873,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
NamespaceName ns =
TopicName.get(successTopicName).getNamespaceObject();
- doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
+
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ topic.initialize().get();
topic.checkCompaction();
@@ -1907,9 +1908,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
NamespaceName ns =
TopicName.get(successTopicName).getNamespaceObject();
- doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
+
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ topic.initialize().get();
topic.checkCompaction();
@@ -1936,11 +1938,12 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
NamespaceName ns =
TopicName.get(successTopicName).getNamespaceObject();
- doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
+
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
doReturn(1000L).when(ledgerMock).getEstimatedBacklogSize();
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ topic.initialize().get();
topic.checkCompaction();
verify(compactor, times(0)).compact(anyString());
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 7b2b5e9..820320b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -41,6 +41,7 @@ public class HierarchyTopicPolicies {
final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>>
backLogQuotaMap;
final PolicyHierarchyValue<Integer> topicMaxMessageSize;
final PolicyHierarchyValue<Integer> messageTTLInSeconds;
+ final PolicyHierarchyValue<Long> compactionThreshold;
final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
@@ -64,5 +65,6 @@ public class HierarchyTopicPolicies {
messageTTLInSeconds = new PolicyHierarchyValue<>();
delayedDeliveryEnabled = new PolicyHierarchyValue<>();
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
+ compactionThreshold = new PolicyHierarchyValue<>();
}
}