This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch handle_txn_of_chunk_message_reponse in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 66ebb3ad5e74bf87cdceddedadadaac488254fe7 Author: xiangying <[email protected]> AuthorDate: Sat Jul 22 17:23:30 2023 +0800 [fix][txn] Only do end send/ack operations for the last chunk message. --- pulsar/producer_partition.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 6bd9081..e38244b 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -984,7 +984,7 @@ func (p *partitionProducer) failTimeoutMessages() { sr.callback(nil, sr.msg, errSendTimeout) }) } - if sr.transaction != nil { + if (sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1) && sr.transaction != nil { sr.transaction.endSendOrAckOp(nil) } } @@ -1253,11 +1253,9 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) sr.callback(msgID, sr.msg, nil) } p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID) + sr.transaction.endSendOrAckOp(nil) } } - if sr.transaction != nil { - sr.transaction.endSendOrAckOp(nil) - } } // Mark this pending item as done
