poorbarcode commented on code in PR #21495:
URL: https://github.com/apache/pulsar/pull/21495#discussion_r1388771531


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java:
##########
@@ -704,6 +705,66 @@ public void 
testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
         Awaitility.await().untilAsserted(() -> 
assertNotNull(topic2.getSubscription(subscriptionName)));
     }
 
+    @Test
+    public void testReplicateSubBackLog() throws Exception {
+        String namespace = 
BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog");
+        String topic = "persistent://" + namespace + 
"/when-replicator-producer-is-closed";
+        String subName = "sub";
+
+        admin1.namespaces().createNamespace(namespace);
+        
pulsar1.getConfiguration().setReplicatedSubscriptionsSnapshotFrequencyMillis(100);
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        @Cleanup
+        Consumer<byte[]> consumer = client1.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .ackTimeout(5, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .replicateSubscriptionState(true)
+                .subscribe();
+        @Cleanup
+        Producer<byte[]> producer = client1.newProducer()
+                .topic(topic)
+                .create();

Review Comment:
   Since the replicated replication will publish markers in these scenarios:
   - Publish a marker per second if there are new messages are sent
   - Ack message that after a snapshot
   
   We'd better make this test flow like this:
   - send some messages
   - sleep `2s`
   - ack messages
   - send some messages again
   - ack messages
   
   ---
   
   And could you also add another test:
   - Enable the feature replication on topic level or namespace level
   - Pub & Sub
   - Disable the feature replication.
   - Verify all things work as expected.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java:
##########
@@ -704,6 +705,66 @@ public void 
testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
         Awaitility.await().untilAsserted(() -> 
assertNotNull(topic2.getSubscription(subscriptionName)));
     }
 
+    @Test
+    public void testReplicateSubBackLog() throws Exception {
+        String namespace = 
BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog");
+        String topic = "persistent://" + namespace + 
"/when-replicator-producer-is-closed";
+        String subName = "sub";
+
+        admin1.namespaces().createNamespace(namespace);
+        
pulsar1.getConfiguration().setReplicatedSubscriptionsSnapshotFrequencyMillis(100);
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        @Cleanup
+        Consumer<byte[]> consumer = client1.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .ackTimeout(5, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .replicateSubscriptionState(true)
+                .subscribe();
+        @Cleanup
+        Producer<byte[]> producer = client1.newProducer()
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().send();
+        }
+        TopicStats topicStats = admin1.topics().getStats(topic, false, true);
+        long backlogSize = topicStats.getBacklogSize();
+        //There should no replicator snapshot marker write into the topic, so 
the backlog would not change.
+        try {
+            Awaitility.await().untilAsserted(() -> {
+                TopicStats stats = admin1.topics().getStats(topic, false, 
true);
+                assertNotEquals(stats.getBacklogSize(), backlogSize);
+            });
+            fail();
+        } catch (org.awaitility.core.ConditionTimeoutException ex) {
+        }

Review Comment:
   You want to confirm no marker was sent after `10s`, right? It is better to 
verify after `sleep(10s)`.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java:
##########
@@ -704,6 +705,66 @@ public void 
testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
         Awaitility.await().untilAsserted(() -> 
assertNotNull(topic2.getSubscription(subscriptionName)));
     }
 
+    @Test
+    public void testReplicateSubBackLog() throws Exception {
+        String namespace = 
BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog");
+        String topic = "persistent://" + namespace + 
"/when-replicator-producer-is-closed";
+        String subName = "sub";
+
+        admin1.namespaces().createNamespace(namespace);
+        
pulsar1.getConfiguration().setReplicatedSubscriptionsSnapshotFrequencyMillis(100);
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        @Cleanup
+        Consumer<byte[]> consumer = client1.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .ackTimeout(5, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .replicateSubscriptionState(true)
+                .subscribe();
+        @Cleanup
+        Producer<byte[]> producer = client1.newProducer()
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().send();
+        }
+        TopicStats topicStats = admin1.topics().getStats(topic, false, true);
+        long backlogSize = topicStats.getBacklogSize();
+        //There should no replicator snapshot marker write into the topic, so 
the backlog would not change.
+        try {
+            Awaitility.await().untilAsserted(() -> {
+                TopicStats stats = admin1.topics().getStats(topic, false, 
true);
+                assertNotEquals(stats.getBacklogSize(), backlogSize);
+            });
+            fail();
+        } catch (org.awaitility.core.ConditionTimeoutException ex) {
+        }
+        // Start writing snapshot marker after having remote replicate 
clusters,
+        // so the backlog would continue to increase.
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));

Review Comment:
   Could you also add a case that enabled replication on the topic level?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to