BewareMyPower commented on code in PR #20695:
URL: https://github.com/apache/pulsar/pull/20695#discussion_r1247866216


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java:
##########
@@ -340,6 +353,58 @@ public Object[][] codecProvider() {
         return new Object[][] { { 0 }, { 1000 } };
     }
 
+    @Test(dataProvider = "subscriptionTypes")
+    public void testConsumerReconnectTwice(SubscriptionType subscriptionType) throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "subscription1";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+        // Create producer and consumer.
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionType(subscriptionType)
+                .receiverQueueSize(1000).topic(topicName).subscriptionName(subscriptionName).subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false)
+                .topic(topicName).create();
+        int sendMessageCount = 10;
+        for (int i = 0; i < sendMessageCount; i++){
+            producer.send("msg- " + i);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(consumer.numMessagesInQueue(), sendMessageCount);
+        });
+        printConsumerStats(topicName, subscriptionName);
+
+        // Do the second subscribe.
+        consumer.connectionOpened(consumer.getClientCnx());

Review Comment:
   It's still not clear in which case `ConsumerImpl#connectionOpened` would be 
called while the state is `Ready`. Calling it manually here does not make 
sense.
   
   `connectionOpened` can only be called by `ConnectionHandler#grabCnx`. Once 
the consumer is already running, `grabCnx` can only be triggered by 
`ConsumerImpl#connectionClosed`, just as described in the **Background of 
scenarios that could trigger reconnection** section of your PR description.
   
   However, `connectionClosed` changes the consumer's state to `Connecting`. 
If the state cannot be changed to `Connecting`, `grabCnx` is not called.
   
   
https://github.com/apache/pulsar/blob/8d5303514efa6cf0fa98c7285339a11821c7cf79/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L137-L141
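
To make the argument concrete, here is a minimal toy model of that path. It is only a sketch under stated assumptions, not Pulsar's actual code: `ReconnectSketch`, its fields, and the simplified rule that only `Ready` may move to `Connecting` are all hypothetical, and the real `ConnectionHandler` schedules `grabCnx` with a backoff delay rather than calling it inline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of the reconnection path; these are NOT Pulsar's classes. */
class ReconnectSketch {
    enum State { Ready, Connecting, Closed }

    final AtomicReference<State> state = new AtomicReference<>(State.Ready);
    // Records the state observed each time connectionOpened runs.
    final List<State> statesSeenOnOpen = new ArrayList<>();

    /** Assumed guard: reconnect only if the state can move to Connecting. */
    void connectionClosed() {
        if (!state.compareAndSet(State.Ready, State.Connecting)) {
            return; // state change failed (e.g. Closed), so grabCnx is never reached
        }
        grabCnx(); // the real client would schedule this after a backoff
    }

    /** In this model, the only caller of connectionOpened. */
    void grabCnx() {
        connectionOpened();
    }

    void connectionOpened() {
        statesSeenOnOpen.add(state.get());
        state.set(State.Ready); // stands in for a successful re-subscribe
    }

    void close() {
        state.set(State.Closed);
    }

    public static void main(String[] args) {
        ReconnectSketch consumer = new ReconnectSketch();
        consumer.connectionClosed();
        System.out.println(consumer.statesSeenOnOpen); // [Connecting]

        consumer.close();
        consumer.connectionClosed(); // guard fails, nothing is recorded
        System.out.println(consumer.statesSeenOnOpen); // [Connecting]
    }
}
```

In this model, `connectionOpened` never observes `Ready`, which is why calling it directly from the test exercises a path the model cannot reach.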
   
   If you change line 378 to:
   
   ```java
           consumer.connectionClosed(consumer.getClientCnx());
           // Wait a while for `grabCnx` to be called
           Thread.sleep(1000);
   ```
   
   you can see that the state is `Connecting`.
   
   <img width="971" alt="image" 
src="https://github.com/apache/pulsar/assets/18204803/561b3297-3a11-4f86-b119-2c7ba677885a">
   
   Please explain in which case `connectionOpened` could be called while the 
state is `Ready`. Otherwise, this PR might be testing a case that can never 
happen.


