poorbarcode commented on code in PR #20595:
URL: https://github.com/apache/pulsar/pull/20595#discussion_r1243789605


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java:
##########
@@ -63,11 +66,20 @@ protected void grabCnx() {
             return;
         }
 
-        if (!isValidStateForReconnection()) {
-            // Ignore connection closed when we are shutting down
-            log.info("[{}] [{}] Ignoring reconnection request (state: {})",
-                    state.topic, state.getHandlerName(), state.getState());
-            return;
+        synchronized (this) {
+            if (duringConnect) {
+                log.info("[{}] [{}] Skip grabbing the connection since there is a pending connection",
+                        state.topic, state.getHandlerName());
+                return;
+            }
+            if (isValidStateForReconnection()) {

Review Comment:
   If the state of the consumer is `Ready`, the steps that follow are still executed, so the method `connectionOpened` can run more than once, causing messages to be lost.
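
   A toy model of the concern, not Pulsar code: every name below is hypothetical, and it assumes that `Ready` passes the reconnection state check. With only that check as a guard, a second request that arrives while the handler is already `Ready` reaches `connectionOpened` again.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only; class, fields, and methods are hypothetical stand-ins.
class ReadyStateSketch {
    enum State { Connecting, Ready, Closed }

    State state = State.Connecting;
    final AtomicInteger connectionOpenedCalls = new AtomicInteger();

    // Assumption: Ready counts as a valid state for reconnection.
    boolean isValidStateForReconnection() {
        return state == State.Connecting || state == State.Ready;
    }

    // Stand-in for connectionOpened: counts calls and marks the handler Ready.
    CompletableFuture<Void> connectionOpened() {
        connectionOpenedCalls.incrementAndGet();
        state = State.Ready;
        return CompletableFuture.completedFuture(null);
    }

    // Guarded by the state check alone.
    void grabCnx() {
        if (!isValidStateForReconnection()) {
            return;
        }
        connectionOpened();
    }

    public static void main(String[] args) {
        ReadyStateSketch handler = new ReadyStateSketch();
        handler.grabCnx(); // first connect, handler becomes Ready
        handler.grabCnx(); // second request while Ready is not filtered out
        System.out.println(handler.connectionOpenedCalls.get()); // prints 2
    }
}
```

   In this sketch the second call is only stopped once the state is `Closed`, so a check that also rejects `Ready` (or an equivalent guard) would be needed to keep `connectionOpened` from running twice.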



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java:
##########
@@ -81,8 +93,15 @@ protected void grabCnx() {
             } else {
                 cnxFuture = state.client.getConnection(state.topic); //
             }
-            cnxFuture.thenAccept(cnx -> connection.connectionOpened(cnx)) //
-                    .exceptionally(this::handleConnectionError);
+            cnxFuture.thenCompose(cnx -> connection.connectionOpened(cnx))
+                    .exceptionally(this::handleConnectionError)
+                    .whenComplete((__, e) -> {
+                        if (e != null) {
+                            log.error("[{}] [{}] Unexpected exception after the connection",
+                                    state.topic, state.getHandlerName(), e);
+                        }
+                        duringConnect = false;

Review Comment:
   Until the variable `duringConnect` is set back to `false`, the next `reconnectLater` call is rejected, so the reconnection is never retried.
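
   A minimal sketch of that window, again with hypothetical names rather than the real `ConnectionHandler`: the flag is cleared only when the future passed in completes, so any request made before then is dropped.

```java
import java.util.concurrent.CompletableFuture;

// Toy model only; the class and its method signature are hypothetical.
class PendingConnectSketch {
    private boolean duringConnect = false;

    // Returns true if an attempt was started, false if it was skipped.
    // openFuture stands in for the future that completes after connectionOpened.
    synchronized boolean grabCnx(CompletableFuture<Void> openFuture) {
        if (duringConnect) {
            return false; // a pending attempt exists, so this request is dropped
        }
        duringConnect = true;
        openFuture.whenComplete((ignored, e) -> {
            synchronized (this) {
                duringConnect = false; // only reset once the attempt has finished
            }
        });
        return true;
    }

    public static void main(String[] args) {
        PendingConnectSketch handler = new PendingConnectSketch();
        CompletableFuture<Void> first = new CompletableFuture<>();
        System.out.println(handler.grabCnx(first));                     // true
        System.out.println(handler.grabCnx(new CompletableFuture<>())); // false
        first.complete(null);
        System.out.println(handler.grabCnx(new CompletableFuture<>())); // true
    }
}
```

   If the dropped request was the only scheduled retry, nothing in this sketch triggers another attempt after the flag is cleared, which matches the scenario described above.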


