This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 3876c56 [Branch-2.7][Broker] Fix using partitioned topic name to get
topic policies (#11897)
3876c56 is described below
commit 3876c56e6a6a0614dd5d5fb11d91fdc9c8f4a259
Author: ran <[email protected]>
AuthorDate: Mon Sep 6 11:25:29 2021 +0800
[Branch-2.7][Broker] Fix using partitioned topic name to get topic policies
(#11897)
### Motivation
There is a bug that using the partitioned topic name to get topic policies.
The PR https://github.com/apache/pulsar/pull/11294 fix this issue, but it's
hard to cherry-pick the PR to `branch-2.7`, so create this PR to fix the issue
in `branch-2.7`.
This PR contains [PR-11294](https://github.com/apache/pulsar/pull/11294)
and [PR-11863](https://github.com/apache/pulsar/pull/11863).
### Modifications
1. Fix using the partitioned topic name to get topic policies.
2. Change some warning logs to debug level for the getting topic policies
operation.
### Verifying this change
The test method `TopicPoliciesTest#testBacklogQuotaWithPartitionedTopic` is
used to verify getting topic policies by the partitioned topic name.
---
.../pulsar/broker/service/AbstractTopic.java | 8 ++-----
.../pulsar/broker/service/BacklogQuotaManager.java | 6 +++++-
.../pulsar/broker/service/BrokerService.java | 16 ++++----------
.../SystemTopicBasedTopicPoliciesService.java | 4 ++--
.../pulsar/broker/admin/TopicPoliciesTest.java | 25 ++++++++++++++++++++++
.../pulsar/broker/service/BrokerServiceTest.java | 2 --
6 files changed, 38 insertions(+), 23 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 0d40ecb..23b2221 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -559,14 +559,10 @@ public abstract class AbstractTopic implements Topic {
* @return TopicPolicies is exist else return null.
*/
public TopicPolicies getTopicPolicies(TopicName topicName) {
- TopicName cloneTopicName = topicName;
- if (topicName.isPartitioned()) {
- cloneTopicName =
TopicName.get(topicName.getPartitionedTopicName());
- }
try {
- return
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+ return
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.warn("Topic {} policies cache have not init.",
topicName.getPartitionedTopicName());
+ log.debug("Topic {} policies cache have not init.",
topicName.getPartitionedTopicName());
return null;
} catch (NullPointerException e) {
log.warn("Topic level policies are not enabled. " +
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 4b965fd..020efa1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -86,8 +86,12 @@ public class BacklogQuotaManager {
.map(TopicPolicies::getBackLogQuotaMap)
.map(map ->
map.get(BacklogQuotaType.destination_storage.name()))
.orElseGet(() ->
getBacklogQuota(topicName.getNamespace(),policyPath));
+ } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+ log.debug("Topic policies cache have not init, will apply the
namespace backlog quota: topicName={}",
+ topicName);
} catch (Exception e) {
- log.warn("Failed to read topic policies data, will apply the
namespace backlog quota: topicName={}", topicName, e);
+ log.error("Failed to read topic policies data, "
+ + "will apply the namespace backlog quota: topicName={}",
topicName, e);
}
return getBacklogQuota(topicName.getNamespace(),policyPath);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4a18585..b7eca90 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1182,19 +1182,15 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
OffloadPolicies topicLevelOffloadPolicies = null;
if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
- TopicName cloneTopicName = topicName;
- if (topicName.isPartitioned()) {
- cloneTopicName =
TopicName.get(topicName.getPartitionedTopicName());
- }
try {
- TopicPolicies topicPolicies =
pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+ TopicPolicies topicPolicies =
pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
if (topicPolicies != null) {
persistencePolicies = topicPolicies.getPersistence();
retentionPolicies =
topicPolicies.getRetentionPolicies();
topicLevelOffloadPolicies =
topicPolicies.getOffloadPolicies();
}
} catch
(BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.warn("Topic {} policies cache have not init.",
topicName);
+ log.debug("Topic {} policies have not been initialized
yet.", topicName);
}
}
@@ -2473,15 +2469,11 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
* @return TopicPolicies is exist else return null.
*/
public TopicPolicies getTopicPolicies(TopicName topicName) {
- TopicName cloneTopicName = topicName;
- if (topicName.isPartitioned()) {
- cloneTopicName =
TopicName.get(topicName.getPartitionedTopicName());
- }
try {
checkTopicLevelPolicyEnable();
- return
pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+ return
pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.warn("Topic {} policies cache have not init.",
topicName.getPartitionedTopicName());
+ log.debug("Topic {} policies cache have not init.",
topicName.getPartitionedTopicName());
return null;
} catch (RestException | NullPointerException e) {
log.warn("Topic level policies are not enabled. " +
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 18388f1..3dc5a10 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -92,7 +92,7 @@ public class SystemTopicBasedTopicPoliciesService implements
TopicPoliciesServic
.domain(topicName.getDomain().toString())
.tenant(topicName.getTenant())
.namespace(topicName.getNamespaceObject().getLocalName())
- .topic(topicName.getLocalName())
+
.topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
.policies(policies)
.build())
.build()).whenComplete(((messageId, e) -> {
@@ -148,7 +148,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
&& !policyCacheInitMap.get(topicName.getNamespaceObject())) {
throw new TopicPoliciesCacheNotInitException();
}
- return policiesCache.get(topicName);
+ return
policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index a1f226f..efd1a33 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -233,6 +234,30 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
}
@Test
+ public void testBacklogQuotaWithPartitionedTopic() throws Exception {
+ final String topic = "persistent://" + myNamespace + "/test-" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 3);
+
pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close();
+ Awaitility.await()
+ .until(() ->
pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+ BacklogQuota backlogQuota = new BacklogQuota(1234,
BacklogQuota.RetentionPolicy.producer_exception);
+ admin.topics().setBacklogQuota(topic, backlogQuota);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS)
+ .untilAsserted(() ->
Assert.assertEquals(admin.topics().getBacklogQuotaMap(topic)
+
.get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
+
+ TopicPolicies topicPolicies0 =
pulsar.getTopicPoliciesService().getTopicPolicies(
+ TopicName.get(topic).getPartition(0));
+ TopicPolicies topicPolicies1 =
pulsar.getTopicPoliciesService().getTopicPolicies(
+ TopicName.get(topic).getPartition(1));
+ Assert.assertEquals(topicPolicies0, topicPolicies1);
+ Assert.assertEquals(topicPolicies0.getBackLogQuotaMap()
+
.get(BacklogQuota.BacklogQuotaType.destination_storage.name()).getLimit(),
backlogQuota.getLimit());
+ Assert.assertEquals(topicPolicies0.getBackLogQuotaMap()
+
.get(BacklogQuota.BacklogQuotaType.destination_storage.name()).getPolicy(),
backlogQuota.getPolicy());
+ }
+
+ @Test
public void testCheckRetention() throws Exception {
BacklogQuota backlogQuota =
new BacklogQuota(10 * 1024 * 1024,
BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 214fb1e..9801311 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -593,8 +593,6 @@ public class BrokerServiceTest extends BrokerTestBase {
} catch (Exception e) {
fail("should not fail");
- } finally {
- pulsarClient.close();
}
}