This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 72f391105af6b1cdd4f25ef66557806505f12df4 Author: lipenghui <[email protected]> AuthorDate: Tue Jun 22 18:11:06 2021 +0800 Fix potential data lost on the system topic when topic compaction have not triggered yet (#11003) To pre-create the subscription for the compactor to avoid lost any data since we are using reader for reading data from the __change_events topic, if no durable subscription on the topic, the data might be lost. Since we are using the topic compaction on the __change_events topic to reduce the topic policy cache recovery time, so we can leverage the topic compaction cursor for retaining the data. (cherry picked from commit 94ec03111369e694f432ca219be77820648d2188) --- .../pulsar/broker/service/BrokerService.java | 11 ++++- .../broker/service/persistent/PersistentTopic.java | 13 ++++-- .../broker/service/persistent/SystemTopic.java | 21 ++++++++- .../pulsar/broker/admin/TopicPoliciesTest.java | 14 +++++- .../SystemTopicBasedTopicPoliciesServiceTest.java | 53 ++++++++++++---------- 5 files changed, 79 insertions(+), 33 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9c9d482..dc35db2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1223,8 +1223,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies PersistentTopic persistentTopic = isSystemTopic(topic) ? new SystemTopic(topic, ledger, BrokerService.this) : new PersistentTopic(topic, ledger, BrokerService.this); + CompletableFuture<Void> preCreateSubForCompaction = + CompletableFuture.completedFuture(null); + if (persistentTopic instanceof SystemTopic) { + preCreateSubForCompaction = ((SystemTopic) persistentTopic) + .preCreateSubForCompactionIfNeeded(); + } CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication(); - replicationFuture.thenCompose(v -> { + FutureUtil.waitForAll(Lists.newArrayList(preCreateSubForCompaction, replicationFuture)) + .thenCompose(v -> { // Also check dedup status return persistentTopic.checkDeduplicationStatus(); }).thenRun(() -> { @@ -1255,7 +1262,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies return null; }); - } catch (NamingException e) { + } catch (NamingException | PulsarServerException e) { log.warn("Failed to create topic {}-{}", topic, e.getMessage()); pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 38b3264..8f7f356 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import com.carrotsearch.hppc.ObjectObjectHashMap; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -147,7 +148,6 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicImpl; -import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.apache.pulsar.utils.StatsOutputStream; @@ -381,7 +381,7 @@ public class PersistentTopic extends AbstractTopic private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, boolean replicated) { checkNotNull(compactedTopic); - if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) { + if (subscriptionName.equals(COMPACTION_SUBSCRIPTION)) { return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor); } else { return new PersistentSubscription(this, subscriptionName, cursor, replicated); @@ -1369,7 +1369,7 @@ public class PersistentTopic extends AbstractTopic long backlogEstimate = 0; - PersistentSubscription compactionSub = subscriptions.get(Compactor.COMPACTION_SUBSCRIPTION); + PersistentSubscription compactionSub = subscriptions.get(COMPACTION_SUBSCRIPTION); if (compactionSub != null) { backlogEstimate = compactionSub.estimateBacklogSize(); } else { @@ -1396,6 +1396,13 @@ public class PersistentTopic extends AbstractTopic } } + /** + * Return if the topic has triggered compaction before or not. + */ + protected boolean hasCompactionTriggered() { + return subscriptions.containsKey(COMPACTION_SUBSCRIPTION); + } + CompletableFuture<Void> startReplicator(String remoteCluster) { log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster); final CompletableFuture<Void> future = new CompletableFuture<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 1b49dbe..788aa94 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -19,14 +19,18 @@ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.common.api.proto.CommandSubscribe; public class SystemTopic extends PersistentTopic { public SystemTopic(String topic, ManagedLedger ledger, BrokerService brokerService) - throws BrokerServiceException.NamingException { + throws BrokerServiceException.NamingException, PulsarServerException { super(topic, ledger, brokerService); } @@ -54,4 +58,19 @@ public class SystemTopic extends PersistentTopic { public void checkGC() { // do nothing for system topic } + + public CompletableFuture<Void> preCreateSubForCompactionIfNeeded() { + if (!super.hasCompactionTriggered()) { + // To pre-create the subscription for the compactor to avoid lost any data since we are using reader + // for reading data from the __change_events topic, if no durable subscription on the topic, + // the data might be lost. Since we are using the topic compaction on the __change_events topic + // to reduce the topic policy cache recovery time, + // so we can leverage the topic compaction cursor for retaining the data. + return super.createSubscription(COMPACTION_SUBSCRIPTION, + CommandSubscribe.InitialPosition.Earliest, false) + .thenCompose(__ -> CompletableFuture.completedFuture(null)); + } else { + return CompletableFuture.completedFuture(null); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 30a48b2..3d47212 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscribeRate; @@ -2295,6 +2296,15 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { log.info("Backlog quota set success on topic: {}", testTopic); Awaitility.await() + .untilAsserted(() -> { + TopicStats stats = admin.topics().getStats(topicPolicyEventsTopic); + Assert.assertTrue(stats.getSubscriptions().containsKey("__compaction")); + }); + + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicPolicyEventsTopic); + long previousCompactedLedgerId = internalStats.compactedLedger.ledgerId; + + Awaitility.await() .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); @@ -2302,8 +2312,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { Awaitility.await() .untilAsserted(() -> { - TopicStats stats = admin.topics().getStats(topicPolicyEventsTopic); - Assert.assertTrue(stats.getSubscriptions().containsKey("__compaction")); + PersistentTopicInternalStats iStats = admin.topics().getInternalStats(topicPolicyEventsTopic); + Assert.assertTrue(iStats.compactedLedger.ledgerId != previousCompactedLedgerId); }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 0d81459..aaa4618 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -85,11 +85,14 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get(); // Wait for all topic policies updated. - Thread.sleep(3000); + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(systemTopicBasedTopicPoliciesService + .getPoliciesCacheInit(TOPIC1.getNamespaceObject()))); - Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject())); // Assert broker is cache all topic policies - Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue(), 10); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1) + .getMaxConsumerPerTopic().intValue(), 10)); // Update policy for TOPIC1 TopicPolicies policies1 = TopicPolicies.builder() @@ -127,21 +130,21 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic .build(); systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, policies6).get(); - Thread.sleep(1000); - - TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); - TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); - TopicPolicies policiesGet3 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3); - TopicPolicies policiesGet4 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4); - TopicPolicies policiesGet5 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5); - TopicPolicies policiesGet6 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6); - - Assert.assertEquals(policiesGet1, policies1); - Assert.assertEquals(policiesGet2, policies2); - Assert.assertEquals(policiesGet3, policies3); - Assert.assertEquals(policiesGet4, policies4); - Assert.assertEquals(policiesGet5, policies5); - Assert.assertEquals(policiesGet6, policies6); + Awaitility.await().untilAsserted(() -> { + TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); + TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); + TopicPolicies policiesGet3 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3); + TopicPolicies policiesGet4 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4); + TopicPolicies policiesGet5 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5); + TopicPolicies policiesGet6 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6); + + Assert.assertEquals(policiesGet1, policies1); + Assert.assertEquals(policiesGet2, policies2); + Assert.assertEquals(policiesGet3, policies3); + Assert.assertEquals(policiesGet4, policies4); + Assert.assertEquals(policiesGet5, policies5); + Assert.assertEquals(policiesGet6, policies6); + }); // Remove reader cache will remove policies cache Assert.assertEquals(systemTopicBasedTopicPoliciesService.getPoliciesCacheSize(), 6); @@ -164,13 +167,13 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic policies1.setMaxProducerPerTopic(106); systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); - Thread.sleep(1000); - // reader for NAMESPACE1 will back fill the reader cache - policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); - policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); - Assert.assertEquals(policies1, policiesGet1); - Assert.assertEquals(policies2, policiesGet2); + Awaitility.await().untilAsserted(() -> { + TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); + TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); + Assert.assertEquals(policies1, policiesGet1); + Assert.assertEquals(policies2, policiesGet2); + }); // Check reader cache is correct. Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2))); @@ -178,7 +181,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3))); // Check get without cache - policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get(); + TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get(); Assert.assertEquals(policies1, policiesGet1); }
