This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 05929da  [ISSUE #617] retrieve transactionid from property first (#620)
05929da is described below

commit 05929dace17200e60a9b1d7157453308956dce94
Author: tenhan <[email protected]>
AuthorDate: Tue Mar 16 20:22:28 2021 +0800

    [ISSUE #617] retrieve transactionid from property first (#620)
---
 primitive/message.go | 1 +
 producer/producer.go | 9 ++++++++-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/primitive/message.go b/primitive/message.go
index fd7e9c6..b330dc1 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -59,6 +59,7 @@ const (
        PropertyTranscationCheckTimes          = "TRANSACTION_CHECK_TIMES"
        PropertyCheckImmunityTimeInSeconds     = "CHECK_IMMUNITY_TIME_IN_SECONDS"
        PropertyShardingKey                    = "SHARDING_KEY"
+       PropertyTransactionID                  = "__transactionId__"
 )
 
 type Message struct {
diff --git a/producer/producer.go b/producer/producer.go
index 65e39c2..910bb23 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -462,13 +462,20 @@ func (tp *transactionProducer) checkTransactionState() {
                        if uniqueKey == "" {
                                uniqueKey = callback.Msg.MsgId
                        }
+                       transactionId := callback.Msg.GetProperty(primitive.PropertyTransactionID)
+                       if transactionId == "" {
+                               transactionId = callback.Header.TransactionId
+                       }
+                       if transactionId == "" {
+                               transactionId = callback.Msg.TransactionId
+                       }
                        header := &internal.EndTransactionRequestHeader{
                                CommitLogOffset:      callback.Header.CommitLogOffset,
                                ProducerGroup:        tp.producer.group,
                                TranStateTableOffset: callback.Header.TranStateTableOffset,
                                FromTransactionCheck: true,
                                MsgID:                uniqueKey,
-                               TransactionId:        callback.Header.TransactionId,
+                               TransactionId:        transactionId,
                                CommitOrRollback:     tp.transactionState(localTransactionState),
                        }
 

Reply via email to