This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d3707c56db6 [improve][broker] Reduce unnecessary
REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST (#23839)
d3707c56db6 is described below
commit d3707c56db691304f4a1caceb559d3f29508fa6f
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Jan 13 12:06:36 2025 +0800
[improve][broker] Reduce unnecessary
REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST (#23839)
---
.../ReplicatedSubscriptionsController.java | 23 ++++++
.../broker/service/ReplicatedSubscriptionTest.java | 90 ++++++++++++++++++++++
2 files changed, 113 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 4fb0022194a..b21fe7acfdb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
@@ -220,6 +221,23 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
private void startNewSnapshot() {
cleanupTimedOutSnapshots();
+ if (lastCompletedSnapshotStartTime == 0 &&
!pendingSnapshots.isEmpty()) {
+ // 1. If the remote cluster has disabled subscription replication
or there's an incorrect config,
+ // it will not respond to SNAPSHOT_REQUEST. Therefore,
lastCompletedSnapshotStartTime will remain 0,
+ // making it unnecessary to resend the request.
+ // 2. This approach prevents sending additional SNAPSHOT_REQUEST
to both local_topic and remote_topic.
+ // 3. Since it's uncertain when the remote cluster will enable
subscription replication,
+ // the timeout mechanism of pendingSnapshots is used to ensure
retries.
+ //
+ // In other words, when this case is hit, the frequency of sending
SNAPSHOT_REQUEST
+ // is determined by `replicatedSubscriptionsSnapshotTimeoutSeconds`.
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] PendingSnapshot exists but has never
succeeded. "
+ + "Skipping snapshot creation until pending snapshot
timeout.", topic.getName());
+ }
+ return;
+ }
+
if (topic.getLastMaxReadPositionMovedForwardTimestamp() <
lastCompletedSnapshotStartTime
|| topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
// There was no message written since the last snapshot, we can
skip creating a new snapshot
@@ -324,6 +342,11 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
return localCluster;
}
+ @VisibleForTesting
+ public ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder>
pendingSnapshots() {
+ return pendingSnapshots;
+ }
+
@Override
public boolean isMarkerMessage() {
// Everything published by this controller will be a marker a message
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
index 5b896a22baa..0f527993bba 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
@@ -64,6 +64,10 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TransactionIsolationLevel;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.MarkerType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -1002,6 +1006,92 @@ public class ReplicatedSubscriptionTest extends
ReplicatorTestBase {
Assert.assertEquals(result, List.of("V2"));
}
+ @Test
+ public void testReplicatedSubscriptionOneWay() throws Exception {
+ final String namespace =
BrokerTestUtil.newUniqueName("pulsar-r4/replicatedsubscription");
+ final String topicName = "persistent://" + namespace + "/one-way";
+ int defaultSubscriptionsSnapshotFrequency =
config1.getReplicatedSubscriptionsSnapshotFrequencyMillis();
+ int defaultSubscriptionsSnapshotTimeout =
config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds();
+ config1.setReplicatedSubscriptionsSnapshotTimeoutSeconds(2);
+ config1.setReplicatedSubscriptionsSnapshotFrequencyMillis(100);
+
+ // cluster4 disabled ReplicatedSubscriptions
+ admin1.tenants().createTenant("pulsar-r4",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid4"),
Sets.newHashSet(cluster1, cluster4)));
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet(cluster1, cluster4));
+
+ String subscriptionName = "cluster-subscription";
+ boolean replicateSubscriptionState = true;
+
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+
+ @Cleanup
+ final PulsarClient client4 =
PulsarClient.builder().serviceUrl(url4.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+
+ // create subscription in cluster1
+ createReplicatedSubscription(client1, topicName, subscriptionName,
replicateSubscriptionState);
+ // create subscription in cluster4
+ createReplicatedSubscription(client4, topicName, subscriptionName,
replicateSubscriptionState);
+
+ // send messages in cluster1
+ @Cleanup
+ Producer<byte[]> producer = client1.newProducer().topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ int numMessages = 6;
+ for (int i = 0; i < numMessages; i++) {
+ String body = "message" + i;
+ producer.send(body.getBytes(StandardCharsets.UTF_8));
+ }
+ producer.close();
+
+ // wait for snapshot marker request to be replicated
+ Thread.sleep(3 *
config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+
+ // Assert there is exactly 1 pending snapshot in cluster1
+ final PersistentTopic topic1 =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+ ReplicatedSubscriptionsController r1Controller =
+ topic1.getReplicatedSubscriptionController().get();
+ assertEquals(r1Controller.pendingSnapshots().size(), 1);
+
+ // Assert cluster4 has received exactly 1 snapshot request message
+ int numSnapshotRequest = 0;
+ List<Message<byte[]>> r4Messages = admin4.topics()
+ .peekMessages(topicName, subscriptionName, 100, true,
TransactionIsolationLevel.READ_UNCOMMITTED);
+ for (Message<byte[]> r4Message : r4Messages) {
+ MessageMetadata msgMetadata = ((MessageImpl<byte[]>)
r4Message).getMessageBuilder();
+ if (msgMetadata.hasMarkerType() && msgMetadata.getMarkerType() ==
MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE) {
+ numSnapshotRequest++;
+ }
+ }
+ Assert.assertEquals(numSnapshotRequest, 1);
+
+ // Wait for the pending snapshot to time out
+
Thread.sleep(config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds() * 1000);
+ numSnapshotRequest = 0;
+ r4Messages = admin4.topics()
+ .peekMessages(topicName, subscriptionName, 100, true,
TransactionIsolationLevel.READ_UNCOMMITTED);
+ for (Message<byte[]> r4Message : r4Messages) {
+ MessageMetadata msgMetadata = ((MessageImpl<byte[]>)
r4Message).getMessageBuilder();
+ if (msgMetadata.hasMarkerType() && msgMetadata.getMarkerType() ==
MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE) {
+ numSnapshotRequest++;
+ }
+ }
+ Assert.assertEquals(numSnapshotRequest, 2);
+
+ // Set back to default config.
+
config1.setReplicatedSubscriptionsSnapshotTimeoutSeconds(defaultSubscriptionsSnapshotTimeout);
+
config1.setReplicatedSubscriptionsSnapshotFrequencyMillis(defaultSubscriptionsSnapshotFrequency);
+ }
+
/**
* Disable replication subscription.
* Test scheduled task case.