This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 257efe9  fix retention policy in topic policy not work (#11021)
257efe9 is described below

commit 257efe9382361bc833e892e36ff357b3aaf9a145
Author: hangc0276 <[email protected]>
AuthorDate: Sat Jun 26 03:23:45 2021 +0800

    fix retention policy in topic policy not work (#11021)
    
    The retention policy in a topic policy doesn't work because the retention 
policy is not set in the managed ledger configuration.
    
    1. Set the retention policy in the managedLedger configuration in the 
`onUpdate` listener method.
    2. Add test to cover this case.
    
    (cherry picked from commit b7f8de40b358e0fc8c36774a14b9cb6863762056)
---
 .../broker/service/persistent/PersistentTopic.java |  3 +++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 25 +++++++++++++++++++++-
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d76fb28..bdc3fda 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2769,6 +2769,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
                 
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
         }
+
+        // update managed ledger config
+        checkPersistencePolicies();
     }
 
     private Optional<Policies> getNamespacePolicies() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index efd1a33..0c27882 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1571,7 +1571,7 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
-    @Test
+   @Test(timeOut = 30000)
     public void testSystemTopicShouldBeCompacted() throws Exception {
         BacklogQuota backlogQuota = new BacklogQuota(1024, 
BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
         log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, 
testTopic);
@@ -1612,4 +1612,27 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
             
assertNull(NamespaceService.checkHeartbeatNamespace(topicName.getNamespaceObject()));
         });
     }
+
+    @Test(timeOut = 30000)
+    public void testTopicRetentionPolicySetInManagedLedgerConfig() throws 
Exception {
+        RetentionPolicies nsRetentionPolicies = new RetentionPolicies(1, -1);
+        TopicName topicName = TopicName.get(testTopic);
+
+        // set and check retention policy on namespace level
+        admin.namespaces().setRetention(myNamespace, nsRetentionPolicies);
+        ManagedLedgerConfig managedLedgerConfig = 
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
+        Assert.assertEquals(managedLedgerConfig.getRetentionTimeMillis(),
+                
TimeUnit.MINUTES.toMillis(nsRetentionPolicies.getRetentionTimeInMinutes()));
+        Assert.assertEquals(managedLedgerConfig.getRetentionSizeInMB(), 
nsRetentionPolicies.getRetentionSizeInMB());
+
+        // set and check retention policy on topic level
+        RetentionPolicies topicRetentionPolicies = new RetentionPolicies(2, 
-1);
+        admin.topics().setRetention(testTopic, topicRetentionPolicies);
+        Awaitility.await().untilAsserted(() -> {
+            ManagedLedgerConfig config = 
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
+            Assert.assertEquals(config.getRetentionTimeMillis(),
+                    
TimeUnit.MINUTES.toMillis(topicRetentionPolicies.getRetentionTimeInMinutes()));
+            Assert.assertEquals(config.getRetentionSizeInMB(), 
topicRetentionPolicies.getRetentionSizeInMB());
+        });
+    }
 }

Reply via email to