This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new ab6f1c4 [broker] Bug Fix: topic policy is not properly init if
namespace is loaded first. (#12833) (#13422)
ab6f1c4 is described below
commit ab6f1c4e1906d680ecbfe9611cea8aafdffa0aaa
Author: JiangHaiting <[email protected]>
AuthorDate: Tue Dec 21 19:23:56 2021 +0800
[broker] Bug Fix: topic policy is not properly init if namespace is loaded
first. (#12833) (#13422)
Cherry-pick #12833 to branch-2.9
Fix some conflicts in test case:
testTopicPolicyInitialValueWithNamespaceAlreadyLoaded
---
.../SystemTopicBasedTopicPoliciesService.java | 7 +++-
.../broker/service/TopicPoliciesService.java | 12 ++++++
.../broker/service/persistent/PersistentTopic.java | 16 +++++++-
.../pulsar/broker/admin/TopicPoliciesTest.java | 45 ++++++++++++++++++++++
4 files changed, 78 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 12ba88e..6662202 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -173,6 +173,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
@Override
+ public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
+ return
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
+ }
+
+ @Override
public CompletableFuture<TopicPolicies>
getTopicPoliciesBypassCacheAsync(TopicName topicName) {
CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
createSystemTopicFactoryIfNeeded();
@@ -469,7 +474,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
@VisibleForTesting
- Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
+ public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
return policyCacheInitMap.get(namespaceName);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 48d2f1e..53de087 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -62,6 +62,13 @@ public interface TopicPoliciesService {
TopicPolicies getTopicPolicies(TopicName topicName) throws
TopicPoliciesCacheNotInitException;
/**
+ * Get policies from current cache.
+ * @param topicName topic name
+ * @return the cached topic policies, or null if none are cached for the topic
+ */
+ TopicPolicies getTopicPoliciesIfExists(TopicName topicName);
+
+ /**
* When getting TopicPolicies, if the initialization has not been
completed,
* we will back off and try again until it times out.
* @param topicName topic name
@@ -146,6 +153,11 @@ public interface TopicPoliciesService {
}
@Override
+ public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
+ return null;
+ }
+
+ @Override
public CompletableFuture<TopicPolicies>
getTopicPoliciesBypassCacheAsync(TopicName topicName) {
return CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 053d72d..1b2e11f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -300,6 +300,7 @@ public class PersistentTopic extends AbstractTopic
@Override
public CompletableFuture<Void> initialize() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
+ futures.add(initTopicPolicy());
for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
@@ -3083,7 +3084,9 @@ public class PersistentTopic extends AbstractTopic
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
Dispatcher dispatcher = sub.getDispatcher();
-
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+ if (dispatcher != null) {
+
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+ }
});
if (policies.getPublishRate() != null) {
@@ -3150,6 +3153,17 @@ public class PersistentTopic extends AbstractTopic
}
}
+ protected CompletableFuture<Void> initTopicPolicy() {
+ if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
+ &&
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+ return CompletableFuture.completedFuture(null).thenRunAsync(() ->
onUpdate(
+ brokerService.getPulsar().getTopicPoliciesService()
+
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
+ brokerService.getTopicOrderedExecutor());
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
private void registerTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&&
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 91de203..5047c0c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -44,7 +44,9 @@ import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -63,6 +65,8 @@ import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.events.EventsTopicNames;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -126,6 +130,47 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
}
@Test
+ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws
Exception{
+ TopicName topicName = TopicName.get(
+ TopicDomain.persistent.value(),
+ NamespaceName.get(myNamespace),
+ "test-" + UUID.randomUUID()
+ );
+ String topic = topicName.toString();
+
+ SystemTopicBasedTopicPoliciesService policyService =
+ (SystemTopicBasedTopicPoliciesService)
pulsar.getTopicPoliciesService();
+
+ //set up topic with inactiveTopicPolicies.maxInactiveDurationSeconds =
100
+ InactiveTopicPolicies inactiveTopicPolicies =
+ new
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,
100, true);
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topicPolicies().setInactiveTopicPolicies(topic,
inactiveTopicPolicies);
+
+ //wait until topic loaded with right policy value.
+ Awaitility.await().untilAsserted(()-> {
+ AbstractTopic topic1 = (AbstractTopic)
pulsar.getBrokerService().getTopic(topic, true).get().get();
+
assertEquals(topic1.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(),
100);
+ });
+ //unload the topic
+
pulsar.getNamespaceService().unloadNamespaceBundle(pulsar.getNamespaceService().getBundle(topicName)).get();
+ assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
+
+ //load the namespace, but the topic is not initialized yet.
+ log.info("lookup:{}",admin.lookups().lookupTopic(topic));
+
assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBroker(topicName));
+ assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
+ //make sure namespace policy reader is fully started.
+ Awaitility.await().untilAsserted(()-> {
+
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
+ });
+
+ //load the topic.
+ AbstractTopic topic1 = (AbstractTopic)
pulsar.getBrokerService().getTopic(topic, true).get().get();
+
assertEquals(topic1.getInactiveTopicPolicies().getMaxInactiveDurationSeconds(),
100);
+ }
+
+ @Test
public void testSetSizeBasedBacklogQuota() throws Exception {
BacklogQuota backlogQuota = BacklogQuota.builder()