This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4d4f1896e42eed23cc2c3dad0b6ff052ed0c0af6 Author: Lari Hotari <[email protected]> AuthorDate: Tue Sep 27 21:32:45 2022 +0300 Skip creating a subscription replication snapshot if no messages have been published after the topic gets activated on a broker (#16618) * Skip creating a replication snapshot if no messages have been published * Adapt test to new behavior where replication snapshots happen only when there are new messages (cherry picked from commit 43ad6f951b6567dd2c4b015d602fa3316f45a74f) --- .../ReplicatedSubscriptionsController.java | 3 +- .../broker/service/ReplicatorSubscriptionTest.java | 55 ++++++++++++++++------ 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 2b1ae4ba193..3fbc80a13b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -202,7 +202,8 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P private void startNewSnapshot() { cleanupTimedOutSnapshots(); - if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) { + if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime + || topic.getLastDataMessagePublishedTimestamp() == 0) { // There was no message written since the last snapshot, we can skip creating a new snapshot if (log.isDebugEnabled()) { log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 2a4c7d17b8b..175574ed828 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -28,6 +28,7 @@ import com.google.common.collect.Sets; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -173,6 +174,14 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { // create subscription in r1 createReplicatedSubscription(client1, topicName, subscriptionName, true); + // Validate that no snapshots are created before messages are published + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService() + .getTopic(topicName, false).get().get(); + ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get(); + // no snapshot should have been created before any messages are published + assertTrue(rsc1.getLastCompletedSnapshotId().isEmpty()); + @Cleanup PulsarClient client2 = PulsarClient.builder() .serviceUrl(url2.toString()) @@ -196,9 +205,6 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); // In R1 - PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService() - .getTopic(topicName, false).get().get(); - ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get(); Position p1 = t1.getLastPosition(); String snapshot1 = rsc1.getLastCompletedSnapshotId().get(); @@ -479,22 +485,35 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { .statsInterval(0, TimeUnit.SECONDS) .build(); - // create consumer in r1 - @Cleanup - Consumer<byte[]> consumer1 = client1.newConsumer() - .topic(topicName) - .subscriptionName(subscriptionName) - .replicateSubscriptionState(true) - .subscribe(); + { + // create consumer in r1 + @Cleanup + Consumer<byte[]> consumer = client1.newConsumer() + .topic(topicName) + .subscriptionName(subscriptionName) + .replicateSubscriptionState(true) + .subscribe(); - // waiting to replicate topic/subscription to r1->r2 - Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName)); - final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); - Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected())); - Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName))); + // send one message to trigger replication + @Cleanup + Producer<byte[]> producer = client1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producer.send("message".getBytes(StandardCharsets.UTF_8)); + + assertEquals(readMessages(consumer, new HashSet<>(), 1, false), 1); + + // waiting to replicate topic/subscription to r1->r2 + Awaitility.await().until(() -> pulsar2.getBrokerService().getTopics().containsKey(topicName)); + final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + Awaitility.await().untilAsserted(() -> assertTrue(topic2.getReplicators().get("r1").isConnected())); + Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName))); + } // unsubscribe replicated subscription in r2 admin2.topics().deleteSubscription(topicName, subscriptionName); + final PersistentTopic topic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); assertNull(topic2.getSubscription(subscriptionName)); // close replicator producer in r2 @@ -519,6 +538,12 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { // consume 6 messages in r1 Set<String> receivedMessages = new LinkedHashSet<>(); + @Cleanup + Consumer<byte[]> consumer1 = client1.newConsumer() + .topic(topicName) + .subscriptionName(subscriptionName) + .replicateSubscriptionState(true) + .subscribe(); assertEquals(readMessages(consumer1, receivedMessages, numMessages, false), numMessages); // wait for subscription to be replicated
