This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new aeb311090ea [fix] [client] Messages lost when consumer reconnect (#20695)
aeb311090ea is described below

commit aeb311090ea50aec621cb264951b9d29976215a3
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jul 4 18:01:24 2023 +0800

    [fix] [client] Messages lost when consumer reconnect (#20695)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../org/apache/pulsar/client/impl/ConnectionHandler.java    | 13 +++++++------
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b475765fb00..33c24c8c973 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1080,7 +1080,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 "Consumer that failed is already present on the connection");
                     } else {
                         Consumer consumer = existingConsumerFuture.getNow(null);
-                        log.info("[{}] Consumer with the same id is already created:"
+                        log.warn("[{}] Consumer with the same id is already created:"
                                         + " consumerId={}, consumer={}",
                                 remoteAddress, consumerId, consumer);
                         commandSender.sendSuccessResponse(requestId);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index ea1e09670e9..1930254ed4f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -63,6 +63,12 @@ public class ConnectionHandler {
     }
 
     protected void grabCnx() {
+        if (!duringConnect.compareAndSet(false, true)) {
+            log.info("[{}] [{}] Skip grabbing the connection since there is a pending connection",
+                    state.topic, state.getHandlerName());
+            return;
+        }
+
         if (CLIENT_CNX_UPDATER.get(this) != null) {
             log.warn("[{}] [{}] Client cnx already set, ignoring reconnection request",
                     state.topic, state.getHandlerName());
@@ -75,11 +81,6 @@ public class ConnectionHandler {
                     state.topic, state.getHandlerName(), state.getState());
             return;
         }
-        if (!duringConnect.compareAndSet(false, true)) {
-            log.info("[{}] [{}] Skip grabbing the connection since there is a pending connection",
-                    state.topic, state.getHandlerName());
-            return;
-        }
 
         try {
             CompletableFuture<ClientCnx> cnxFuture;
@@ -118,8 +119,8 @@ public class ConnectionHandler {
     }
 
     void reconnectLater(Throwable exception) {
-        duringConnect.set(false);
         CLIENT_CNX_UPDATER.set(this, null);
+        duringConnect.set(false);
         if (!isValidStateForReconnection()) {
             log.info("[{}] [{}] Ignoring reconnection request (state: {})",
                     state.topic, state.getHandlerName(), state.getState());

Reply via email to