wolfstudy commented on a change in pull request #651:
URL: https://github.com/apache/pulsar-client-go/pull/651#discussion_r739988750



##########
File path: pulsar/producer_partition.go
##########
@@ -784,57 +784,57 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
                return
        }
 
-       if pi.sequenceID != response.GetSequenceId() {
+       if pi.sequenceID < response.GetSequenceId() {

Review comment:
       @baomingyu The Java logic is as follows:
   
   ```
   if (sequenceId > op.sequenceId) {
       log.warn("[{}] [{}] Got ack for msg. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName,
               op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size());
       // Force connection closing so that messages can be re-transmitted in a new connection
       cnx.channel().close();
       return;
   } else if (sequenceId < op.sequenceId) {
       // Ignoring the ack since it's referring to a message that has already timed out.
       if (log.isDebugEnabled()) {
           log.debug("[{}] [{}] Got ack for timed out msg. expecting: {} - {} - got: {} - {}", topic, producerName,
                   op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId);
       }
       return;
   } else {
       // Add check `sequenceId >= highestSequenceId` for backward compatibility.
       if (sequenceId >= highestSequenceId || highestSequenceId == op.highestSequenceId) {
           // Message was persisted correctly
           if (log.isDebugEnabled()) {
               log.debug("[{}] [{}] Received ack for msg {} ", topic, producerName, sequenceId);
           }
           pendingMessages.remove();
           releaseSemaphoreForSendOp(op);
       } else {
           log.warn("[{}] [{}] Got ack for batch msg error. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName,
                   op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size());
           // Force connection closing so that messages can be re-transmitted in a new connection
           cnx.channel().close();
           return;
       }
   }
   ```
   
   In the previous implementation, when `pi.sequenceID` was greater than the
sequence ID in the response, we also closed the current connection and relied
on the internal retry to recover.
   
   The current implementation in this PR does not handle the case
`pi.sequenceID > response.GetSequenceId()`, which corresponds to the Java
`sequenceId < op.sequenceId` branch (an ack for a message that has already
timed out). In that case we should return directly here.
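
   To make the three cases concrete, here is a minimal Go sketch of the same
comparison as the Java code above. The names `receiptAction`,
`classifyReceipt`, and the `action*` constants are hypothetical and are not
part of pulsar-client-go; the sketch also leaves out the `highestSequenceId`
batch check from the Java `else` branch.

   ```go
   package main

   // receiptAction is a hypothetical type describing what the producer
   // should do with a send receipt. It is not part of pulsar-client-go.
   type receiptAction int

   const (
   	// actionComplete: the receipt matches the head of the pending queue.
   	actionComplete receiptAction = iota
   	// actionIgnore: the receipt refers to a message that already timed out.
   	actionIgnore
   	// actionCloseConn: the receipt is ahead of the pending item, so the
   	// connection is closed to force re-transmission on a new connection.
   	actionCloseConn
   )

   // classifyReceipt mirrors the Java three-way comparison:
   //   sequenceId >  op.sequenceId  -> close the connection
   //   sequenceId <  op.sequenceId  -> ignore the ack and return
   //   sequenceId == op.sequenceId  -> complete the pending item
   func classifyReceipt(pendingSeqID, receiptSeqID uint64) receiptAction {
   	switch {
   	case receiptSeqID > pendingSeqID:
   		return actionCloseConn
   	case receiptSeqID < pendingSeqID:
   		return actionIgnore
   	default:
   		return actionComplete
   	}
   }
   ```

   In `ReceivedSendReceipt`, this would amount to one extra branch that
returns early when the receipt's sequence ID is lower than `pi.sequenceID`,
before the existing success path runs.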




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


