This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new ab6f1c4  [broker] Bug Fix: topic policy is not properly init if namespace is loaded first. (#12833) (#13422)
ab6f1c4 is described below

commit ab6f1c4e1906d680ecbfe9611cea8aafdffa0aaa
Author: JiangHaiting <[email protected]>
AuthorDate: Tue Dec 21 19:23:56 2021 +0800

    [broker] Bug Fix: topic policy is not properly init if namespace is loaded first. (#12833) (#13422)
    
    Cherry-pick #12833 to branch-2.9
    Fix some conflicts in test case: testTopicPolicyInitialValueWithNamespaceAlreadyLoaded
---
 .../SystemTopicBasedTopicPoliciesService.java      |  7 +++-
 .../broker/service/TopicPoliciesService.java       | 12 ++++++
 .../broker/service/persistent/PersistentTopic.java | 16 +++++++-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 45 ++++++++++++++++++++++
 4 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 12ba88e..6662202 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -173,6 +173,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     @Override
+    public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
+        return 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+    }
+
+    @Override
     public CompletableFuture<TopicPolicies> 
getTopicPoliciesBypassCacheAsync(TopicName topicName) {
         CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
         createSystemTopicFactoryIfNeeded();
@@ -469,7 +474,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     @VisibleForTesting
-    Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
+    public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
         return policyCacheInitMap.get(namespaceName);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 48d2f1e..53de087 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -62,6 +62,13 @@ public interface TopicPoliciesService {
     TopicPolicies getTopicPolicies(TopicName topicName) throws 
TopicPoliciesCacheNotInitException;
 
     /**
+     * Get policies from current cache.
+     * @param topicName topic name
+     * @return the topic policies
+     */
+    TopicPolicies getTopicPoliciesIfExists(TopicName topicName);
+
+    /**
      * When getting TopicPolicies, if the initialization has not been 
completed,
      * we will go back off and try again until time out.
      * @param topicName topic name
@@ -146,6 +153,11 @@ public interface TopicPoliciesService {
         }
 
         @Override
+        public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
+            return null;
+        }
+
+        @Override
         public CompletableFuture<TopicPolicies> 
getTopicPoliciesBypassCacheAsync(TopicName topicName) {
             return CompletableFuture.completedFuture(null);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 053d72d..1b2e11f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -300,6 +300,7 @@ public class PersistentTopic extends AbstractTopic
     @Override
     public CompletableFuture<Void> initialize() {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(initTopicPolicy());
         for (ManagedCursor cursor : ledger.getCursors()) {
             if (cursor.getName().startsWith(replicatorPrefix)) {
                 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
@@ -3083,7 +3084,9 @@ public class PersistentTopic extends AbstractTopic
         subscriptions.forEach((subName, sub) -> {
             sub.getConsumers().forEach(Consumer::checkPermissions);
             Dispatcher dispatcher = sub.getDispatcher();
-            
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+            if (dispatcher != null) {
+                
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+            }
         });
 
         if (policies.getPublishRate() != null) {
@@ -3150,6 +3153,17 @@ public class PersistentTopic extends AbstractTopic
         }
     }
 
+    protected CompletableFuture<Void> initTopicPolicy() {
+        if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
+                && 
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+            return CompletableFuture.completedFuture(null).thenRunAsync(() -> 
onUpdate(
+                            brokerService.getPulsar().getTopicPoliciesService()
+                                    
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
+                    brokerService.getTopicOrderedExecutor());
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
     private void registerTopicPolicyListener() {
         if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
                 && 
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 91de203..5047c0c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -44,7 +44,9 @@ import org.apache.pulsar.broker.ConfigHelper;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -63,6 +65,8 @@ import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.events.EventsTopicNames;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -126,6 +130,47 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws 
Exception{
+        TopicName topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                NamespaceName.get(myNamespace),
+                "test-" + UUID.randomUUID()
+        );
+        String topic = topicName.toString();
+
+        SystemTopicBasedTopicPoliciesService policyService =
+                (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+
+        //set up topic with inactiveTopicPolicies.maxInactiveDurationSeconds = 
100
+        InactiveTopicPolicies inactiveTopicPolicies =
+                new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,
 100, true);
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topicPolicies().setInactiveTopicPolicies(topic, 
inactiveTopicPolicies);
+
+        //wait until topic loaded with right policy value.
+        Awaitility.await().untilAsserted(()-> {
+            AbstractTopic topic1 = (AbstractTopic) 
pulsar.getBrokerService().getTopic(topic, true).get().get();
+            
assertEquals(topic1.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(), 
100);
+        });
+        //unload the topic
+        
pulsar.getNamespaceService().unloadNamespaceBundle(pulsar.getNamespaceService().getBundle(topicName)).get();
+        assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
+
+        //load the nameserver, but topic is not init.
+        log.info("lookup:{}",admin.lookups().lookupTopic(topic));
+        
assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBroker(topicName));
+        assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
+        //make sure namespace policy reader is fully started.
+        Awaitility.await().untilAsserted(()-> {
+            
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
+        });
+
+        //load the topic.
+        AbstractTopic topic1 = (AbstractTopic) 
pulsar.getBrokerService().getTopic(topic, true).get().get();
+        
assertEquals(topic1.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(), 
100);
+    }
+
+    @Test
     public void testSetSizeBasedBacklogQuota() throws Exception {
 
         BacklogQuota backlogQuota = BacklogQuota.builder()

Reply via email to