This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8314160 [broker] Bug Fix: topic policy is not properly init if
namespace is loaded first. (#12833)
8314160 is described below
commit 8314160b0ecd4a4cb7ab5de025b872c9d50cd002
Author: JiangHaiting <[email protected]>
AuthorDate: Thu Nov 25 22:01:26 2021 +0800
[broker] Bug Fix: topic policy is not properly init if namespace is loaded
first. (#12833)
---
.../SystemTopicBasedTopicPoliciesService.java | 7 +++-
.../broker/service/TopicPoliciesService.java | 12 ++++++
.../broker/service/persistent/PersistentTopic.java | 16 +++++++-
.../pulsar/broker/admin/TopicPoliciesTest.java | 44 ++++++++++++++++++++++
.../client/api/DispatcherBlockConsumerTest.java | 13 +++++++
5 files changed, 90 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 2a35edc..6f49e2b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -184,6 +184,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
@Override
+ public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
+ return
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+ }
+
+ @Override
public CompletableFuture<TopicPolicies>
getTopicPoliciesBypassCacheAsync(TopicName topicName) {
CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
createSystemTopicFactoryIfNeeded();
@@ -480,7 +485,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
@VisibleForTesting
- Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
+ public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
return policyCacheInitMap.get(namespaceName);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 98076c5..b233ae3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -64,6 +64,13 @@ public interface TopicPoliciesService {
TopicPolicies getTopicPolicies(TopicName topicName) throws
TopicPoliciesCacheNotInitException;
/**
+ * Get policies from current cache.
+ * @param topicName topic name
+ * @return the topic policies
+ */
+ TopicPolicies getTopicPoliciesIfExists(TopicName topicName);
+
+ /**
* When getting TopicPolicies, if the initialization has not been
completed,
* we will go back off and try again until time out.
* @param topicName topic name
@@ -140,6 +147,11 @@ public interface TopicPoliciesService {
}
@Override
+ public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
+ return null;
+ }
+
+ @Override
public CompletableFuture<TopicPolicies>
getTopicPoliciesBypassCacheAsync(TopicName topicName) {
return CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index efebcec..5c94e61 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -302,6 +302,7 @@ public class PersistentTopic extends AbstractTopic
@Override
public CompletableFuture<Void> initialize() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
+ futures.add(initTopicPolicy());
for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
@@ -3109,7 +3110,9 @@ public class PersistentTopic extends AbstractTopic
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
Dispatcher dispatcher = sub.getDispatcher();
-
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+ if (dispatcher != null) {
+
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+ }
});
if (policies.getPublishRate() != null) {
@@ -3173,6 +3176,17 @@ public class PersistentTopic extends AbstractTopic
}
}
+ protected CompletableFuture<Void> initTopicPolicy() {
+ if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
+ &&
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+ return CompletableFuture.completedFuture(null).thenRunAsync(() ->
onUpdate(
+ brokerService.getPulsar().getTopicPoliciesService()
+
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
+ brokerService.getTopicOrderedExecutor());
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
private void registerTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&&
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index e0e6b5c..21579bc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -44,7 +44,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -63,6 +65,8 @@ import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.events.EventsTopicNames;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -128,6 +132,46 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
}
@Test
+ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws
Exception{
+ TopicName topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ NamespaceName.get(myNamespace),
+ "test-" + UUID.randomUUID()
+ );
+ String topic = topicName.toString();
+
+ SystemTopicBasedTopicPoliciesService policyService =
+ (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+
+ //set up topic with maxSubscriptionsPerTopic = 10
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topicPolicies().setMaxSubscriptionsPerTopicAsync(topic,
10).get();
+
+ //wait until topic loaded with right policy value.
+ Awaitility.await().untilAsserted(()-> {
+ AbstractTopic topic1 = (AbstractTopic)
pulsar.getBrokerService().getTopic(topic, true).get().get();
+
assertEquals(topic1.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get(),
Integer.valueOf(10));
+ });
+ //unload the topic
+
pulsar.getNamespaceService().unloadNamespaceBundle(pulsar.getNamespaceService().getBundle(topicName)).get();
+ assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
+
+ //load the nameserver, but topic is not init.
+ log.info("lookup:{}",admin.lookups().lookupTopic(topic));
+
assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBroker(topicName));
+ assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
+ //make sure namespace policy reader is fully started.
+ Awaitility.await().untilAsserted(()-> {
+
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
+ });
+
+ //load the topic.
+ AbstractTopic topic1 = (AbstractTopic)
pulsar.getBrokerService().getTopic(topic, true).get().get();
+
assertEquals(topic1.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get(),
Integer.valueOf(10));
+ }
+
+
+ @Test
public void testSetSizeBasedBacklogQuota() throws Exception {
BacklogQuota backlogQuota = BacklogQuota.builder()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 0f21e21..5c7708e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -676,6 +677,8 @@ public class DispatcherBlockConsumerTest extends
ProducerConsumerBase {
public void testBlockBrokerDispatching() {
log.info("-- Starting {} test --", methodName);
+ List<Long> timestamps = new ArrayList<>();
+ timestamps.add(System.currentTimeMillis());
int unAckedMessages =
pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
double unAckedMessagePercentage = pulsar.getConfiguration()
.getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked();
@@ -734,6 +737,7 @@ public class DispatcherBlockConsumerTest extends
ProducerConsumerBase {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
+ timestamps.add(System.currentTimeMillis());
/*****
* (1) try to consume messages: without acking messages and
dispatcher will be blocked once it reaches
@@ -779,6 +783,7 @@ public class DispatcherBlockConsumerTest extends
ProducerConsumerBase {
String dispatcherName =
blockedDispatchers.values().get(0).getName();
String subName =
dispatcherName.substring(dispatcherName.lastIndexOf("/") + 2,
dispatcherName.length());
assertEquals(subName, subscriberName1);
+ timestamps.add(System.currentTimeMillis());
/**
* (2) However, other subscription2 should still be able to
consume messages until it reaches to
@@ -799,6 +804,7 @@ public class DispatcherBlockConsumerTest extends
ProducerConsumerBase {
// (2.b) It should receive only messages with limit of
maxUnackPerDispatcher
assertEquals(messages2.size(), maxUnAckPerDispatcher,
receiverQueueSize);
assertEquals(blockedDispatchers.size(), 2);
+ timestamps.add(System.currentTimeMillis());
/** (3) if Subscription3 is acking then it shouldn't be blocked **/
consumer1Sub3 = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topicName)
@@ -816,6 +822,7 @@ public class DispatcherBlockConsumerTest extends
ProducerConsumerBase {
}
assertEquals(consumedMsgsSub3, totalProducedMsgs);
assertEquals(blockedDispatchers.size(), 2);
+ timestamps.add(System.currentTimeMillis());
/** (4) try to ack messages from sub1 which should unblock broker
*/
messages1.forEach(consumer1Sub1::acknowledgeAsync);
@@ -833,6 +840,7 @@ public class DispatcherBlockConsumerTest extends
ProducerConsumerBase {
assertEquals(messages1.size(), totalProducedMsgs);
// it unblocks all consumers
assertEquals(blockedDispatchers.size(), 0);
+ timestamps.add(System.currentTimeMillis());
/** (5) try redelivery on sub2 consumer and verify to consume all
messages */
consumerSub2.redeliverUnacknowledgedMessages();
@@ -851,11 +859,16 @@ public class DispatcherBlockConsumerTest extends
ProducerConsumerBase {
}
latch.await();
assertEquals(msgReceivedCount.get(), totalProducedMsgs);
+ timestamps.add(System.currentTimeMillis());
consumer1Sub1.close();
consumerSub2.close();
consumer1Sub3.close();
+ for (int i = 1; i < timestamps.size(); i++) {
+ //log time cost for each step.
+ log.info("Step {} cost {}ms", i, timestamps.get(i) -
timestamps.get(i - 1));
+ }
log.info("-- Exiting {} test --", methodName);
} catch (Exception e) {
fail();