This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8314160  [broker] Bug Fix: topic policy is not properly init if 
namespace is loaded first. (#12833)
8314160 is described below

commit 8314160b0ecd4a4cb7ab5de025b872c9d50cd002
Author: JiangHaiting <[email protected]>
AuthorDate: Thu Nov 25 22:01:26 2021 +0800

    [broker] Bug Fix: topic policy is not properly init if namespace is loaded 
first. (#12833)
---
 .../SystemTopicBasedTopicPoliciesService.java      |  7 +++-
 .../broker/service/TopicPoliciesService.java       | 12 ++++++
 .../broker/service/persistent/PersistentTopic.java | 16 +++++++-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 44 ++++++++++++++++++++++
 .../client/api/DispatcherBlockConsumerTest.java    | 13 +++++++
 5 files changed, 90 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 2a35edc..6f49e2b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -184,6 +184,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     @Override
+    public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
+        return 
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+    }
+
+    @Override
     public CompletableFuture<TopicPolicies> 
getTopicPoliciesBypassCacheAsync(TopicName topicName) {
         CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
         createSystemTopicFactoryIfNeeded();
@@ -480,7 +485,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     @VisibleForTesting
-    Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
+    public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
         return policyCacheInitMap.get(namespaceName);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 98076c5..b233ae3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -64,6 +64,13 @@ public interface TopicPoliciesService {
     TopicPolicies getTopicPolicies(TopicName topicName) throws 
TopicPoliciesCacheNotInitException;
 
     /**
+     * Get policies from current cache.
+     * @param topicName topic name
+     * @return the topic policies
+     */
+    TopicPolicies getTopicPoliciesIfExists(TopicName topicName);
+
+    /**
      * When getting TopicPolicies, if the initialization has not been 
completed,
      * we will go back off and try again until time out.
      * @param topicName topic name
@@ -140,6 +147,11 @@ public interface TopicPoliciesService {
         }
 
         @Override
+        public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
+            return null;
+        }
+
+        @Override
         public CompletableFuture<TopicPolicies> 
getTopicPoliciesBypassCacheAsync(TopicName topicName) {
             return CompletableFuture.completedFuture(null);
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index efebcec..5c94e61 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -302,6 +302,7 @@ public class PersistentTopic extends AbstractTopic
     @Override
     public CompletableFuture<Void> initialize() {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
+        futures.add(initTopicPolicy());
         for (ManagedCursor cursor : ledger.getCursors()) {
             if (cursor.getName().startsWith(replicatorPrefix)) {
                 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
@@ -3109,7 +3110,9 @@ public class PersistentTopic extends AbstractTopic
         subscriptions.forEach((subName, sub) -> {
             sub.getConsumers().forEach(Consumer::checkPermissions);
             Dispatcher dispatcher = sub.getDispatcher();
-            
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+            if (dispatcher != null) {
+                
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+            }
         });
 
         if (policies.getPublishRate() != null) {
@@ -3173,6 +3176,17 @@ public class PersistentTopic extends AbstractTopic
         }
     }
 
+    protected CompletableFuture<Void> initTopicPolicy() {
+        if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
+                && 
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+            return CompletableFuture.completedFuture(null).thenRunAsync(() -> 
onUpdate(
+                            brokerService.getPulsar().getTopicPoliciesService()
+                                    
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
+                    brokerService.getTopicOrderedExecutor());
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
     private void registerTopicPolicyListener() {
         if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
                 && 
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index e0e6b5c..21579bc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -44,7 +44,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.pulsar.broker.ConfigHelper;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -63,6 +65,8 @@ import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.events.EventsTopicNames;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -128,6 +132,46 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws 
Exception{
+        TopicName topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                NamespaceName.get(myNamespace),
+                "test-" + UUID.randomUUID()
+        );
+        String topic = topicName.toString();
+
+        SystemTopicBasedTopicPoliciesService policyService =
+                (SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService();
+
+        //set up topic with maxSubscriptionsPerTopic = 10
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topicPolicies().setMaxSubscriptionsPerTopicAsync(topic, 
10).get();
+
+        //wait until topic loaded with right policy value.
+        Awaitility.await().untilAsserted(()-> {
+            AbstractTopic topic1 = (AbstractTopic) 
pulsar.getBrokerService().getTopic(topic, true).get().get();
+            
assertEquals(topic1.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get(),
 Integer.valueOf(10));
+        });
+        //unload the topic
+        
pulsar.getNamespaceService().unloadNamespaceBundle(pulsar.getNamespaceService().getBundle(topicName)).get();
+        assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
+
+        //load the nameserver, but topic is not init.
+        log.info("lookup:{}",admin.lookups().lookupTopic(topic));
+        
assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBroker(topicName));
+        assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
+        //make sure namespace policy reader is fully started.
+        Awaitility.await().untilAsserted(()-> {
+            
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
+        });
+
+        //load the topic.
+        AbstractTopic topic1 = (AbstractTopic) 
pulsar.getBrokerService().getTopic(topic, true).get().get();
+        
assertEquals(topic1.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get(),
 Integer.valueOf(10));
+    }
+
+
+    @Test
     public void testSetSizeBasedBacklogQuota() throws Exception {
 
         BacklogQuota backlogQuota = BacklogQuota.builder()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 0f21e21..5c7708e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -676,6 +677,8 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
     public void testBlockBrokerDispatching() {
         log.info("-- Starting {} test --", methodName);
 
+        List<Long> timestamps = new ArrayList<>();
+        timestamps.add(System.currentTimeMillis());
         int unAckedMessages = 
pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
         double unAckedMessagePercentage = pulsar.getConfiguration()
                 .getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked();
@@ -734,6 +737,7 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
                 String message = "my-message-" + i;
                 producer.send(message.getBytes());
             }
+            timestamps.add(System.currentTimeMillis());
 
             /*****
              * (1) try to consume messages: without acking messages and 
dispatcher will be blocked once it reaches
@@ -779,6 +783,7 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
             String dispatcherName = 
blockedDispatchers.values().get(0).getName();
             String subName = 
dispatcherName.substring(dispatcherName.lastIndexOf("/") + 2, 
dispatcherName.length());
             assertEquals(subName, subscriberName1);
+            timestamps.add(System.currentTimeMillis());
 
             /**
              * (2) However, other subscription2 should still be able to 
consume messages until it reaches to
@@ -799,6 +804,7 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
             // (2.b) It should receive only messages with limit of 
maxUnackPerDispatcher
             assertEquals(messages2.size(), maxUnAckPerDispatcher, 
receiverQueueSize);
             assertEquals(blockedDispatchers.size(), 2);
+            timestamps.add(System.currentTimeMillis());
 
             /** (3) if Subscription3 is acking then it shouldn't be blocked **/
             consumer1Sub3 = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
@@ -816,6 +822,7 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
             }
             assertEquals(consumedMsgsSub3, totalProducedMsgs);
             assertEquals(blockedDispatchers.size(), 2);
+            timestamps.add(System.currentTimeMillis());
 
             /** (4) try to ack messages from sub1 which should unblock broker 
*/
             messages1.forEach(consumer1Sub1::acknowledgeAsync);
@@ -833,6 +840,7 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
             assertEquals(messages1.size(), totalProducedMsgs);
             // it unblocks all consumers
             assertEquals(blockedDispatchers.size(), 0);
+            timestamps.add(System.currentTimeMillis());
 
             /** (5) try redelivery on sub2 consumer and verify to consume all 
messages */
             consumerSub2.redeliverUnacknowledgedMessages();
@@ -851,11 +859,16 @@ public class DispatcherBlockConsumerTest extends 
ProducerConsumerBase {
             }
             latch.await();
             assertEquals(msgReceivedCount.get(), totalProducedMsgs);
+            timestamps.add(System.currentTimeMillis());
 
             consumer1Sub1.close();
             consumerSub2.close();
             consumer1Sub3.close();
 
+            for (int i = 1; i < timestamps.size(); i++) {
+                //log time cost for each step.
+                log.info("Step {} cost {}ms", i, timestamps.get(i) - 
timestamps.get(i - 1));
+            }
             log.info("-- Exiting {} test --", methodName);
         } catch (Exception e) {
             fail();

Reply via email to