This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 118ad6c If a topic has compaction policies configured, we must ensure
the subscription is always pre-created (#11672)
118ad6c is described below
commit 118ad6c0bbcb7288c993c4b4a811896be51a13e9
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Aug 15 19:10:22 2021 -0700
If a topic has compaction policies configured, we must ensure the
subscription is always pre-created (#11672)
* If a topic has compaction policies configured, we must ensure the
subscription is always pre-created
* Removed unused imports
---
.../pulsar/broker/service/BrokerService.java | 6 +-
.../broker/service/persistent/PersistentTopic.java | 53 ++++++++++--
.../broker/service/persistent/SystemTopic.java | 18 +---
.../pulsar/compaction/CompactionRetentionTest.java | 96 +++++++++++++++++++++-
4 files changed, 143 insertions(+), 30 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index df0204c..ce9abcb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1250,11 +1250,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
? new SystemTopic(topic, ledger,
BrokerService.this)
: new PersistentTopic(topic, ledger,
BrokerService.this);
CompletableFuture<Void>
preCreateSubForCompaction =
-
CompletableFuture.completedFuture(null);
- if (persistentTopic instanceof SystemTopic) {
- preCreateSubForCompaction = ((SystemTopic)
persistentTopic)
-
.preCreateSubForCompactionIfNeeded();
- }
+
persistentTopic.preCreateSubscriptionForCompactionIfNeeded();
CompletableFuture<Void> replicationFuture =
persistentTopic.checkReplication();
FutureUtil.waitForAll(Lists.newArrayList(preCreateSubForCompaction,
replicationFuture))
.thenCompose(v -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 73c28f7..a6610a3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -116,6 +116,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
@@ -1389,6 +1390,28 @@ public class PersistentTopic extends AbstractTopic
messageDeduplication.purgeInactiveProducers();
}
+ public CompletableFuture<Boolean> isCompactionEnabled() {
+ Optional<Long> topicCompactionThreshold = getTopicPolicies()
+ .map(TopicPolicies::getCompactionThreshold);
+ if (topicCompactionThreshold.isPresent() &&
topicCompactionThreshold.get() > 0) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ TopicName topicName = TopicName.get(topic);
+ return
brokerService.getPulsar().getPulsarResources().getNamespaceResources()
+ .getPolicies(topicName.getNamespaceObject())
+ .thenApply(policies -> {
+ if (policies.isPresent()) {
+ return policies.get().compaction_threshold != null
+ && policies.get().compaction_threshold > 0;
+ } else {
+ // Check broker default
+ return brokerService.pulsar().getConfiguration()
+ .getBrokerServiceCompactionThresholdInBytes()
> 0;
+ }
+ });
+ }
+
public void checkCompaction() {
TopicName name = TopicName.get(topic);
try {
@@ -1438,11 +1461,24 @@ public class PersistentTopic extends AbstractTopic
}
}
- /**
- * Return if the topic has triggered compaction before or not.
- */
- protected boolean hasCompactionTriggered() {
- return subscriptions.containsKey(COMPACTION_SUBSCRIPTION);
+ public CompletableFuture<Void>
preCreateSubscriptionForCompactionIfNeeded() {
+ if (subscriptions.containsKey(COMPACTION_SUBSCRIPTION)) {
+ // The compaction cursor is already there, nothing to do
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return isCompactionEnabled()
+ .thenCompose(enabled -> {
+ if (enabled) {
+ // If a topic has a compaction policy setup, we must
make sure that the compaction cursor
+ // is pre-created, in order to ensure all the data
will be seen by the compactor.
+ return createSubscription(COMPACTION_SUBSCRIPTION,
+
CommandSubscribe.InitialPosition.Earliest, false)
+ .thenCompose(__ ->
CompletableFuture.completedFuture(null));
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
}
CompletableFuture<Void> startReplicator(String remoteCluster) {
@@ -2361,7 +2397,10 @@ public class PersistentTopic extends AbstractTopic
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().onPoliciesUpdate(data);
}
- return CompletableFuture.allOf(replicationFuture, dedupFuture,
persistentPoliciesFuture);
+
+
+ return CompletableFuture.allOf(replicationFuture, dedupFuture,
persistentPoliciesFuture,
+ preCreateSubscriptionForCompactionIfNeeded());
}
/**
@@ -3018,6 +3057,8 @@ public class PersistentTopic extends AbstractTopic
checkDeduplicationStatus();
+ preCreateSubscriptionForCompactionIfNeeded();
+
// update managed ledger config
checkPersistencePolicies();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 231f4e9..aaf83a9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -19,13 +19,11 @@
package org.apache.pulsar.broker.service.persistent;
-import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.common.api.proto.CommandSubscribe;
public class SystemTopic extends PersistentTopic {
@@ -64,18 +62,8 @@ public class SystemTopic extends PersistentTopic {
return CompletableFuture.completedFuture(null);
}
- public CompletableFuture<Void> preCreateSubForCompactionIfNeeded() {
- if (!super.hasCompactionTriggered()) {
- // To pre-create the subscription for the compactor to avoid lost
any data since we are using reader
- // for reading data from the __change_events topic, if no durable
subscription on the topic,
- // the data might be lost. Since we are using the topic compaction
on the __change_events topic
- // to reduce the topic policy cache recovery time,
- // so we can leverage the topic compaction cursor for retaining
the data.
- return super.createSubscription(COMPACTION_SUBSCRIPTION,
- CommandSubscribe.InitialPosition.Earliest, false)
- .thenCompose(__ ->
CompletableFuture.completedFuture(null));
- } else {
- return CompletableFuture.completedFuture(null);
- }
+ public CompletableFuture<Boolean> isCompactionEnabled() {
+ // All system topics are using compaction, even though it is not
explicitly set in the policies.
+ return CompletableFuture.completedFuture(true);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index a73d1f5..3085051 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -77,6 +77,7 @@ import
org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -94,12 +95,14 @@ public class CompactionRetentionTest extends
MockedPulsarServiceBaseTest {
public void setup() throws Exception {
conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
conf.setManagedLedgerMaxEntriesPerLedger(2);
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setSystemTopicEnabled(true);
super.internalSetup();
- admin.clusters().createCluster("use",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
admin.tenants().createTenant("my-tenant",
- new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet("use")));
- admin.namespaces().createNamespace("my-tenant/use/my-ns");
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("my-tenant/my-ns",
Collections.singleton("test"));
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
@@ -121,7 +124,7 @@ public class CompactionRetentionTest extends
MockedPulsarServiceBaseTest {
*/
@Test
public void testCompaction() throws Exception {
- String topic = "persistent://my-tenant/use/my-ns/my-topic-" +
System.nanoTime();
+ String topic = "persistent://my-tenant/my-ns/my-topic-" +
System.nanoTime();
Set<String> keys = Sets.newHashSet("a", "b", "c");
Set<String> keysToExpire = Sets.newHashSet("x1", "x2");
@@ -195,6 +198,91 @@ public class CompactionRetentionTest extends
MockedPulsarServiceBaseTest {
validateMessages(pulsarClient, false, topic, round,
Collections.emptySet());
}
+
+ /**
+ * When a topic is created, if the compaction threshold is set, the data
should be retained in the compacted view,
+ * even if the topic is not yet compacted.
+ */
+ @Test
+ public void testCompactionRetentionOnTopicCreationWithNamespacePolicies()
throws Exception {
+ String namespace = "my-tenant/my-ns";
+ String topic = "persistent://my-tenant/my-ns/my-topic-" +
System.nanoTime();
+ admin.namespaces().setCompactionThreshold(namespace, 10);
+
+ testCompactionCursorRetention(topic);
+ }
+
+ @Test
+ public void
testCompactionRetentionAfterTopicCreationWithNamespacePolicies() throws
Exception {
+ String namespace = "my-tenant/my-ns";
+ String topic = "persistent://my-tenant/my-ns/my-topic-" +
System.nanoTime();
+
+ // Pre-create the topic, so that compaction is enabled only after the
topic was created
+ pulsarClient.newProducer(Schema.INT32).topic(topic).create().close();
+
+ admin.namespaces().setCompactionThreshold(namespace, 10);
+
+ Awaitility.await().untilAsserted(() ->
+ testCompactionCursorRetention(topic)
+ );
+ }
+
+ @Test
+ public void testCompactionRetentionOnTopicCreationWithTopicPolicies()
throws Exception {
+ String topic = "persistent://my-tenant/my-ns/my-topic-" +
System.nanoTime();
+
+ // Pre-create the topic, otherwise setting policies will fail
+ pulsarClient.newProducer(Schema.INT32).topic(topic).create().close();
+
+ admin.topics().setCompactionThreshold(topic, 10);
+
+ Awaitility.await().untilAsserted(() ->
+ testCompactionCursorRetention(topic)
+ );
+ }
+
+ private void testCompactionCursorRetention(String topic) throws Exception {
+ Set<String> keys = Sets.newHashSet("a", "b", "c");
+ Set<String> keysToExpire = Sets.newHashSet("x1", "x2");
+ Set<String> allKeys = new HashSet<>();
+ allKeys.addAll(keys);
+ allKeys.addAll(keysToExpire);
+
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
+
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .create();
+
+ Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+
+ log.info(" ---- X 1: {}", mapper.writeValueAsString(
+ admin.topics().getInternalStats(topic, false)));
+
+ int round = 1;
+
+ for (String key : allKeys) {
+ producer.newMessage()
+ .key(key)
+ .value(round)
+ .send();
+ }
+
+ log.info(" ---- X 2: {}", mapper.writeValueAsString(
+ admin.topics().getInternalStats(topic, false)));
+
+ validateMessages(pulsarClient, true, topic, round, allKeys);
+
+ compactor.compact(topic).join();
+
+ log.info(" ---- X 3: {}", mapper.writeValueAsString(
+ admin.topics().getInternalStats(topic, false)));
+
+ validateMessages(pulsarClient, true, topic, round, allKeys);
+ }
+
private void validateMessages(PulsarClient client, boolean readCompacted,
String topic, int round, Set<String> expectedKeys)
throws Exception {
@Cleanup