This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4d4f1896e42eed23cc2c3dad0b6ff052ed0c0af6
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Sep 27 21:32:45 2022 +0300

    Skip creating a subscription replication snapshot if no messages have been 
published after the topic gets activated on a broker (#16618)
    
    * Skip creating a replication snapshot if no messages have been published
    
    * Adapt test to new behavior where replication snapshots happen only when 
there are new messages
    
    (cherry picked from commit 43ad6f951b6567dd2c4b015d602fa3316f45a74f)
---
 .../ReplicatedSubscriptionsController.java         |  3 +-
 .../broker/service/ReplicatorSubscriptionTest.java | 55 ++++++++++++++++------
 2 files changed, 42 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index 2b1ae4ba193..3fbc80a13b8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -202,7 +202,8 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
     private void startNewSnapshot() {
         cleanupTimedOutSnapshots();
 
-        if (topic.getLastDataMessagePublishedTimestamp() < 
lastCompletedSnapshotStartTime) {
+        if (topic.getLastDataMessagePublishedTimestamp() < 
lastCompletedSnapshotStartTime
+                || topic.getLastDataMessagePublishedTimestamp() == 0) {
             // There was no message written since the last snapshot, we can 
skip creating a new snapshot
             if (log.isDebugEnabled()) {
                 log.debug("[{}] There is no new data in topic. Skipping 
snapshot creation.", topic.getName());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 2a4c7d17b8b..175574ed828 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -173,6 +174,14 @@ public class ReplicatorSubscriptionTest extends 
ReplicatorTestBase {
         // create subscription in r1
         createReplicatedSubscription(client1, topicName, subscriptionName, 
true);
 
+        // Validate that no snapshots are created before messages are published
+        Thread.sleep(2 * 
config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+        PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
+                .getTopic(topicName, false).get().get();
+        ReplicatedSubscriptionsController rsc1 = 
t1.getReplicatedSubscriptionController().get();
+        // no snapshot should have been created before any messages are 
published
+        assertTrue(rsc1.getLastCompletedSnapshotId().isEmpty());
+
         @Cleanup
         PulsarClient client2 = PulsarClient.builder()
                 .serviceUrl(url2.toString())
@@ -196,9 +205,6 @@ public class ReplicatorSubscriptionTest extends 
ReplicatorTestBase {
         Thread.sleep(2 * 
config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
 
         // In R1
-        PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
-                .getTopic(topicName, false).get().get();
-        ReplicatedSubscriptionsController rsc1 = 
t1.getReplicatedSubscriptionController().get();
         Position p1 = t1.getLastPosition();
         String snapshot1 = rsc1.getLastCompletedSnapshotId().get();
 
@@ -479,22 +485,35 @@ public class ReplicatorSubscriptionTest extends 
ReplicatorTestBase {
                 .statsInterval(0, TimeUnit.SECONDS)
                 .build();
 
-        // create consumer in r1
-        @Cleanup
-        Consumer<byte[]> consumer1 = client1.newConsumer()
-                .topic(topicName)
-                .subscriptionName(subscriptionName)
-                .replicateSubscriptionState(true)
-                .subscribe();
+        {
+            // create consumer in r1
+            @Cleanup
+            Consumer<byte[]> consumer = client1.newConsumer()
+                    .topic(topicName)
+                    .subscriptionName(subscriptionName)
+                    .replicateSubscriptionState(true)
+                    .subscribe();
 
-        // waiting to replicate topic/subscription to r1->r2
-        Awaitility.await().until(() -> 
pulsar2.getBrokerService().getTopics().containsKey(topicName));
-        final PersistentTopic topic2 = (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
-        Awaitility.await().untilAsserted(() -> 
assertTrue(topic2.getReplicators().get("r1").isConnected()));
-        Awaitility.await().untilAsserted(() -> 
assertNotNull(topic2.getSubscription(subscriptionName)));
+            // send one message to trigger replication
+            @Cleanup
+            Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                    .enableBatching(false)
+                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                    .create();
+            producer.send("message".getBytes(StandardCharsets.UTF_8));
+
+            assertEquals(readMessages(consumer, new HashSet<>(), 1, false), 1);
+
+            // waiting to replicate topic/subscription to r1->r2
+            Awaitility.await().until(() -> 
pulsar2.getBrokerService().getTopics().containsKey(topicName));
+            final PersistentTopic topic2 = (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+            Awaitility.await().untilAsserted(() -> 
assertTrue(topic2.getReplicators().get("r1").isConnected()));
+            Awaitility.await().untilAsserted(() -> 
assertNotNull(topic2.getSubscription(subscriptionName)));
+        }
 
         // unsubscribe replicated subscription in r2
         admin2.topics().deleteSubscription(topicName, subscriptionName);
+        final PersistentTopic topic2 = (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
         assertNull(topic2.getSubscription(subscriptionName));
 
         // close replicator producer in r2
@@ -519,6 +538,12 @@ public class ReplicatorSubscriptionTest extends 
ReplicatorTestBase {
 
         // consume 6 messages in r1
         Set<String> receivedMessages = new LinkedHashSet<>();
+        @Cleanup
+        Consumer<byte[]> consumer1 = client1.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(true)
+                .subscribe();
         assertEquals(readMessages(consumer1, receivedMessages, numMessages, 
false), numMessages);
 
         // wait for subscription to be replicated

Reply via email to