This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2322004069f [fix][broker] Do not write replicated snapshot marker when
replication is not enabled for the topic (#21495)
2322004069f is described below
commit 2322004069f23ecca1a12a4ced898fed854275fb
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Nov 14 20:35:29 2023 +0800
[fix][broker] Do not write replicated snapshot marker when replication is
not enabled for the topic (#21495)
### Motivation
[PIP 33](https://github.com/apache/pulsar/wiki/PIP-33%3A-Replicated-subscriptions)
introduces the concept of `Replicated subscriptions`. When a topic has a
consumer (subscription) with replicated subscriptions enabled, the broker writes
markers into the original topic. These markers are written even if no remote
cluster is configured for the topic, which makes the topic's backlog keep
increasing.
---
Markers are written in the following two ways:
1. A scheduled task writes a marker at a fixed time interval if new messages
have been published since the last one.
https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java#L78-L86
https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java#L77-L86
2. Acknowledging a message triggers a check of whether the first snapshot has
been written and the mark-delete position has moved; if so, a marker is written.
https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java#L114-L150
### Modifications
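Create or remove the topic's `ReplicatedSubscriptionsController` according to
its replication policy: the controller is only kept while the topic has more
than one replication cluster configured. The state is re-checked whenever the
namespace or topic policies are updated.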
---
.../broker/service/persistent/PersistentTopic.java | 9 +-
.../broker/service/ReplicatorSubscriptionTest.java | 211 +++++++++++++++++++++
2 files changed, 217 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ba40a75651d..13bcf769618 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3010,7 +3010,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
updateTopicPolicyByNamespacePolicy(data);
-
+ checkReplicatedSubscriptionControllerState();
isEncryptionRequired = data.encryption_required;
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
@@ -3497,12 +3497,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
boolean isCurrentlyEnabled =
replicatedSubscriptionsController.isPresent();
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
+ boolean replicationEnabled =
this.topicPolicies.getReplicationClusters().get().size() > 1;
- if (shouldBeEnabled && !isCurrentlyEnabled &&
isEnableReplicatedSubscriptions) {
+ if (shouldBeEnabled && !isCurrentlyEnabled &&
isEnableReplicatedSubscriptions && replicationEnabled) {
log.info("[{}] Enabling replicated subscriptions controller",
topic);
replicatedSubscriptionsController = Optional.of(new
ReplicatedSubscriptionsController(this,
brokerService.pulsar().getConfiguration().getClusterName()));
- } else if (isCurrentlyEnabled && !shouldBeEnabled ||
!isEnableReplicatedSubscriptions) {
+ } else if (isCurrentlyEnabled && !shouldBeEnabled ||
!isEnableReplicatedSubscriptions
+ || !replicationEnabled) {
log.info("[{}] Disabled replicated subscriptions controller",
topic);
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
replicatedSubscriptionsController = Optional.empty();
@@ -3685,6 +3687,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
updateTopicPolicy(policies);
shadowTopics = policies.getShadowTopics();
updateDispatchRateLimiter();
+ checkReplicatedSubscriptionControllerState();
updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
updatePublishDispatcher();
updateSubscribeRateLimiter();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index f816aa2dd24..529fb923f59 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -50,7 +51,9 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -60,6 +63,7 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
@@ -728,6 +732,213 @@ public class ReplicatorSubscriptionTest extends
ReplicatorTestBase {
Awaitility.await().untilAsserted(() ->
assertNotNull(topic2.getSubscription(subscriptionName)));
}
+    /**
+     * Supplies the {@code isTopicPolicyEnabled} flag for
+     * {@link #testWriteMarkerTaskOfReplicateSubscriptions(boolean)}; only {@code false} is used for now.
+     */
+    @DataProvider(name = "isTopicPolicyEnabled")
+    private Object[][] isTopicPolicyEnabled() {
+        // TODO: also run with true once replication can be enabled at the topic level.
+        Object[] topicPoliciesDisabled = {false};
+        return new Object[][] {topicPoliciesDisabled};
+    }
+
+    /**
+     * Tests that replicated subscriptions only write snapshots and markers while the topic is replicated.
+     * <p>
+     * 1. No data is written into the original topic while it has no remote cluster ({@code topic1}):
+     *    1.1 after publishing messages and waiting a moment, the backlog does not increase;
+     *    1.2 after acknowledging messages, the last confirmed entry does not change.
+     * 2. Snapshots and markers are written once the topic has a remote cluster ({@code topic2}):
+     *    2.1 after publishing messages, the backlog of the topic keeps increasing;
+     *    2.2 once the snapshot completes, the backlog stops changing;
+     *    2.3 publishing more messages triggers another snapshot;
+     *    2.4 acknowledging messages moves the mark-delete position past the position
+     *        recorded in the first snapshot;
+     *    2.5 a new entry (a marker) is appended to the original topic.
+     * 3. Snapshots and markers stop being written after the remote cluster is removed again
+     *    ({@code topic2}); the checks are the same as in step 1.
+     * </p>
+     */
+    @Test(dataProvider = "isTopicPolicyEnabled")
+    public void testWriteMarkerTaskOfReplicateSubscriptions(boolean isTopicPolicyEnabled) throws Exception {
+        // 1. Prepare resources and use a configuration that makes snapshots happen quickly.
+        String namespace = BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog");
+        // topic1 never gets a remote cluster; topic2 gets one in case 2 and loses it again in case 3.
+        String topic1 = "persistent://" + namespace + "/replication-disable";
+        String topic2 = "persistent://" + namespace + "/replication-enable";
+        String subName = "sub";
+
+        // pulsar1 comes from ReplicatorTestBase and may be reused by other tests,
+        // so remember the settings changed below and restore them afterwards.
+        final boolean previousTopicPoliciesEnabled = pulsar1.getConfiguration().isTopicLevelPoliciesEnabled();
+        final int previousPolicyCheckSeconds = pulsar1.getConfiguration().getReplicationPolicyCheckDurationSeconds();
+        final int previousSnapshotFrequencyMillis =
+                pulsar1.getConfiguration().getReplicatedSubscriptionsSnapshotFrequencyMillis();
+
+        admin1.namespaces().createNamespace(namespace);
+        pulsar1.getConfiguration().setTopicLevelPoliciesEnabled(isTopicPolicyEnabled);
+        pulsar1.getConfiguration().setReplicationPolicyCheckDurationSeconds(1);
+        pulsar1.getConfiguration().setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+        try {
+            // 2. Build producers and consumers (closed by @Cleanup when this try block ends).
+            @Cleanup
+            PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                    .statsInterval(0, TimeUnit.SECONDS)
+                    .build();
+            @Cleanup
+            Consumer<byte[]> consumer1 = client1.newConsumer()
+                    .topic(topic1)
+                    .subscriptionName(subName)
+                    .ackTimeout(5, TimeUnit.SECONDS)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .replicateSubscriptionState(true)
+                    .subscribe();
+            @Cleanup
+            Producer<byte[]> producer1 = client1.newProducer()
+                    .topic(topic1)
+                    .create();
+            // 3. Test that replicated subscriptions work as expected.
+            // Test case 1: replication disabled, the backlog does not increase.
+            testReplicatedSubscriptionWhenDisableReplication(producer1, consumer1, topic1);
+
+            // Test case 2: replication enabled, markers and snapshots are written.
+            if (isTopicPolicyEnabled) {
+                admin1.topics().createNonPartitionedTopic(topic2);
+                admin1.topics().setReplicationClusters(topic2, List.of("r1", "r2"));
+            } else {
+                admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+            }
+            @Cleanup
+            Consumer<byte[]> consumer2 = client1.newConsumer()
+                    .topic(topic2)
+                    .subscriptionName(subName)
+                    .ackTimeout(5, TimeUnit.SECONDS)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .replicateSubscriptionState(true)
+                    .subscribe();
+            @Cleanup
+            Producer<byte[]> producer2 = client1.newProducer()
+                    .topic(topic2)
+                    .create();
+            testReplicatedSubscriptionWhenEnableReplication(producer2, consumer2, topic2);
+
+            // Test case 3: replication disabled again, markers and snapshots are no longer written.
+            if (isTopicPolicyEnabled) {
+                admin1.topics().setReplicationClusters(topic2, List.of("r1"));
+            } else {
+                admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
+            }
+            testReplicatedSubscriptionWhenDisableReplication(producer2, consumer2, topic2);
+        } finally {
+            // 4. Clear resources even if an assertion failed, then restore the broker configuration.
+            pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(true);
+            try {
+                admin1.namespaces().deleteNamespace(namespace, true);
+            } finally {
+                pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(false);
+                pulsar1.getConfiguration().setTopicLevelPoliciesEnabled(previousTopicPoliciesEnabled);
+                pulsar1.getConfiguration().setReplicationPolicyCheckDurationSeconds(previousPolicyCheckSeconds);
+                pulsar1.getConfiguration()
+                        .setReplicatedSubscriptionsSnapshotFrequencyMillis(previousSnapshotFrequencyMillis);
+            }
+        }
+    }
+
+    /**
+     * Verifies that no replicated-subscription snapshot or marker is written to a topic without a remote cluster.
+     * <p>
+     * Scheduled task case:
+     * 1. Send three messages |1:0|1:1|1:2|.
+     * 2. Read the topic backlog size, as backlog1.
+     * 3. Wait a moment.
+     * 4. Read the backlog size again; it must not have increased.
+     * Acknowledgement case:
+     * 1. Read the last confirmed entry, as LAC1.
+     * 2. Acknowledge the messages |1:0|1:1|.
+     * 3. Wait a moment.
+     * 4. Read the last confirmed entry again, as LAC2; it must equal LAC1.
+     * Clean up:
+     * 1. Acknowledge the remaining message |1:2|.
+     * 2. Wait for the backlog to return to zero.
+     * </p>
+     */
+    private void testReplicatedSubscriptionWhenDisableReplication(Producer<byte[]> producer, Consumer<byte[]> consumer,
+                                                                  String topic) throws Exception {
+        final int messageSum = 3;
+        // Scheduled task case: publishing must not be followed by a marker write.
+        for (int i = 0; i < messageSum; i++) {
+            producer.newMessage().send();
+        }
+        long backlog1 = admin1.topics().getStats(topic, false).getBacklogSize();
+        Thread.sleep(3000);
+        long backlog2 = admin1.topics().getStats(topic, false).getBacklogSize();
+        assertEquals(backlog1, backlog2);
+        // Acknowledgement case: moving the mark-delete position must not append a marker.
+        String lastConfirmedEntry1 = admin1.topics().getInternalStats(topic).lastConfirmedEntry;
+        for (int i = 0; i < messageSum - 1; i++) {
+            var message = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message, "Expected a message on " + topic);
+            consumer.acknowledge(message);
+        }
+        // Acks are processed asynchronously, and an "unchanged" check inside Awaitility would pass on its
+        // first poll. Give the broker time to (wrongly) write a marker before asserting instead.
+        Thread.sleep(3000);
+        String lastConfirmedEntry2 = admin1.topics().getInternalStats(topic).lastConfirmedEntry;
+        assertEquals(lastConfirmedEntry1, lastConfirmedEntry2);
+        // Clean up: acknowledge the remaining message and wait until the backlog is drained.
+        var lastMessage = consumer.receive(5, TimeUnit.SECONDS);
+        assertNotNull(lastMessage, "Expected a message on " + topic);
+        consumer.acknowledge(lastMessage);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin1.topics().getStats(topic, false).getBacklogSize(), 0L));
+    }
+
+    /**
+     * Verifies that snapshots and markers are written once the topic has a remote cluster.
+     * <p>
+     * Scheduled task case:
+     * 1. Wait for the replicator to connect.
+     * 2. Send three messages |1:0|1:1|1:2|.
+     * 3. Read the topic backlog size, as backlog1.
+     * 4. Wait until the backlog grows beyond backlog1 because a marker is appended. |1:0|1:1|1:2|mark|
+     * 5. Wait for the snapshot to complete.
+     * Acknowledgement case:
+     * 1. Send three more messages and wait for another snapshot. |1:0|1:1|1:2|mark|1:3|1:4|1:5|mark|
+     * 2. Read the last confirmed entry, as LAC1.
+     * 3. Acknowledge the messages |1:0|1:1|1:2|1:3|1:4|.
+     * 4. Wait until the last confirmed entry (LAC2) differs from LAC1. |1:5|mark|mark|
+     * Clean up:
+     * 1. Acknowledge the remaining message |1:5|.
+     * 2. Wait for the backlog to return to zero.
+     * </p>
+     */
+    private void testReplicatedSubscriptionWhenEnableReplication(Producer<byte[]> producer, Consumer<byte[]> consumer,
+                                                                 String topic) throws Exception {
+        final int messageSum = 3;
+        // Wait until exactly one replicator exists for the topic and it is connected.
+        // A not-yet-loaded topic fails an assertion (and is retried) instead of throwing from Optional.get().
+        Awaitility.await().untilAsserted(() -> {
+            Topic loadedTopic = pulsar1.getBrokerService().getTopic(topic, false).get().orElse(null);
+            assertNotNull(loadedTopic);
+            List<String> remoteClusters = loadedTopic.getReplicators().keys();
+            assertEquals(remoteClusters.size(), 1);
+            assertTrue(loadedTopic.getReplicators().get(remoteClusters.get(0)).isConnected());
+        });
+        // Scheduled task case.
+        sendMessageAndWaitSnapshotComplete(producer, topic, messageSum);
+        // Acknowledgement case: after a snapshot has completed, moving the mark-delete position past
+        // the position recorded in the snapshot should trigger writing a new marker.
+        sendMessageAndWaitSnapshotComplete(producer, topic, messageSum);
+        String lastConfirmedEntryBeforeAck = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
+        for (int i = 0; i < messageSum * 2 - 1; i++) {
+            var message = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message, "Expected a message on " + topic);
+            consumer.acknowledge(message);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            String lastConfirmedEntryAfterAck = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
+            assertNotEquals(lastConfirmedEntryBeforeAck, lastConfirmedEntryAfterAck);
+        });
+        // Clean up: acknowledge the remaining message and wait until the backlog is drained.
+        var lastMessage = consumer.receive(5, TimeUnit.SECONDS);
+        assertNotNull(lastMessage, "Expected a message on " + topic);
+        consumer.acknowledge(lastMessage);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin1.topics().getStats(topic, false).getBacklogSize(), 0L));
+    }
+
+    /**
+     * Publishes {@code messageSum} messages, waits until the topic backlog grows beyond what was measured
+     * right after publishing (a marker was appended), then waits until the last confirmed entry stays
+     * unchanged for one second, which callers treat as the snapshot having completed.
+     *
+     * @param producer   producer used to publish empty messages
+     * @param topic      topic whose stats are polled through {@code admin1}
+     * @param messageSum number of messages to publish
+     */
+    private void sendMessageAndWaitSnapshotComplete(Producer<byte[]> producer, String topic,
+                                                    int messageSum) throws Exception {
+        for (int i = 0; i < messageSum; i++) {
+            producer.newMessage().send();
+        }
+        // send() is synchronous, so the published messages should already be counted in this baseline;
+        // any further growth means an extra entry (a marker) was appended.
+        long backlogAfterPublish = admin1.topics().getStats(topic, false).getBacklogSize();
+        Awaitility.await().untilAsserted(() -> {
+            long currentBacklog = admin1.topics().getStats(topic, false).getBacklogSize();
+            assertTrue(currentBacklog > backlogAfterPublish,
+                    "Backlog did not grow past " + backlogAfterPublish + ", current: " + currentBacklog);
+        });
+        // Wait for the snapshot to complete: no new entry may be appended within one second.
+        Awaitility.await().untilAsserted(() -> {
+            String lastConfirmedEntry1 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
+            Thread.sleep(1000);
+            String lastConfirmedEntry2 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
+            assertEquals(lastConfirmedEntry1, lastConfirmedEntry2);
+        });
+    }
+
void publishMessages(Producer<byte[]> producer, int startIndex, int
numMessages, Set<String> sentMessages)
throws PulsarClientException {
for (int i = startIndex; i < startIndex + numMessages; i++) {