This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 088ab71 change transaction producer about struct, interface, chan type from struct to struct pointer (#373)
088ab71 is described below
commit 088ab716498a0efab35942be8918b02c21cc600a
Author: zZ <[email protected]>
AuthorDate: Tue Jan 7 11:55:15 2020 +0800
change transaction producer about struct, interface, chan type from struct to struct pointer (#373)
---
examples/producer/transaction/main.go | 8 +++-----
internal/callback.go | 2 +-
internal/client.go | 4 ++--
primitive/message.go | 4 ++--
producer/producer.go | 16 +++++++++-------
5 files changed, 17 insertions(+), 17 deletions(-)
diff --git a/examples/producer/transaction/main.go b/examples/producer/transaction/main.go
index 8b017db..1b5181e 100644
--- a/examples/producer/transaction/main.go
+++ b/examples/producer/transaction/main.go
@@ -42,7 +42,7 @@ func NewDemoListener() *DemoListener {
}
}
-func (dl *DemoListener) ExecuteLocalTransaction(msg primitive.Message) primitive.LocalTransactionState {
+func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex,
msg.TransactionId)
status := nextIndex % 3
@@ -52,8 +52,8 @@ func (dl *DemoListener) ExecuteLocalTransaction(msg primitive.Message) primitive
return primitive.UnknowState
}
-func (dl *DemoListener) CheckLocalTransaction(msg primitive.MessageExt) primitive.LocalTransactionState {
- fmt.Printf("msg transactionID : %v\n", msg.TransactionId)
+func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
+ fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
v, existed := dl.localTrans.Load(msg.TransactionId)
if !existed {
fmt.Printf("unknow msg: %v, return Commit", msg)
@@ -74,8 +74,6 @@ func (dl *DemoListener) CheckLocalTransaction(msg primitive.MessageExt) primitiv
fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
}
-
- return primitive.UnknowState
}
func main() {
diff --git a/internal/callback.go b/internal/callback.go
index 2ff182c..761fe36 100644
--- a/internal/callback.go
+++ b/internal/callback.go
@@ -26,6 +26,6 @@ import (
// remotingClient callback TransactionProducer
type CheckTransactionStateCallback struct {
Addr net.Addr
- Msg primitive.MessageExt
+ Msg *primitive.MessageExt
Header CheckTransactionStateRequestHeader
}
diff --git a/internal/client.go b/internal/client.go
index 50fba22..acc25ea 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -212,9 +212,9 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
rlog.Warning("producer group is not equal", nil)
return nil
}
- callback := CheckTransactionStateCallback{
+ callback := &CheckTransactionStateCallback{
Addr: addr,
- Msg: *msgExt,
+ Msg: msgExt,
Header: *header,
}
callbackCh <- callback
diff --git a/primitive/message.go b/primitive/message.go
index 8633dae..4693aa5 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -423,11 +423,11 @@ const (
type TransactionListener interface {
// When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
- ExecuteLocalTransaction(Message) LocalTransactionState
+ ExecuteLocalTransaction(*Message) LocalTransactionState
// When no response to prepare(half) message. broker will send check message to check the transaction status, and this
// method will be invoked to get local transaction status.
- CheckLocalTransaction(MessageExt) LocalTransactionState
+ CheckLocalTransaction(*MessageExt) LocalTransactionState
}
type MessageID struct {
diff --git a/producer/producer.go b/producer/producer.go
index f82104f..8e0661f 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -421,7 +421,7 @@ func (tp *transactionProducer) Shutdown() error {
func (tp *transactionProducer) checkTransactionState() {
for ch := range tp.producer.callbackCh {
switch callback := ch.(type) {
- case internal.CheckTransactionStateCallback:
+ case *internal.CheckTransactionStateCallback:
localTransactionState := tp.listener.CheckLocalTransaction(callback.Msg)
uniqueKey := callback.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
if uniqueKey == "" {
@@ -442,11 +442,13 @@ func (tp *transactionProducer) checkTransactionState() {
err :=
tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(),
req,
tp.producer.options.SendMsgTimeout)
- rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
- "callback": callback.Addr.String(),
- "request": req.String(),
- rlog.LogKeyUnderlayError: err,
- })
+ if err != nil {
+ rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
+ "callback": callback.Addr.String(),
+ "request": req.String(),
+ rlog.LogKeyUnderlayError: err,
+ })
+ }
default:
rlog.Error(fmt.Sprintf("unknown type %v", ch), nil)
}
@@ -471,7 +473,7 @@ func (tp *transactionProducer) SendMessageInTransaction(ctx context.Context, msg
if len(transactionId) > 0 {
msg.TransactionId = transactionId
}
- localTransactionState = tp.listener.ExecuteLocalTransaction(*msg)
+ localTransactionState = tp.listener.ExecuteLocalTransaction(msg)
if localTransactionState != primitive.CommitMessageState {
rlog.Error("executeLocalTransaction but state unexpected", map[string]interface{}{
"localState": localTransactionState,