This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f13d6ad5b22ca3c72e55f7b19425b437ce8025d5 Author: lipenghui <[email protected]> AuthorDate: Thu Jun 17 15:08:23 2021 +0800 Fix compaction not working for system topic (#10941) If a topic has only non-durable subscriptions and no durable subscriptions, and the non-durable subscription reaches the end of the topic, we will get an estimated backlog size of 0, so the compaction will never be triggered. The expected behavior is that if we have no durable subscriptions, we should use the total size for triggering the compaction. (cherry picked from commit 797cb128045c1f2bf304c7de7454f1f12f8312e7) --- .../broker/service/persistent/PersistentTopic.java | 3 ++- .../pulsar/broker/admin/TopicPoliciesTest.java | 31 +++++++++++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 644b726..aa73813 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1374,7 +1374,8 @@ public class PersistentTopic extends AbstractTopic } else { // compaction has never run, so take full backlog size, // or total size if we have no durable subs yet. - backlogEstimate = subscriptions.isEmpty() + backlogEstimate = subscriptions.isEmpty() || subscriptions.values().stream() + .noneMatch(sub -> sub.getCursor().isDurable()) ? 
ledger.getTotalSize() : ledger.getEstimatedBacklogSize(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 276153d..30a48b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -45,7 +45,6 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; @@ -54,6 +53,7 @@ import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -91,6 +91,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { private final String persistenceTopic = "persistent://" + myNamespace + "/test-set-persistence"; + private final String topicPolicyEventsTopic = "persistent://" + myNamespace + "/__change_events"; + @BeforeMethod @Override protected void setup() throws Exception { @@ -2278,4 +2280,31 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } } + @Test + public void testSystemTopicShouldBeCompacted() throws Exception { + BacklogQuota backlogQuota = BacklogQuota.builder() + .limitSize(1024) + 
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); + + Awaitility.await() + .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); + + admin.topics().setBacklogQuota(testTopic, backlogQuota); + log.info("Backlog quota set success on topic: {}", testTopic); + + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) + .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); + + pulsar.getBrokerService().checkCompaction(); + + Awaitility.await() + .untilAsserted(() -> { + TopicStats stats = admin.topics().getStats(topicPolicyEventsTopic); + Assert.assertTrue(stats.getSubscriptions().containsKey("__compaction")); + }); + } + }
