This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 25b9785  If a topic has compaction policies configured, we must ensure 
the subscription is always pre-created (#11672)
25b9785 is described below

commit 25b9785745e049b39df1d4035017efbf0ebe308f
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Aug 15 19:10:22 2021 -0700

    If a topic has compaction policies configured, we must ensure the 
subscription is always pre-created (#11672)
    
    * If a topic has compaction policies configured, we must ensure the 
subscription is always pre-created
    
    * Removed unused imports
---
 .../pulsar/broker/service/BrokerService.java       | 10 +--
 .../broker/service/persistent/PersistentTopic.java | 53 ++++++++++--
 .../broker/service/persistent/SystemTopic.java     | 18 +---
 .../pulsar/compaction/CompactionRetentionTest.java | 96 +++++++++++++++++++++-
 4 files changed, 146 insertions(+), 31 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0c646b7..4a2cad9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1233,15 +1233,13 @@ public class BrokerService implements Closeable {
                                         ? new SystemTopic(topic, ledger, 
BrokerService.this)
                                         : new PersistentTopic(topic, ledger, 
BrokerService.this);
                                 CompletableFuture<Void> 
preCreateSubForCompaction =
-                                        
CompletableFuture.completedFuture(null);
-                                if (persistentTopic instanceof SystemTopic) {
-                                    preCreateSubForCompaction = ((SystemTopic) 
persistentTopic)
-                                            
.preCreateSubForCompactionIfNeeded();
-                                }
+                                        
persistentTopic.preCreateSubscriptionForCompactionIfNeeded();
                                 CompletableFuture<Void> replicationFuture = 
persistentTopic
                                         .initialize()
                                         .thenCompose(__ -> 
persistentTopic.checkReplication());
-                                
FutureUtil.waitForAll(Lists.newArrayList(preCreateSubForCompaction, 
replicationFuture))
+
+
+                                
CompletableFuture.allOf(preCreateSubForCompaction, replicationFuture)
                                 .thenCompose(v -> {
                                     // Also check dedup status
                                     return 
persistentTopic.checkDeduplicationStatus();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9cb9339..7128f08 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -116,6 +116,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
@@ -1399,6 +1400,28 @@ public class PersistentTopic extends AbstractTopic
         messageDeduplication.purgeInactiveProducers();
     }
 
+    public CompletableFuture<Boolean> isCompactionEnabled() {
+        Optional<Long> topicCompactionThreshold = getTopicPolicies()
+                .map(TopicPolicies::getCompactionThreshold);
+        if (topicCompactionThreshold.isPresent() && 
topicCompactionThreshold.get() > 0) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        TopicName topicName = TopicName.get(topic);
+        return 
brokerService.getPulsar().getPulsarResources().getNamespaceResources()
+                .getPolicies(topicName.getNamespaceObject())
+                .thenApply(policies -> {
+                    if (policies.isPresent()) {
+                        return policies.get().compaction_threshold != null
+                                && policies.get().compaction_threshold > 0;
+                    } else {
+                        // Check broker default
+                        return brokerService.pulsar().getConfiguration()
+                                .getBrokerServiceCompactionThresholdInBytes() 
> 0;
+                    }
+                });
+    }
+
     public void checkCompaction() {
         TopicName name = TopicName.get(topic);
         try {
@@ -1448,11 +1471,24 @@ public class PersistentTopic extends AbstractTopic
         }
     }
 
-    /**
-     * Return if the topic has triggered compaction before or not.
-     */
-    protected boolean hasCompactionTriggered() {
-        return subscriptions.containsKey(COMPACTION_SUBSCRIPTION);
+    public CompletableFuture<Void> 
preCreateSubscriptionForCompactionIfNeeded() {
+        if (subscriptions.containsKey(COMPACTION_SUBSCRIPTION)) {
+            // The compaction cursor is already there, nothing to do
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return isCompactionEnabled()
+                .thenCompose(enabled -> {
+                    if (enabled) {
+                        // If a topic has a compaction policy setup, we must 
make sure that the compaction cursor
+                        // is pre-created, in order to ensure all the data 
will be seen by the compactor.
+                        return createSubscription(COMPACTION_SUBSCRIPTION,
+                                        
CommandSubscribe.InitialPosition.Earliest, false)
+                                .thenCompose(__ -> 
CompletableFuture.completedFuture(null));
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
     }
 
     CompletableFuture<Void> startReplicator(String remoteCluster) {
@@ -2371,7 +2407,10 @@ public class PersistentTopic extends AbstractTopic
         if (this.subscribeRateLimiter.isPresent()) {
             subscribeRateLimiter.get().onPoliciesUpdate(data);
         }
-        return CompletableFuture.allOf(replicationFuture, dedupFuture, 
persistentPoliciesFuture);
+
+
+        return CompletableFuture.allOf(replicationFuture, dedupFuture, 
persistentPoliciesFuture,
+                preCreateSubscriptionForCompactionIfNeeded());
     }
 
     /**
@@ -3033,6 +3072,8 @@ public class PersistentTopic extends AbstractTopic
 
         checkDeduplicationStatus();
 
+        preCreateSubscriptionForCompactionIfNeeded();
+
         // update managed ledger config
         checkPersistencePolicies();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 231f4e9..aaf83a9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -19,13 +19,11 @@
 
 package org.apache.pulsar.broker.service.persistent;
 
-import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.common.api.proto.CommandSubscribe;
 
 public class SystemTopic extends PersistentTopic {
 
@@ -64,18 +62,8 @@ public class SystemTopic extends PersistentTopic {
         return CompletableFuture.completedFuture(null);
     }
 
-    public CompletableFuture<Void> preCreateSubForCompactionIfNeeded() {
-        if (!super.hasCompactionTriggered()) {
-            // To pre-create the subscription for the compactor to avoid lost 
any data since we are using reader
-            // for reading data from the __change_events topic, if no durable 
subscription on the topic,
-            // the data might be lost. Since we are using the topic compaction 
on the __change_events topic
-            // to reduce the topic policy cache recovery time,
-            // so we can leverage the topic compaction cursor for retaining 
the data.
-            return super.createSubscription(COMPACTION_SUBSCRIPTION,
-                    CommandSubscribe.InitialPosition.Earliest, false)
-                    .thenCompose(__ -> 
CompletableFuture.completedFuture(null));
-        } else {
-            return CompletableFuture.completedFuture(null);
-        }
+    public CompletableFuture<Boolean> isCompactionEnabled() {
+        // All system topics are using compaction, even though it is not 
explicitly set in the policies.
+        return CompletableFuture.completedFuture(true);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index a73d1f5..3085051 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -77,6 +77,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -94,12 +95,14 @@ public class CompactionRetentionTest extends 
MockedPulsarServiceBaseTest {
     public void setup() throws Exception {
         conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
         conf.setManagedLedgerMaxEntriesPerLedger(2);
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
         super.internalSetup();
 
-        admin.clusters().createCluster("use", 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.clusters().createCluster("test", 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
         admin.tenants().createTenant("my-tenant",
-                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-tenant/use/my-ns");
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-tenant/my-ns", 
Collections.singleton("test"));
 
         compactionScheduler = Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
@@ -121,7 +124,7 @@ public class CompactionRetentionTest extends 
MockedPulsarServiceBaseTest {
      */
     @Test
     public void testCompaction() throws Exception {
-        String topic = "persistent://my-tenant/use/my-ns/my-topic-" + 
System.nanoTime();
+        String topic = "persistent://my-tenant/my-ns/my-topic-" + 
System.nanoTime();
 
         Set<String> keys = Sets.newHashSet("a", "b", "c");
         Set<String> keysToExpire = Sets.newHashSet("x1", "x2");
@@ -195,6 +198,91 @@ public class CompactionRetentionTest extends 
MockedPulsarServiceBaseTest {
         validateMessages(pulsarClient, false, topic, round, 
Collections.emptySet());
     }
 
+
+    /**
+     * When a topic is created, if the compaction threshold is set, the data 
should be retained in the compacted view,
+     * even if the topic is not yet compacted.
+     */
+    @Test
+    public void testCompactionRetentionOnTopicCreationWithNamespacePolicies() 
throws Exception {
+        String namespace = "my-tenant/my-ns";
+        String topic = "persistent://my-tenant/my-ns/my-topic-" + 
System.nanoTime();
+        admin.namespaces().setCompactionThreshold(namespace, 10);
+
+        testCompactionCursorRetention(topic);
+    }
+
+    @Test
+    public void 
testCompactionRetentionAfterTopicCreationWithNamespacePolicies() throws 
Exception {
+        String namespace = "my-tenant/my-ns";
+        String topic = "persistent://my-tenant/my-ns/my-topic-" + 
System.nanoTime();
+
+        // Pre-create the topic, so that compaction is enabled only after the 
topic was created
+        pulsarClient.newProducer(Schema.INT32).topic(topic).create().close();
+
+        admin.namespaces().setCompactionThreshold(namespace, 10);
+
+        Awaitility.await().untilAsserted(() ->
+                testCompactionCursorRetention(topic)
+        );
+    }
+
+    @Test
+    public void testCompactionRetentionOnTopicCreationWithTopicPolicies() 
throws Exception {
+        String topic = "persistent://my-tenant/my-ns/my-topic-" + 
System.nanoTime();
+
+        // Pre-create the topic, otherwise setting policies will fail
+        pulsarClient.newProducer(Schema.INT32).topic(topic).create().close();
+
+        admin.topics().setCompactionThreshold(topic, 10);
+
+        Awaitility.await().untilAsserted(() ->
+                testCompactionCursorRetention(topic)
+        );
+    }
+
+    private void testCompactionCursorRetention(String topic) throws Exception {
+        Set<String> keys = Sets.newHashSet("a", "b", "c");
+        Set<String> keysToExpire = Sets.newHashSet("x1", "x2");
+        Set<String> allKeys = new HashSet<>();
+        allKeys.addAll(keys);
+        allKeys.addAll(keysToExpire);
+
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+
+        log.info(" ---- X 1: {}", mapper.writeValueAsString(
+                admin.topics().getInternalStats(topic, false)));
+
+        int round = 1;
+
+        for (String key : allKeys) {
+            producer.newMessage()
+                    .key(key)
+                    .value(round)
+                    .send();
+        }
+
+        log.info(" ---- X 2: {}", mapper.writeValueAsString(
+                admin.topics().getInternalStats(topic, false)));
+
+        validateMessages(pulsarClient, true, topic, round, allKeys);
+
+        compactor.compact(topic).join();
+
+        log.info(" ---- X 3: {}", mapper.writeValueAsString(
+                admin.topics().getInternalStats(topic, false)));
+
+        validateMessages(pulsarClient, true, topic, round, allKeys);
+    }
+
     private void validateMessages(PulsarClient client, boolean readCompacted, 
String topic, int round, Set<String> expectedKeys)
             throws Exception {
         @Cleanup

Reply via email to