This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new c4482dd0a18 [improve][broker] Do not retain the data in the system 
topic (#22022)
c4482dd0a18 is described below

commit c4482dd0a18ab8333fc0b8228b378f6ef0f264bc
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Feb 6 15:28:43 2024 +0800

    [improve][broker] Do not retain the data in the system topic (#22022)
    
    ### Motivation
    
    For some use cases, users need to store all messages even though 
these messages have been acked by all subscriptions.
    So they set the retention policy of the namespace to infinite retention 
(setting both time and size limits to `-1`). But the data in the system topics 
does not need infinite retention.
    
    ### Modifications
    
    For system topics, do not retain messages that have already been 
acknowledged.
---
 .../pulsar/broker/service/BrokerService.java       | 15 +++++--
 .../pulsar/compaction/CompactionRetentionTest.java | 48 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 13bce3f67df..0a9d100bf7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1771,10 +1771,17 @@ public class BrokerService implements Closeable {
             }
 
             if (retentionPolicies == null) {
-                retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
-                        () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-                                serviceConfig.getDefaultRetentionSizeInMB())
-                );
+                if (SystemTopicNames.isSystemTopic(topicName)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("{} Disable data retention policy for system 
topic.", topicName);
+                    }
+                    retentionPolicies = new RetentionPolicies(0, 0);
+                } else {
+                    retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
+                            () -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+                                    
serviceConfig.getDefaultRetentionSizeInMB())
+                    );
+                }
             }
 
             ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index 055c595fbfe..98bf2b819c2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -45,9 +46,13 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -212,6 +217,49 @@ public class CompactionRetentionTest extends 
MockedPulsarServiceBaseTest {
         );
     }
 
+    @Test
+    public void testRetentionPolicesForSystemTopic() throws Exception {
+        String namespace = "my-tenant/my-ns";
+        String topicPrefix = "persistent://" + namespace + "/";
+        admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, 
-1));
+        // Check event topics and transaction internal topics.
+        for (String eventTopic : SystemTopicNames.EVENTS_TOPIC_NAMES) {
+            checkSystemTopicRetentionPolicy(topicPrefix + eventTopic);
+        }
+        checkSystemTopicRetentionPolicy(topicPrefix + 
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
+        checkSystemTopicRetentionPolicy(topicPrefix + 
SystemTopicNames.TRANSACTION_COORDINATOR_LOG);
+        checkSystemTopicRetentionPolicy(topicPrefix + 
SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
+
+        // Check common topics.
+        checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" + 
System.nanoTime());
+        // Specify retention policies for system topic.
+        pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfiguration().setSystemTopicEnabled(true);
+        admin.topics().createNonPartitionedTopic(topicPrefix + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+        admin.topicPolicies().setRetention(topicPrefix + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
+                new RetentionPolicies(10, 10));
+        Awaitility.await().untilAsserted(() -> {
+            checkTopicRetentionPolicy(topicPrefix + 
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
+                    new RetentionPolicies(10, 10));
+        });
+    }
+
+    private void checkSystemTopicRetentionPolicy(String topicName) throws 
Exception {
+        checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0));
+
+    }
+
+    private void checkCommonTopicRetentionPolicy(String topicName) throws 
Exception {
+        checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1));
+    }
+
+    private void checkTopicRetentionPolicy(String topicName, RetentionPolicies 
retentionPolicies) throws Exception {
+        ManagedLedgerConfig config = pulsar.getBrokerService()
+                .getManagedLedgerConfig(TopicName.get(topicName)).get();
+        Assert.assertEquals(config.getRetentionSizeInMB(), 
retentionPolicies.getRetentionSizeInMB());
+        
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes()
 * 60000L);
+    }
+
     private void testCompactionCursorRetention(String topic) throws Exception {
         Set<String> keys = Sets.newHashSet("a", "b", "c");
         Set<String> keysToExpire = Sets.newHashSet("x1", "x2");

Reply via email to