This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8798392  [broker] Optimize TopicPolicies#compactionThreshold with HierarchyTopicPolicies (#13710)
8798392 is described below

commit 8798392c1b5978d82bb9a8ea9b3027cb570568f9
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Fri Jan 14 11:50:33 2022 +0800

    [broker] Optimize TopicPolicies#compactionThreshold with HierarchyTopicPolicies (#13710)
---
 .../org/apache/pulsar/broker/service/AbstractTopic.java   |  3 +++
 .../pulsar/broker/service/persistent/PersistentTopic.java | 15 +--------------
 .../apache/pulsar/broker/service/PersistentTopicTest.java |  9 ++++++---
 .../common/policies/data/HierarchyTopicPolicies.java      |  2 ++
 4 files changed, 12 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 08f0388..95f661a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -173,6 +173,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
         
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
         
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
+        
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
     }
 
     protected void updateTopicPolicyByNamespacePolicy(Policies 
namespacePolicies) {
@@ -182,6 +183,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         if (namespacePolicies.deleted) {
             return;
         }
+        
topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold);
         topicPolicies.getReplicationClusters().updateNamespaceValue(
                 
Lists.newArrayList(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
         
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
@@ -230,6 +232,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         
topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
         
topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
         
topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
+        
topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
     }
 
     private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> 
getSubscriptionTypesEnabled) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 46edd82..d7924f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1457,20 +1457,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     public void checkCompaction() {
         TopicName name = TopicName.get(topic);
         try {
-            Long compactionThreshold = getTopicPolicies()
-                .map(TopicPolicies::getCompactionThreshold)
-                .orElse(null);
-            if (compactionThreshold == null) {
-                Policies policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
-                        .getPolicies(name.getNamespaceObject())
-                        .orElseThrow(() -> new 
MetadataStoreException.NotFoundException());
-                compactionThreshold = policies.compaction_threshold;
-            }
-            if (compactionThreshold == null) {
-                compactionThreshold = brokerService.pulsar().getConfiguration()
-                        .getBrokerServiceCompactionThresholdInBytes();
-            }
-
+            long compactionThreshold = 
topicPolicies.getCompactionThreshold().get();
             if (isSystemTopic() || compactionThreshold != 0
                 && currentCompaction.isDone()) {
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 95e9741..c25c4dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1873,9 +1873,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
         NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
         NamespaceName ns = 
TopicName.get(successTopicName).getNamespaceObject();
-        doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
+        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        topic.initialize().get();
 
         topic.checkCompaction();
 
@@ -1907,9 +1908,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
         NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
         NamespaceName ns = 
TopicName.get(successTopicName).getNamespaceObject();
-        doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
+        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        topic.initialize().get();
 
         topic.checkCompaction();
 
@@ -1936,11 +1938,12 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
         NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
         NamespaceName ns = 
TopicName.get(successTopicName).getNamespaceObject();
-        doReturn(Optional.of(policies)).when(nsr).getPolicies(ns);
+        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
 
         doReturn(1000L).when(ledgerMock).getEstimatedBacklogSize();
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        topic.initialize().get();
         topic.checkCompaction();
         verify(compactor, times(0)).compact(anyString());
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 7b2b5e9..820320b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -41,6 +41,7 @@ public class HierarchyTopicPolicies {
     final Map<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>> 
backLogQuotaMap;
     final PolicyHierarchyValue<Integer> topicMaxMessageSize;
     final PolicyHierarchyValue<Integer> messageTTLInSeconds;
+    final PolicyHierarchyValue<Long> compactionThreshold;
     final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
     final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
     final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
@@ -64,5 +65,6 @@ public class HierarchyTopicPolicies {
         messageTTLInSeconds = new PolicyHierarchyValue<>();
         delayedDeliveryEnabled = new PolicyHierarchyValue<>();
         delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
+        compactionThreshold = new PolicyHierarchyValue<>();
     }
 }

Reply via email to