codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r787417655



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -760,75 +785,86 @@ public void connectionOpened(final ClientCnx cnx) {
         }
         // startMessageRollbackDurationInSec should be consider only once when 
consumer connects to first time
         long startMessageRollbackDuration = (startMessageRollbackDurationInSec 
> 0
-                && startMessageId != null && 
startMessageId.equals(initialStartMessageId)) ? 
startMessageRollbackDurationInSec : 0;
-        ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, 
readCompacted,
-                conf.isReplicateSubscriptionState(), 
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
-                startMessageRollbackDuration, si, createTopicIfDoesNotExist, 
conf.getKeySharedPolicy(),
-                conf.getSubscriptionProperties());
-
-        cnx.sendRequestWithId(request, requestId).thenRun(() -> {
-            synchronized (ConsumerImpl.this) {
-                if (changeToReadyState()) {
-                    consumerIsReconnectedToBroker(cnx, currentSize);
-                } else {
+                && startMessageId != null
+                && startMessageId.equals(initialStartMessageId)) ? 
startMessageRollbackDurationInSec : 0;
+
+        // synchronized this, because redeliverUnAckMessage eliminate the 
epoch inconsistency between them
+        synchronized (this) {
+            setClientCnx(cnx);
+            ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(),
+                    priorityLevel, consumerName, isDurable, 
startMessageIdData, metadata, readCompacted,
+                    conf.isReplicateSubscriptionState(),
+                    
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
+                    startMessageRollbackDuration, si, 
createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
+                    // this.consumerEpoch will increase

Review comment:
       Use the current epoch to subscribe.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1705,29 +1762,52 @@ public int numMessagesInQueue() {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        ClientCnx cnx = cnx();
-        if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= 
ProtocolVersion.v2.getValue()) {
-            int currentSize = 0;
-            synchronized (this) {
-                currentSize = incomingMessages.size();
-                clearIncomingMessages();
-                unAckedMessageTracker.clear();
-            }
-            
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId),
 cnx.ctx().voidPromise());
-            if (currentSize > 0) {
-                increaseAvailablePermits(cnx, currentSize);
+        // First : synchronized in order to handle consumer reconnect produce 
race condition, when broker receive
+        // redeliverUnacknowledgedMessages and consumer have not be created and
+        // then receive reconnect epoch change the broker is smaller than the 
client epoch, this will cause client epoch
+        // smaller than broker epoch forever. client will not receive message 
anymore.
+        // Second : we should synchronized `ClientCnx cnx = cnx()` to
+        // prevent use old cnx to send redeliverUnacknowledgedMessages to a 
old broker
+        synchronized (ConsumerImpl.this) {
+            ClientCnx cnx = cnx();
+            // V1 don't support redeliverUnacknowledgedMessages
+            if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < 
ProtocolVersion.v2.getValue()) {
+                if ((getState() == State.Connecting)) {
+                    log.warn("[{}] Client Connection needs to be established " 
+
+                            "for redelivery of unacknowledged messages", this);
+                } else {
+                    log.warn("[{}] Reconnecting the client to redeliver the 
messages.", this);
+                    cnx.ctx().close();
+                }
+
+                return;
             }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] [{}] Redeliver unacked messages and send 
{} permits", subscription, topic,
-                        consumerName, currentSize);
+
+            // clear local message
+            int currentSize = 0;
+            currentSize = incomingMessages.size();
+            clearIncomingMessages();
+            unAckedMessageTracker.clear();
+
+            // is channel is connected, we should send redeliver command to 
broker
+            if (isConnected(cnx) && cnx != null) {

Review comment:
       ```suggestion
               if (cnx != null && isConnected(cnx)) {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to