This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new c4482dd0a18 [improve][broker] Do not retain the data in the system
topic (#22022)
c4482dd0a18 is described below
commit c4482dd0a18ab8333fc0b8228b378f6ef0f264bc
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Feb 6 15:28:43 2024 +0800
[improve][broker] Do not retain the data in the system topic (#22022)
### Motivation
For some use cases, users need to store all messages even after
these messages have been acked by all subscriptions.
So they set the retention policy of the namespace to infinite retention
(setting both the time and size limits to `-1`). But the data in system topics
does not need infinite retention.
### Modifications
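For system topics, do not retain messages that have already been
acknowledged: when no retention policy has been resolved for the topic,
use `RetentionPolicies(0, 0)` instead of falling back to the namespace-level
or broker-default retention. A retention policy set explicitly at the
topic level on a system topic still takes effect.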
---
.../pulsar/broker/service/BrokerService.java | 15 +++++--
.../pulsar/compaction/CompactionRetentionTest.java | 48 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 13bce3f67df..0a9d100bf7b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1771,10 +1771,17 @@ public class BrokerService implements Closeable {
}
if (retentionPolicies == null) {
- retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
- () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
- serviceConfig.getDefaultRetentionSizeInMB())
- );
+ if (SystemTopicNames.isSystemTopic(topicName)) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} Disable data retention policy for system
topic.", topicName);
+ }
+ retentionPolicies = new RetentionPolicies(0, 0);
+ } else {
+ retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
+ () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+
serviceConfig.getDefaultRetentionSizeInMB())
+ );
+ }
}
ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index 055c595fbfe..98bf2b819c2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -45,9 +46,13 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -212,6 +217,49 @@ public class CompactionRetentionTest extends
MockedPulsarServiceBaseTest {
);
}
+ @Test
+ public void testRetentionPolicesForSystemTopic() throws Exception {
+ String namespace = "my-tenant/my-ns";
+ String topicPrefix = "persistent://" + namespace + "/";
+ admin.namespaces().setRetention(namespace, new RetentionPolicies(-1,
-1));
+ // Check event topics and transaction internal topics.
+ for (String eventTopic : SystemTopicNames.EVENTS_TOPIC_NAMES) {
+ checkSystemTopicRetentionPolicy(topicPrefix + eventTopic);
+ }
+ checkSystemTopicRetentionPolicy(topicPrefix +
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
+ checkSystemTopicRetentionPolicy(topicPrefix +
SystemTopicNames.TRANSACTION_COORDINATOR_LOG);
+ checkSystemTopicRetentionPolicy(topicPrefix +
SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
+
+ // Check common topics.
+ checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" +
System.nanoTime());
+ // Specify retention policies for system topic.
+ pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true);
+ pulsar.getConfiguration().setSystemTopicEnabled(true);
+ admin.topics().createNonPartitionedTopic(topicPrefix +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+ admin.topicPolicies().setRetention(topicPrefix +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
+ new RetentionPolicies(10, 10));
+ Awaitility.await().untilAsserted(() -> {
+ checkTopicRetentionPolicy(topicPrefix +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
+ new RetentionPolicies(10, 10));
+ });
+ }
+
+ private void checkSystemTopicRetentionPolicy(String topicName) throws
Exception {
+ checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0));
+
+ }
+
+ private void checkCommonTopicRetentionPolicy(String topicName) throws
Exception {
+ checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1));
+ }
+
+ private void checkTopicRetentionPolicy(String topicName, RetentionPolicies
retentionPolicies) throws Exception {
+ ManagedLedgerConfig config = pulsar.getBrokerService()
+ .getManagedLedgerConfig(TopicName.get(topicName)).get();
+ Assert.assertEquals(config.getRetentionSizeInMB(),
retentionPolicies.getRetentionSizeInMB());
+
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes()
* 60000L);
+ }
+
private void testCompactionCursorRetention(String topic) throws Exception {
Set<String> keys = Sets.newHashSet("a", "b", "c");
Set<String> keysToExpire = Sets.newHashSet("x1", "x2");