This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 257efe9 fix retention policy in topic policy not work (#11021)
257efe9 is described below
commit 257efe9382361bc833e892e36ff357b3aaf9a145
Author: hangc0276 <[email protected]>
AuthorDate: Sat Jun 26 03:23:45 2021 +0800
fix retention policy in topic policy not work (#11021)
Retention policy in topic policy doesn't work due to retention policy
doesn't set into managed ledger configuration.
1. Set retention policy into managedLedger configuration on `onUpdate`
listener method.
2. Add test to cover this case.
(cherry picked from commit b7f8de40b358e0fc8c36774a14b9cb6863762056)
---
.../broker/service/persistent/PersistentTopic.java | 3 +++
.../pulsar/broker/admin/TopicPoliciesTest.java | 25 +++++++++++++++++++++-
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d76fb28..bdc3fda 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2769,6 +2769,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
}
+
+ // update managed ledger config
+ checkPersistencePolicies();
}
private Optional<Policies> getNamespacePolicies() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index efd1a33..0c27882 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1571,7 +1571,7 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
}
}
- @Test
+ @Test(timeOut = 30000)
public void testSystemTopicShouldBeCompacted() throws Exception {
BacklogQuota backlogQuota = new BacklogQuota(1024,
BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota,
testTopic);
@@ -1612,4 +1612,27 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
assertNull(NamespaceService.checkHeartbeatNamespace(topicName.getNamespaceObject()));
});
}
+
+ @Test(timeOut = 30000)
+ public void testTopicRetentionPolicySetInManagedLedgerConfig() throws
Exception {
+ RetentionPolicies nsRetentionPolicies = new RetentionPolicies(1, -1);
+ TopicName topicName = TopicName.get(testTopic);
+
+ // set and check retention policy on namespace level
+ admin.namespaces().setRetention(myNamespace, nsRetentionPolicies);
+ ManagedLedgerConfig managedLedgerConfig =
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
+ Assert.assertEquals(managedLedgerConfig.getRetentionTimeMillis(),
+
TimeUnit.MINUTES.toMillis(nsRetentionPolicies.getRetentionTimeInMinutes()));
+ Assert.assertEquals(managedLedgerConfig.getRetentionSizeInMB(),
nsRetentionPolicies.getRetentionSizeInMB());
+
+ // set and check retention policy on topic level
+ RetentionPolicies topicRetentionPolicies = new RetentionPolicies(2,
-1);
+ admin.topics().setRetention(testTopic, topicRetentionPolicies);
+ Awaitility.await().untilAsserted(() -> {
+ ManagedLedgerConfig config =
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
+ Assert.assertEquals(config.getRetentionTimeMillis(),
+
TimeUnit.MINUTES.toMillis(topicRetentionPolicies.getRetentionTimeInMinutes()));
+ Assert.assertEquals(config.getRetentionSizeInMB(),
topicRetentionPolicies.getRetentionSizeInMB());
+ });
+ }
}