This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5df97b41a43 [improve][broker] Do not retain the data in the system 
topic (#22022)
5df97b41a43 is described below

commit 5df97b41a431a714a448c198c312e98aa4a084e8
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Feb 6 15:28:43 2024 +0800

    [improve][broker] Do not retain the data in the system topic (#22022)
    
    ### Motivation
    
    For some use case, the users need to store all the messages even though 
these message are acked by all subscription.
    So they set the retention policy of the namespace to infinite retention 
(setting both time and size limits to `-1`).  But the data in the system topic 
does not need for infinite retention.
    
    ### Modifications
    
    For system topics, do not retain messages that have already been 
acknowledged.
---
 .../pulsar/broker/service/BrokerService.java       | 15 +++++--
 .../pulsar/compaction/CompactionRetentionTest.java | 48 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0383c63b1f3..5d75f8b917f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1779,10 +1779,17 @@ public class BrokerService implements Closeable {
             }
 
             if (retentionPolicies == null) {
-                retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
-                        () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-                                serviceConfig.getDefaultRetentionSizeInMB())
-                );
+                if (SystemTopicNames.isSystemTopic(topicName)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("{} Disable data retention policy for system 
topic.", topicName);
+                    }
+                    retentionPolicies = new RetentionPolicies(0, 0);
+                } else {
+                    retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
+                            () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+                                    
serviceConfig.getDefaultRetentionSizeInMB())
+                    );
+                }
             }
 
             ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index 055c595fbfe..98bf2b819c2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -45,9 +46,13 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -212,6 +217,49 @@ public class CompactionRetentionTest extends 
MockedPulsarServiceBaseTest {
         );
     }
 
+    @Test
+    public void testRetentionPolicesForSystemTopic() throws Exception {
+        String namespace = "my-tenant/my-ns";
+        String topicPrefix = "persistent://" + namespace + "/";
+        admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, 
-1));
+        // Check event topics and transaction internal topics.
+        for (String eventTopic : SystemTopicNames.EVENTS_TOPIC_NAMES) {
+            checkSystemTopicRetentionPolicy(topicPrefix + eventTopic);
+        }
+        checkSystemTopicRetentionPolicy(topicPrefix + 
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
+        checkSystemTopicRetentionPolicy(topicPrefix + 
SystemTopicNames.TRANSACTION_COORDINATOR_LOG);
+        checkSystemTopicRetentionPolicy(topicPrefix + 
SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
+
+        // Check common topics.
+        checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" + 
System.nanoTime());
+        // Specify retention policies for system topic.
+        pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfiguration().setSystemTopicEnabled(true);
+        admin.topics().createNonPartitionedTopic(topicPrefix + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+        admin.topicPolicies().setRetention(topicPrefix + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
+                new RetentionPolicies(10, 10));
+        Awaitility.await().untilAsserted(() -> {
+            checkTopicRetentionPolicy(topicPrefix + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
+                    new RetentionPolicies(10, 10));
+        });
+    }
+
+    private void checkSystemTopicRetentionPolicy(String topicName) throws 
Exception {
+        checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0));
+
+    }
+
+    private void checkCommonTopicRetentionPolicy(String topicName) throws 
Exception {
+        checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1));
+    }
+
+    private void checkTopicRetentionPolicy(String topicName, RetentionPolicies 
retentionPolicies) throws Exception {
+        ManagedLedgerConfig config = pulsar.getBrokerService()
+                .getManagedLedgerConfig(TopicName.get(topicName)).get();
+        Assert.assertEquals(config.getRetentionSizeInMB(), 
retentionPolicies.getRetentionSizeInMB());
+        
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes()
 * 60000L);
+    }
+
     private void testCompactionCursorRetention(String topic) throws Exception {
         Set<String> keys = Sets.newHashSet("a", "b", "c");
         Set<String> keysToExpire = Sets.newHashSet("x1", "x2");

Reply via email to