This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d3707c56db6 [improve][broker] Reduce unnecessary REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST (#23839)
d3707c56db6 is described below

commit d3707c56db691304f4a1caceb559d3f29508fa6f
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Mon Jan 13 12:06:36 2025 +0800

    [improve][broker] Reduce unnecessary REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST (#23839)
---
 .../ReplicatedSubscriptionsController.java         | 23 ++++++
 .../broker/service/ReplicatedSubscriptionTest.java | 90 ++++++++++++++++++++++
 2 files changed, 113 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 4fb0022194a..b21fe7acfdb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
@@ -220,6 +221,23 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
     private void startNewSnapshot() {
         cleanupTimedOutSnapshots();
 
+        if (lastCompletedSnapshotStartTime == 0 && !pendingSnapshots.isEmpty()) {
+            // 1. If the remote cluster has disabled subscription replication or there's an incorrect config,
+            //    it will not respond to SNAPSHOT_REQUEST. Therefore, lastCompletedSnapshotStartTime will remain 0,
+            //    making it unnecessary to resend the request.
+            // 2. This approach prevents sending additional SNAPSHOT_REQUEST to both local_topic and remote_topic.
+            // 3. Since it's uncertain when the remote cluster will enable subscription replication,
+            //    the timeout mechanism of pendingSnapshots is used to ensure retries.
+            //
+            // In other words, when this case is hit, the frequency of sending SNAPSHOT_REQUEST
+            // is governed by `replicatedSubscriptionsSnapshotTimeoutSeconds`.
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] PendingSnapshot exists but has never succeeded. "
+                        + "Skipping snapshot creation until pending snapshot timeout.", topic.getName());
+            }
+            return;
+        }
+
         if (topic.getLastMaxReadPositionMovedForwardTimestamp() < lastCompletedSnapshotStartTime
                 || topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
             // There was no message written since the last snapshot, we can skip creating a new snapshot
@@ -324,6 +342,11 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
         return localCluster;
     }
 
+    @VisibleForTesting
+    public ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots() {
+        return pendingSnapshots;
+    }
+
     @Override
     public boolean isMarkerMessage() {
         // Everything published by this controller will be a marker a message
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
index 5b896a22baa..0f527993bba 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
@@ -64,6 +64,10 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TransactionIsolationLevel;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.MarkerType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -1002,6 +1006,92 @@ public class ReplicatedSubscriptionTest extends ReplicatorTestBase {
         Assert.assertEquals(result, List.of("V2"));
     }
 
+    @Test
+    public void testReplicatedSubscriptionOneWay() throws Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar-r4/replicatedsubscription");
+        final String topicName = "persistent://" + namespace + "/one-way";
+        int defaultSubscriptionsSnapshotFrequency = config1.getReplicatedSubscriptionsSnapshotFrequencyMillis();
+        int defaultSubscriptionsSnapshotTimeout = config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds();
+        config1.setReplicatedSubscriptionsSnapshotTimeoutSeconds(2);
+        config1.setReplicatedSubscriptionsSnapshotFrequencyMillis(100);
+        
+        // cluster4 has replicated subscriptions disabled
+        admin1.tenants().createTenant("pulsar-r4",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid4"), Sets.newHashSet(cluster1, cluster4)));
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster4));
+        
+        String subscriptionName = "cluster-subscription";
+        boolean replicateSubscriptionState = true;
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        @Cleanup
+        final PulsarClient client4 = PulsarClient.builder().serviceUrl(url4.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in cluster1
+        createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);
+        // create subscription in cluster4
+        createReplicatedSubscription(client4, topicName, subscriptionName, replicateSubscriptionState);
+
+        // send messages in cluster1
+        @Cleanup
+        Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        int numMessages = 6;
+        for (int i = 0; i < numMessages; i++) {
+            String body = "message" + i;
+            producer.send(body.getBytes(StandardCharsets.UTF_8));
+        }
+        producer.close();
+
+        // wait for snapshot marker request to be replicated
+        Thread.sleep(3 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+
+        // Assert there is exactly 1 pending snapshot in cluster1
+        final PersistentTopic topic1 =
+                (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+        ReplicatedSubscriptionsController r1Controller =
+                topic1.getReplicatedSubscriptionController().get();
+        assertEquals(r1Controller.pendingSnapshots().size(), 1);
+        
+        // Assert cluster4 received exactly 1 snapshot request message
+        int numSnapshotRequest = 0;
+        List<Message<byte[]>> r4Messages = admin4.topics()
+                .peekMessages(topicName, subscriptionName, 100, true, TransactionIsolationLevel.READ_UNCOMMITTED);
+        for (Message<byte[]> r4Message : r4Messages) {
+            MessageMetadata msgMetadata = ((MessageImpl<byte[]>) r4Message).getMessageBuilder();
+            if (msgMetadata.hasMarkerType() && msgMetadata.getMarkerType() == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE) {
+                numSnapshotRequest++;
+            }
+        }
+        Assert.assertEquals(numSnapshotRequest, 1);
+
+        // Wait for the pending snapshot to time out
+        Thread.sleep(config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds() * 1000);
+        numSnapshotRequest = 0;
+        r4Messages = admin4.topics()
+                .peekMessages(topicName, subscriptionName, 100, true, TransactionIsolationLevel.READ_UNCOMMITTED);
+        for (Message<byte[]> r4Message : r4Messages) {
+            MessageMetadata msgMetadata = ((MessageImpl<byte[]>) r4Message).getMessageBuilder();
+            if (msgMetadata.hasMarkerType() && msgMetadata.getMarkerType() == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE) {
+                numSnapshotRequest++;
+            }
+        }
+        Assert.assertEquals(numSnapshotRequest, 2);
+
+        // Set back to default config.
+        config1.setReplicatedSubscriptionsSnapshotTimeoutSeconds(defaultSubscriptionsSnapshotTimeout);
+        config1.setReplicatedSubscriptionsSnapshotFrequencyMillis(defaultSubscriptionsSnapshotFrequency);
+    }
+
     /**
      * Disable replication subscription.
      *    Test scheduled task case.

Reply via email to