This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 088ab71  change transaction producer about struct, interface, chan type from struct to struct pointer (#373)
088ab71 is described below

commit 088ab716498a0efab35942be8918b02c21cc600a
Author: zZ <[email protected]>
AuthorDate: Tue Jan 7 11:55:15 2020 +0800

    change transaction producer about struct, interface, chan type from struct to struct pointer (#373)
---
 examples/producer/transaction/main.go |  8 +++-----
 internal/callback.go                  |  2 +-
 internal/client.go                    |  4 ++--
 primitive/message.go                  |  4 ++--
 producer/producer.go                  | 16 +++++++++-------
 5 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/examples/producer/transaction/main.go 
b/examples/producer/transaction/main.go
index 8b017db..1b5181e 100644
--- a/examples/producer/transaction/main.go
+++ b/examples/producer/transaction/main.go
@@ -42,7 +42,7 @@ func NewDemoListener() *DemoListener {
        }
 }
 
-func (dl *DemoListener) ExecuteLocalTransaction(msg primitive.Message) 
primitive.LocalTransactionState {
+func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) 
primitive.LocalTransactionState {
        nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
        fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, 
msg.TransactionId)
        status := nextIndex % 3
@@ -52,8 +52,8 @@ func (dl *DemoListener) ExecuteLocalTransaction(msg 
primitive.Message) primitive
        return primitive.UnknowState
 }
 
-func (dl *DemoListener) CheckLocalTransaction(msg primitive.MessageExt) 
primitive.LocalTransactionState {
-       fmt.Printf("msg transactionID : %v\n", msg.TransactionId)
+func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) 
primitive.LocalTransactionState {
+       fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
        v, existed := dl.localTrans.Load(msg.TransactionId)
        if !existed {
                fmt.Printf("unknow msg: %v, return Commit", msg)
@@ -74,8 +74,6 @@ func (dl *DemoListener) CheckLocalTransaction(msg 
primitive.MessageExt) primitiv
                fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: 
%v\n", msg)
                return primitive.CommitMessageState
        }
-
-       return primitive.UnknowState
 }
 
 func main() {
diff --git a/internal/callback.go b/internal/callback.go
index 2ff182c..761fe36 100644
--- a/internal/callback.go
+++ b/internal/callback.go
@@ -26,6 +26,6 @@ import (
 // remotingClient callback TransactionProducer
 type CheckTransactionStateCallback struct {
        Addr   net.Addr
-       Msg    primitive.MessageExt
+       Msg    *primitive.MessageExt
        Header CheckTransactionStateRequestHeader
 }
diff --git a/internal/client.go b/internal/client.go
index 50fba22..acc25ea 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -212,9 +212,9 @@ func GetOrNewRocketMQClient(option ClientOptions, 
callbackCh chan interface{}) R
                                rlog.Warning("producer group is not equal", nil)
                                return nil
                        }
-                       callback := CheckTransactionStateCallback{
+                       callback := &CheckTransactionStateCallback{
                                Addr:   addr,
-                               Msg:    *msgExt,
+                               Msg:    msgExt,
                                Header: *header,
                        }
                        callbackCh <- callback
diff --git a/primitive/message.go b/primitive/message.go
index 8633dae..4693aa5 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -423,11 +423,11 @@ const (
 
 type TransactionListener interface {
        //  When send transactional prepare(half) message succeed, this method 
will be invoked to execute local transaction.
-       ExecuteLocalTransaction(Message) LocalTransactionState
+       ExecuteLocalTransaction(*Message) LocalTransactionState
 
        // When no response to prepare(half) message. broker will send check 
message to check the transaction status, and this
        // method will be invoked to get local transaction status.
-       CheckLocalTransaction(MessageExt) LocalTransactionState
+       CheckLocalTransaction(*MessageExt) LocalTransactionState
 }
 
 type MessageID struct {
diff --git a/producer/producer.go b/producer/producer.go
index f82104f..8e0661f 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -421,7 +421,7 @@ func (tp *transactionProducer) Shutdown() error {
 func (tp *transactionProducer) checkTransactionState() {
        for ch := range tp.producer.callbackCh {
                switch callback := ch.(type) {
-               case internal.CheckTransactionStateCallback:
+               case *internal.CheckTransactionStateCallback:
                        localTransactionState := 
tp.listener.CheckLocalTransaction(callback.Msg)
                        uniqueKey := 
callback.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
                        if uniqueKey == "" {
@@ -442,11 +442,13 @@ func (tp *transactionProducer) checkTransactionState() {
 
                        err := 
tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), 
req,
                                tp.producer.options.SendMsgTimeout)
-                       rlog.Error("send ReqENDTransaction to broker error", 
map[string]interface{}{
-                               "callback":               
callback.Addr.String(),
-                               "request":                req.String(),
-                               rlog.LogKeyUnderlayError: err,
-                       })
+                       if err != nil {
+                               rlog.Error("send ReqENDTransaction to broker 
error", map[string]interface{}{
+                                       "callback":               
callback.Addr.String(),
+                                       "request":                req.String(),
+                                       rlog.LogKeyUnderlayError: err,
+                               })
+                       }
                default:
                        rlog.Error(fmt.Sprintf("unknown type %v", ch), nil)
                }
@@ -471,7 +473,7 @@ func (tp *transactionProducer) SendMessageInTransaction(ctx 
context.Context, msg
                if len(transactionId) > 0 {
                        msg.TransactionId = transactionId
                }
-               localTransactionState = 
tp.listener.ExecuteLocalTransaction(*msg)
+               localTransactionState = tp.listener.ExecuteLocalTransaction(msg)
                if localTransactionState != primitive.CommitMessageState {
                        rlog.Error("executeLocalTransaction but state 
unexpected", map[string]interface{}{
                                "localState": localTransactionState,

Reply via email to