This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new aeb311090ea [fix] [client] Messages lost when consumer reconnect (#20695)
aeb311090ea is described below
commit aeb311090ea50aec621cb264951b9d29976215a3
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jul 4 18:01:24 2023 +0800
[fix] [client] Messages lost when consumer reconnect (#20695)
---
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../org/apache/pulsar/client/impl/ConnectionHandler.java | 13 +++++++------
2 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b475765fb00..33c24c8c973 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1080,7 +1080,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
"Consumer that failed is already present on
the connection");
} else {
Consumer consumer =
existingConsumerFuture.getNow(null);
- log.info("[{}] Consumer with the same id is already
created:"
+ log.warn("[{}] Consumer with the same id is already
created:"
+ " consumerId={}, consumer={}",
remoteAddress, consumerId, consumer);
commandSender.sendSuccessResponse(requestId);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index ea1e09670e9..1930254ed4f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -63,6 +63,12 @@ public class ConnectionHandler {
}
protected void grabCnx() {
+ if (!duringConnect.compareAndSet(false, true)) {
+ log.info("[{}] [{}] Skip grabbing the connection since there is a
pending connection",
+ state.topic, state.getHandlerName());
+ return;
+ }
+
if (CLIENT_CNX_UPDATER.get(this) != null) {
log.warn("[{}] [{}] Client cnx already set, ignoring reconnection
request",
state.topic, state.getHandlerName());
@@ -75,11 +81,6 @@ public class ConnectionHandler {
state.topic, state.getHandlerName(), state.getState());
return;
}
- if (!duringConnect.compareAndSet(false, true)) {
- log.info("[{}] [{}] Skip grabbing the connection since there is a
pending connection",
- state.topic, state.getHandlerName());
- return;
- }
try {
CompletableFuture<ClientCnx> cnxFuture;
@@ -118,8 +119,8 @@ public class ConnectionHandler {
}
void reconnectLater(Throwable exception) {
- duringConnect.set(false);
CLIENT_CNX_UPDATER.set(this, null);
+ duringConnect.set(false);
if (!isValidStateForReconnection()) {
log.info("[{}] [{}] Ignoring reconnection request (state: {})",
state.topic, state.getHandlerName(), state.getState());