eolivelli commented on a change in pull request #11382:
URL: https://github.com/apache/pulsar/pull/11382#discussion_r672895371
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
##########
@@ -457,6 +461,73 @@ public void testReplicatedSubscriptionRestApi2() throws
Exception {
String.format("numReceivedMessages2 (%d) should be less than
%d", numReceivedMessages2, numMessages));
}
+ /**
+ * Tests replicated subscriptions when replicator producer is closed
+ */
+ @Test
+ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed()
throws Exception {
+ String namespace =
BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+ String topicName = "persistent://" + namespace +
"/when-replicator-producer-is-closed";
+ String subscriptionName = "sub";
+
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2"));
+
+ @Cleanup
+ PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+
+ // create consumer in r1
+ @Cleanup
+ Consumer<byte[]> consumer1 = client1.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .replicateSubscriptionState(true)
+ .subscribe();
+
+ // waiting to replicate topic/subscription to r1->r2
+ Awaitility.await().until(() ->
pulsar2.getBrokerService().getTopics().containsKey(topicName));
+ final PersistentTopic topic2 = (PersistentTopic)
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+ Awaitility.await().untilAsserted(() ->
assertTrue(topic2.getReplicators().get("r1").isConnected()));
+ Awaitility.await().untilAsserted(() ->
assertNotNull(topic2.getSubscription(subscriptionName)));
+
+ // unsubscribe replicated subscription in r2
+ admin2.topics().deleteSubscription(topicName, subscriptionName);
+ assertNull(topic2.getSubscription(subscriptionName));
+
+ // close replicator producer in r2
+ final Method closeReplProducersIfNoBacklog =
PersistentTopic.class.getDeclaredMethod("closeReplProducersIfNoBacklog", null);
+ closeReplProducersIfNoBacklog.setAccessible(true);
+ ((CompletableFuture<Void>)
closeReplProducersIfNoBacklog.invoke(topic2, null)).join();
+ assertFalse(topic2.getReplicators().get("r1").isConnected());
+
+ // send messages in r1
+ int numMessages = 6;
+ {
+ @Cleanup
+ Producer<byte[]> producer = client1.newProducer().topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+ for (int i = 0; i < numMessages; i++) {
+ String body = "message" + i;
+ producer.send(body.getBytes(StandardCharsets.UTF_8));
+
Thread.sleep(config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
Review comment:
Is there a way to avoid a blind sleep here and instead wait for a condition
with Awaitility?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]