lhotari commented on a change in pull request #11382:
URL: https://github.com/apache/pulsar/pull/11382#discussion_r672996098



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
##########
@@ -457,6 +461,71 @@ public void testReplicatedSubscriptionRestApi2() throws 
Exception {
                 String.format("numReceivedMessages2 (%d) should be less than 
%d", numReceivedMessages2, numMessages));
     }
 
+    /**
+     * Tests replicated subscriptions when replicator producer is closed
+     */
+    @Test
+    public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() 
throws Exception {
+        String namespace = 
BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+        String topicName = "persistent://" + namespace + 
"/when-replicator-producer-is-closed";
+        String subscriptionName = "sub";
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create consumer in r1
+        @Cleanup
+        Consumer<byte[]> consumer1 = client1.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(true)
+                .subscribe();
+
+        // waiting to replicate topic/subscription to r1->r2
+        Awaitility.await().until(() -> 
pulsar2.getBrokerService().getTopics().containsKey(topicName));
+        final PersistentTopic topic2 = (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+        Awaitility.await().untilAsserted(() -> 
assertTrue(topic2.getReplicators().get("r1").isConnected()));
+        Awaitility.await().untilAsserted(() -> 
assertNotNull(topic2.getSubscription(subscriptionName)));
+
+        // unsubscribe replicated subscription in r2
+        admin2.topics().deleteSubscription(topicName, subscriptionName);
+        assertNull(topic2.getSubscription(subscriptionName));
+
+        // close replicator producer in r2
+        final Method closeReplProducersIfNoBacklog = 
PersistentTopic.class.getDeclaredMethod("closeReplProducersIfNoBacklog", null);
+        closeReplProducersIfNoBacklog.setAccessible(true);
+        ((CompletableFuture<Void>) 
closeReplProducersIfNoBacklog.invoke(topic2, null)).join();
+        assertFalse(topic2.getReplicators().get("r1").isConnected());
+
+        // send messages in r1
+        int numMessages = 6;
+        {
+            @Cleanup
+            Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                    .enableBatching(false)
+                    .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                    .create();
+            for (int i = 0; i < numMessages; i++) {
+                String body = "message" + i;
+                producer.send(body.getBytes(StandardCharsets.UTF_8));
+            }
+            producer.close();

Review comment:
       @eolivelli The style here is something that I have been using in tests. 
Blame me for this. :) Lombok closes the producer at the end of the current 
scope of the variable. There's a new code block that is started with curly 
braces on line 507 which starts the block and it ends on 518.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to