This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch handle_txn_of_chunk_message_reponse
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 66ebb3ad5e74bf87cdceddedadadaac488254fe7
Author: xiangying <[email protected]>
AuthorDate: Sat Jul 22 17:23:30 2023 +0800

    [fix][txn] Only do end send\ack operations for the last chunk message.
---
 pulsar/producer_partition.go | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 6bd9081..e38244b 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -984,7 +984,7 @@ func (p *partitionProducer) failTimeoutMessages() {
                                                sr.callback(nil, sr.msg, 
errSendTimeout)
                                        })
                                }
-                               if sr.transaction != nil {
+                               if (sr.totalChunks <= 1 || sr.chunkID == 
sr.totalChunks-1) && sr.transaction != nil {
                                        sr.transaction.endSendOrAckOp(nil)
                                }
                        }
@@ -1253,11 +1253,9 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
                                                sr.callback(msgID, sr.msg, nil)
                                        }
                                        
p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
+                                       sr.transaction.endSendOrAckOp(nil)
                                }
                        }
-                       if sr.transaction != nil {
-                               sr.transaction.endSendOrAckOp(nil)
-                       }
                }
 
                // Mark this pending item as done

Reply via email to