poorbarcode commented on code in PR #21495:
URL: https://github.com/apache/pulsar/pull/21495#discussion_r1388771531
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java:
##########
@@ -704,6 +705,66 @@ public void
testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
Awaitility.await().untilAsserted(() ->
assertNotNull(topic2.getSubscription(subscriptionName)));
}
+ @Test
+ public void testReplicateSubBackLog() throws Exception {
+ String namespace =
BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog");
+ String topic = "persistent://" + namespace +
"/when-replicator-producer-is-closed";
+ String subName = "sub";
+
+ admin1.namespaces().createNamespace(namespace);
+
pulsar1.getConfiguration().setReplicatedSubscriptionsSnapshotFrequencyMillis(100);
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ @Cleanup
+ Consumer<byte[]> consumer = client1.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .ackTimeout(5, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .replicateSubscriptionState(true)
+ .subscribe();
+ @Cleanup
+ Producer<byte[]> producer = client1.newProducer()
+ .topic(topic)
+ .create();
Review Comment:
Since the replicated replication will publish markers in these scenarios:
- Publish a marker per second if there are new messages are sent
- Ack message that after a snapshot
We'd better make this test flow like this:
- send some messages
- sleep `2s`
- ack messages
- send some messages again
- ack messages
---
And could you also add another test:
- Enable the feature replication on topic level or namespace level
- Pub & Sub
- Disable the feature replication.
- Verify all things work as expected.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java:
##########
@@ -704,6 +705,66 @@ public void
testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
Awaitility.await().untilAsserted(() ->
assertNotNull(topic2.getSubscription(subscriptionName)));
}
+ @Test
+ public void testReplicateSubBackLog() throws Exception {
+ String namespace =
BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog");
+ String topic = "persistent://" + namespace +
"/when-replicator-producer-is-closed";
+ String subName = "sub";
+
+ admin1.namespaces().createNamespace(namespace);
+
pulsar1.getConfiguration().setReplicatedSubscriptionsSnapshotFrequencyMillis(100);
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ @Cleanup
+ Consumer<byte[]> consumer = client1.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .ackTimeout(5, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .replicateSubscriptionState(true)
+ .subscribe();
+ @Cleanup
+ Producer<byte[]> producer = client1.newProducer()
+ .topic(topic)
+ .create();
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().send();
+ }
+ TopicStats topicStats = admin1.topics().getStats(topic, false, true);
+ long backlogSize = topicStats.getBacklogSize();
+ //There should no replicator snapshot marker write into the topic, so
the backlog would not change.
+ try {
+ Awaitility.await().untilAsserted(() -> {
+ TopicStats stats = admin1.topics().getStats(topic, false,
true);
+ assertNotEquals(stats.getBacklogSize(), backlogSize);
+ });
+ fail();
+ } catch (org.awaitility.core.ConditionTimeoutException ex) {
+ }
Review Comment:
You want to confirm no marker was sent after `10s`, right? It is better to
verify after `sleep(10s)`.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java:
##########
@@ -704,6 +705,66 @@ public void
testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
Awaitility.await().untilAsserted(() ->
assertNotNull(topic2.getSubscription(subscriptionName)));
}
+ @Test
+ public void testReplicateSubBackLog() throws Exception {
+ String namespace =
BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog");
+ String topic = "persistent://" + namespace +
"/when-replicator-producer-is-closed";
+ String subName = "sub";
+
+ admin1.namespaces().createNamespace(namespace);
+
pulsar1.getConfiguration().setReplicatedSubscriptionsSnapshotFrequencyMillis(100);
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ @Cleanup
+ Consumer<byte[]> consumer = client1.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .ackTimeout(5, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .replicateSubscriptionState(true)
+ .subscribe();
+ @Cleanup
+ Producer<byte[]> producer = client1.newProducer()
+ .topic(topic)
+ .create();
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().send();
+ }
+ TopicStats topicStats = admin1.topics().getStats(topic, false, true);
+ long backlogSize = topicStats.getBacklogSize();
+ //There should no replicator snapshot marker write into the topic, so
the backlog would not change.
+ try {
+ Awaitility.await().untilAsserted(() -> {
+ TopicStats stats = admin1.topics().getStats(topic, false,
true);
+ assertNotEquals(stats.getBacklogSize(), backlogSize);
+ });
+ fail();
+ } catch (org.awaitility.core.ConditionTimeoutException ex) {
+ }
+ // Start writing snapshot marker after having remote replicate
clusters,
+ // so the backlog would continue to increase.
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2"));
Review Comment:
Could you also add a case that enabled replication on the topic level?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]