wolfstudy commented on a change in pull request #651:
URL: https://github.com/apache/pulsar-client-go/pull/651#discussion_r739988750
##########
File path: pulsar/producer_partition.go
##########
@@ -784,57 +784,57 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
return
}
- if pi.sequenceID != response.GetSequenceId() {
+ if pi.sequenceID < response.GetSequenceId() {
Review comment:
@baomingyu The Java logic is as follows:
```
if (sequenceId > op.sequenceId) {
    log.warn("[{}] [{}] Got ack for msg. expecting: {} - {} - got: {} - {} - queue-size: {}",
            topic, producerName, op.sequenceId, op.highestSequenceId, sequenceId,
            highestSequenceId, pendingMessages.size());
    // Force connection closing so that messages can be re-transmitted in a new connection
    cnx.channel().close();
    return;
} else if (sequenceId < op.sequenceId) {
    // Ignoring the ack since it's referring to a message that has already timed out.
    if (log.isDebugEnabled()) {
        log.debug("[{}] [{}] Got ack for timed out msg. expecting: {} - {} - got: {} - {}",
                topic, producerName, op.sequenceId, op.highestSequenceId, sequenceId,
                highestSequenceId);
    }
    return;
} else {
    // Add check `sequenceId >= highestSequenceId` for backward compatibility.
    if (sequenceId >= highestSequenceId || highestSequenceId == op.highestSequenceId) {
        // Message was persisted correctly
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Received ack for msg {} ", topic, producerName, sequenceId);
        }
        pendingMessages.remove();
        releaseSemaphoreForSendOp(op);
    } else {
        log.warn("[{}] [{}] Got ack for batch msg error. expecting: {} - {} - got: {} - {} - queue-size: {}",
                topic, producerName, op.sequenceId, op.highestSequenceId, sequenceId,
                highestSequenceId, pendingMessages.size());
        // Force connection closing so that messages can be re-transmitted in a new connection
        cnx.channel().close();
        return;
    }
}
```
Previously (with the `!=` check), when `pi.sequenceID` was greater than the
sequence ID in the response, we also closed the current connection and relied
on the internal retry to recover.
The current implementation in this PR ignores the case of `pi.sequenceID >
response.GetSequenceId()`. As in the `sequenceId < op.sequenceId` branch of the
Java code, when `pi.sequenceID > response.GetSequenceId()` we should return
directly here.
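
To make the three cases concrete, here is a minimal Go sketch of the comparison I have in mind. It is not the actual `ReceivedSendReceipt` code: `receiptAction`, its constants, and `classifyReceipt` are hypothetical names used only for illustration, and the `highestSequenceId` batch check from the Java `else` branch is left out.

```go
package main

// receiptAction is a hypothetical type naming the three outcomes of
// comparing the head of the pending queue with an incoming send receipt.
type receiptAction int

const (
	// ackPending: the IDs match, so the pending item can be completed.
	ackPending receiptAction = iota
	// closeConnection: the receipt is ahead of the pending item, so the
	// connection is closed and messages are re-sent on a new connection.
	closeConnection
	// ignoreReceipt: the receipt is behind the pending item, i.e. it refers
	// to a message that has already timed out, so we just return.
	ignoreReceipt
)

// classifyReceipt mirrors the branch order of the Java snippet above.
// pendingSeqID stands in for pi.sequenceID and receiptSeqID for
// response.GetSequenceId(); both parameter names are assumptions.
func classifyReceipt(pendingSeqID, receiptSeqID uint64) receiptAction {
	switch {
	case receiptSeqID > pendingSeqID:
		return closeConnection
	case receiptSeqID < pendingSeqID:
		return ignoreReceipt
	default:
		return ackPending
	}
}
```

With this shape, `classifyReceipt(5, 7)` takes the close-connection branch, `classifyReceipt(5, 3)` is ignored, and `classifyReceipt(5, 5)` completes the pending item.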