This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5df97b41a43 [improve][broker] Do not retain the data in the system
topic (#22022)
5df97b41a43 is described below
commit 5df97b41a431a714a448c198c312e98aa4a084e8
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Feb 6 15:28:43 2024 +0800
[improve][broker] Do not retain the data in the system topic (#22022)
### Motivation
For some use case, the users need to store all the messages even though
these message are acked by all subscription.
So they set the retention policy of the namespace to infinite retention
(setting both time and size limits to `-1`). But the data in the system topic
does not need for infinite retention.
### Modifications
For system topics, do not retain messages that have already been
acknowledged.
---
.../pulsar/broker/service/BrokerService.java | 15 +++++--
.../pulsar/compaction/CompactionRetentionTest.java | 48 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0383c63b1f3..5d75f8b917f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1779,10 +1779,17 @@ public class BrokerService implements Closeable {
}
if (retentionPolicies == null) {
- retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
- () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
- serviceConfig.getDefaultRetentionSizeInMB())
- );
+ if (SystemTopicNames.isSystemTopic(topicName)) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} Disable data retention policy for system
topic.", topicName);
+ }
+ retentionPolicies = new RetentionPolicies(0, 0);
+ } else {
+ retentionPolicies = policies.map(p ->
p.retention_policies).orElseGet(
+ () -> new
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+
serviceConfig.getDefaultRetentionSizeInMB())
+ );
+ }
}
ManagedLedgerConfig managedLedgerConfig = new
ManagedLedgerConfig();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index 055c595fbfe..98bf2b819c2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -45,9 +46,13 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -212,6 +217,49 @@ public class CompactionRetentionTest extends
MockedPulsarServiceBaseTest {
);
}
+ @Test
+ public void testRetentionPolicesForSystemTopic() throws Exception {
+ String namespace = "my-tenant/my-ns";
+ String topicPrefix = "persistent://" + namespace + "/";
+ admin.namespaces().setRetention(namespace, new RetentionPolicies(-1,
-1));
+ // Check event topics and transaction internal topics.
+ for (String eventTopic : SystemTopicNames.EVENTS_TOPIC_NAMES) {
+ checkSystemTopicRetentionPolicy(topicPrefix + eventTopic);
+ }
+ checkSystemTopicRetentionPolicy(topicPrefix +
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
+ checkSystemTopicRetentionPolicy(topicPrefix +
SystemTopicNames.TRANSACTION_COORDINATOR_LOG);
+ checkSystemTopicRetentionPolicy(topicPrefix +
SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
+
+ // Check common topics.
+ checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" +
System.nanoTime());
+ // Specify retention policies for system topic.
+ pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true);
+ pulsar.getConfiguration().setSystemTopicEnabled(true);
+ admin.topics().createNonPartitionedTopic(topicPrefix +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+ admin.topicPolicies().setRetention(topicPrefix +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
+ new RetentionPolicies(10, 10));
+ Awaitility.await().untilAsserted(() -> {
+ checkTopicRetentionPolicy(topicPrefix +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
+ new RetentionPolicies(10, 10));
+ });
+ }
+
+ private void checkSystemTopicRetentionPolicy(String topicName) throws
Exception {
+ checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0));
+
+ }
+
+ private void checkCommonTopicRetentionPolicy(String topicName) throws
Exception {
+ checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1));
+ }
+
+ private void checkTopicRetentionPolicy(String topicName, RetentionPolicies
retentionPolicies) throws Exception {
+ ManagedLedgerConfig config = pulsar.getBrokerService()
+ .getManagedLedgerConfig(TopicName.get(topicName)).get();
+ Assert.assertEquals(config.getRetentionSizeInMB(),
retentionPolicies.getRetentionSizeInMB());
+
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes()
* 60000L);
+ }
+
private void testCompactionCursorRetention(String topic) throws Exception {
Set<String> keys = Sets.newHashSet("a", "b", "c");
Set<String> keysToExpire = Sets.newHashSet("x1", "x2");