This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 09ccdd4  add trace feature. resolve #124 (#125)
09ccdd4 is described below

commit 09ccdd463d6c8b1ab6497abf95b7e187be83bc46
Author: xujianhai666 <[email protected]>
AuthorDate: Thu Jul 25 16:20:51 2019 +0800

    add trace feature. resolve #124 (#125)
---
 consumer/interceptor.go         | 109 +++++++++
 consumer/push_consumer.go       |  20 +-
 examples/consumer/trace/main.go |  57 +++++
 examples/producer/trace/main.go |  65 ++++++
 go.mod                          |   1 +
 go.sum                          |   9 +
 internal/client.go              |   2 +-
 internal/constants.go           |   1 +
 internal/route.go               |   2 +-
 internal/trace.go               | 482 ++++++++++++++++++++++++++++++++++++++++
 internal/trace_test.go          | 125 +++++++++++
 internal/utils/helper.go        |  52 -----
 internal/utils/helper_test.go   |  33 ---
 internal/utils/net.go           |  24 +-
 internal/utils/net_test.go      |   2 +-
 primitive/ctx.go                |  65 +++++-
 primitive/interceptor.go        |   6 +
 primitive/message.go            |  44 ++++
 primitive/result.go             |  12 +-
 primitive/result_test.go        |  18 +-
 producer/interceptor.go         |  97 ++++++++
 producer/producer.go            |  17 ++
 22 files changed, 1134 insertions(+), 109 deletions(-)

diff --git a/consumer/interceptor.go b/consumer/interceptor.go
new file mode 100644
index 0000000..260ac6f
--- /dev/null
+++ b/consumer/interceptor.go
@@ -0,0 +1,109 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+       "context"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/internal"
+       "github.com/apache/rocketmq-client-go/internal/utils"
+       "github.com/apache/rocketmq-client-go/primitive"
+)
+
+// WithTrace support rocketmq trace: 
https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace. 
+func WithTrace(traceCfg primitive.TraceConfig) Option {
+       return func(options *consumerOptions) {
+
+               ori := options.Interceptors
+               options.Interceptors = make([]primitive.Interceptor, 0)
+               options.Interceptors = append(options.Interceptors, 
newTraceInterceptor(traceCfg))
+               options.Interceptors = append(options.Interceptors, ori...)
+       }
+}
+
+func newTraceInterceptor(traceCfg primitive.TraceConfig) primitive.Interceptor 
{
+       dispatcher := internal.NewTraceDispatcher(traceCfg.TraceTopic, 
traceCfg.Access)
+       dispatcher.Start()
+
+       return func(ctx context.Context, req, reply interface{}, next 
primitive.Invoker) error {
+               consumerCtx, exist := primitive.GetConsumerCtx(ctx)
+               if !exist || len(consumerCtx.Msgs) == 0 {
+                       return next(ctx, req, reply)
+               }
+
+               beginT := time.Now()
+               // before traceCtx
+               traceCx := internal.TraceContext{
+                       RequestId: internal.CreateUniqID(),
+                       TimeStamp: time.Now().UnixNano() / 
int64(time.Millisecond),
+                       TraceType: internal.SubBefore,
+                       GroupName: consumerCtx.ConsumerGroup,
+                       IsSuccess: true,
+               }
+               beans := make([]internal.TraceBean, 0)
+               for _, msg := range consumerCtx.Msgs {
+                       if msg == nil {
+                               continue
+                       }
+                       regionID := msg.GetRegionID()
+                       traceOn := msg.IsTraceOn()
+                       if traceOn == "false" {
+                               continue
+                       }
+                       bean := internal.TraceBean{
+                               Topic:      msg.Topic,
+                               MsgId:      msg.MsgId,
+                               Tags:       msg.GetTags(),
+                               Keys:       msg.GetKeys(),
+                               StoreTime:  msg.StoreTimestamp,
+                               BodyLength: int(msg.StoreSize),
+                               RetryTimes: int(msg.ReconsumeTimes),
+                               ClientHost: utils.LocalIP,
+                               StoreHost:  utils.LocalIP,
+                       }
+                       beans = append(beans, bean)
+                       traceCx.RegionId = regionID
+               }
+               if len(beans) > 0 {
+                       traceCx.TraceBeans = beans
+                       traceCx.TimeStamp = time.Now().UnixNano() / 
int64(time.Millisecond)
+                       dispatcher.Append(traceCx)
+               }
+
+               err := next(ctx, req, reply)
+
+               // after traceCtx
+               costTime := time.Since(beginT).Nanoseconds() / 
int64(time.Millisecond)
+               ctxType := consumerCtx.Properties[primitive.PropCtxType]
+               afterCtx := internal.TraceContext{
+                       TimeStamp: time.Now().UnixNano() / 
int64(time.Millisecond),
+
+                       TraceType:   internal.SubAfter,
+                       RegionId:    traceCx.RegionId,
+                       GroupName:   traceCx.GroupName,
+                       RequestId:   traceCx.RequestId,
+                       IsSuccess:   consumerCtx.Success,
+                       CostTime:    costTime,
+                       TraceBeans:  traceCx.TraceBeans,
+                       ContextCode: 
primitive.ConsumeReturnType(ctxType).Ordinal(),
+               }
+               dispatcher.Append(afterCtx)
+               return err
+       }
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3b011c3..7b4d65e 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -640,6 +640,9 @@ func (pc *pushConsumer) consumeInner(ctx context.Context, 
subMsgs []*primitive.M
 
                        realReply := reply.(*ConsumeResultHolder)
                        realReply.ConsumeResult = r
+
+                       msgCtx, _ := primitive.GetConsumerCtx(ctx)
+                       msgCtx.Success = realReply.ConsumeResult == 
ConsumeSuccess
                        return e
                })
                return container.ConsumeResult, err
@@ -694,6 +697,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                        var err error
                        msgCtx := &primitive.ConsumeMessageContext{
                                Properties: make(map[string]string),
+                               ConsumerGroup: pc.consumerGroup,
+                               MQ: mq,
+                               Msgs: msgs,
                        }
                        ctx := context.Background()
                        ctx = primitive.WithConsumerCtx(ctx, msgCtx)
@@ -706,16 +712,15 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
 
                        consumeRT := time.Now().Sub(beginTime)
                        if err != nil {
-                               msgCtx.Properties["ConsumeContextType"] = 
"EXCEPTION"
+                               msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.ExceptionRetrun)
                        } else if consumeRT >= pc.option.ConsumeTimeout {
-                               msgCtx.Properties["ConsumeContextType"] = 
"TIMEOUT"
+                               msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.TimeoutReturn)
                        } else if result == ConsumeSuccess {
-                               msgCtx.Properties["ConsumeContextType"] = 
"SUCCESS"
-                       } else {
-                               msgCtx.Properties["ConsumeContextType"] = 
"RECONSUME_LATER"
+                               msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.SuccessReturn)
+                       } else if result == ConsumeRetryLater{
+                               msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.FailedReturn)
                        }
 
-                       // TODO hook
                        increaseConsumeRT(pc.consumerGroup, mq.Topic, 
int64(consumeRT/time.Millisecond))
 
                        if !pq.dropped {
@@ -808,6 +813,9 @@ func (pc *pushConsumer) consumeMessageOrderly(pq 
*processQueue, mq *primitive.Me
                        ctx := context.Background()
                        msgCtx := &primitive.ConsumeMessageContext{
                                Properties: make(map[string]string),
+                               ConsumerGroup: pc.consumerGroup,
+                               MQ: mq,
+                               Msgs: msgs,
                        }
                        ctx = primitive.WithConsumerCtx(ctx, msgCtx)
                        ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
diff --git a/examples/consumer/trace/main.go b/examples/consumer/trace/main.go
new file mode 100644
index 0000000..9102175
--- /dev/null
+++ b/examples/consumer/trace/main.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "time"
+
+       "github.com/apache/rocketmq-client-go"
+       "github.com/apache/rocketmq-client-go/consumer"
+       "github.com/apache/rocketmq-client-go/primitive"
+)
+
+func main() {
+       namesrvs := []string{"127.0.0.1:9876"}
+       traceCfg := primitive.TraceConfig{
+               Access: primitive.Local,
+       }
+
+       c, _ := rocketmq.NewPushConsumer(
+               consumer.WithGroupName("testGroup"),
+               consumer.WithNameServer(namesrvs),
+               consumer.WithTrace(traceCfg),
+       )
+       err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx 
context.Context,
+               msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+               fmt.Printf("subscribe callback: %v \n", msgs)
+               return consumer.ConsumeSuccess, nil
+       })
+       if err != nil {
+               fmt.Println(err.Error())
+       }
+       // Note: start after subscribe
+       err = c.Start()
+       if err != nil {
+               fmt.Println(err.Error())
+               os.Exit(-1)
+       }
+       time.Sleep(time.Hour)
+}
diff --git a/examples/producer/trace/main.go b/examples/producer/trace/main.go
new file mode 100644
index 0000000..266c783
--- /dev/null
+++ b/examples/producer/trace/main.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "time"
+
+       "github.com/apache/rocketmq-client-go"
+       "github.com/apache/rocketmq-client-go/primitive"
+       "github.com/apache/rocketmq-client-go/producer"
+)
+
+func main() {
+       namesrvs := []string{"127.0.0.1:9876"}
+       traceCfg := primitive.TraceConfig{
+               Access:   primitive.Local,
+       }
+
+       p, _ := rocketmq.NewProducer(
+               producer.WithNameServer(namesrvs),
+               producer.WithRetry(2),
+               producer.WithTrace(traceCfg))
+       err := p.Start()
+       if err != nil {
+               fmt.Printf("start producer error: %s", err.Error())
+               os.Exit(1)
+       }
+       for i := 0; i < 1; i++ {
+               res, err := p.SendSync(context.Background(), &primitive.Message{
+                       Topic: "TopicTest",
+                       Body:  []byte("Hello RocketMQ Go Client!"),
+               })
+
+               if err != nil {
+                       fmt.Printf("send message error: %s\n", err)
+               } else {
+                       fmt.Printf("send message success: result=%s\n", 
res.String())
+               }
+       }
+
+       time.Sleep(10 * time.Second)
+
+       err = p.Shutdown()
+       if err != nil {
+               fmt.Printf("shundown producer error: %s", err.Error())
+       }
+}
diff --git a/go.mod b/go.mod
index 322c847..e1b0d88 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
        github.com/golang/mock v1.3.1
        github.com/pkg/errors v0.8.1
        github.com/sirupsen/logrus v1.4.1
+       github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945
        github.com/stretchr/testify v1.3.0
        github.com/tidwall/gjson v1.2.1
        github.com/tidwall/match v1.0.1 // indirect
diff --git a/go.sum b/go.sum
index 06d0d34..cc4c4b6 100644
--- a/go.sum
+++ b/go.sum
@@ -5,6 +5,10 @@ github.com/emirpasic/gods v1.12.0 
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg
 github.com/emirpasic/gods v1.12.0/go.mod 
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
 github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
 github.com/golang/mock v1.3.1/go.mod 
h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 
h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod 
h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/jtolds/gls v4.20.0+incompatible 
h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
+github.com/jtolds/gls v4.20.0+incompatible/go.mod 
h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
@@ -13,6 +17,10 @@ github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.4.1 
h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
 github.com/sirupsen/logrus v1.4.1/go.mod 
h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d 
h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod 
h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 
h1:N8Bg45zpk/UcpNGnfJt2y/3lRWASHNTUET8owPYCgYI=
+github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945/go.mod 
h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -32,4 +40,5 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a 
h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod 
h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod 
h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
diff --git a/internal/client.go b/internal/client.go
index 558c28d..d58efc4 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -88,7 +88,7 @@ func DefaultClientOptions() ClientOptions {
        opts := ClientOptions{
                InstanceName: "DEFAULT",
                RetryTimes:   3,
-               ClientIP:     utils.LocalIP(),
+               ClientIP:     utils.LocalIP,
        }
        return opts
 }
diff --git a/internal/constants.go b/internal/constants.go
index a234c18..e2e911f 100644
--- a/internal/constants.go
+++ b/internal/constants.go
@@ -21,6 +21,7 @@ const (
        RetryGroupTopicPrefix    = "%RETRY%"
        DefaultConsumerGroup     = "DEFAULT_CONSUMER"
        ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER"
+       SystemTopicPrefix        = "rmq_sys_"
 )
 
 func GetRetryTopic(group string) string {
diff --git a/internal/route.go b/internal/route.go
index 8b1e713..6053fd8 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -275,7 +275,7 @@ func FetchPublishMessageQueues(topic string) 
([]*primitive.MessageQueue, error)
        if !exist {
                routeData, err = queryTopicRouteInfoFromServer(topic)
                if err != nil {
-                       rlog.Error("queryTopicRouteInfoFromServer failed. 
topic: %v", topic)
+                       rlog.Error("queryTopicRouteInfoFromServer failed. 
topic: ", topic)
                        return nil, err
                }
                routeDataMap.Store(topic, routeData)
diff --git a/internal/trace.go b/internal/trace.go
new file mode 100644
index 0000000..dc3b246
--- /dev/null
+++ b/internal/trace.go
@@ -0,0 +1,482 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internal
+
+import (
+       "bytes"
+       "context"
+       "encoding/binary"
+       "encoding/hex"
+       "fmt"
+       "os"
+       "runtime"
+       "strconv"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/internal/remote"
+       "github.com/apache/rocketmq-client-go/internal/utils"
+       "github.com/apache/rocketmq-client-go/primitive"
+       "github.com/apache/rocketmq-client-go/rlog"
+)
+
+var (
+       counter        int16 = 0
+       startTimestamp int64 = 0
+       nextTimestamp  int64 = 0
+       prefix         string
+       locker         sync.Mutex
+       classLoadId    int32 = 0
+)
+
+func init() {
+       buf := new(bytes.Buffer)
+
+       ip, err := utils.ClientIP4()
+       if err != nil {
+               ip = utils.FakeIP()
+       }
+       _, _ = buf.Write(ip)
+       _ = binary.Write(buf, binary.BigEndian, Pid())
+       _ = binary.Write(buf, binary.BigEndian, classLoadId)
+       prefix = strings.ToUpper(hex.EncodeToString(buf.Bytes()))
+}
+
+func CreateUniqID() string {
+       locker.Lock()
+       defer locker.Unlock()
+
+       if time.Now().Unix() > nextTimestamp {
+               updateTimestamp()
+       }
+       counter++
+       buf := new(bytes.Buffer)
+       _ = binary.Write(buf, binary.BigEndian, 
int32((time.Now().Unix()-startTimestamp)*1000))
+       _ = binary.Write(buf, binary.BigEndian, counter)
+
+       return prefix + hex.EncodeToString(buf.Bytes())
+}
+
+func updateTimestamp() {
+       year, month := time.Now().Year(), time.Now().Month()
+       startTimestamp = time.Date(year, month, 1, 0, 0, 0, 0, 
time.Local).Unix()
+       nextTimestamp = time.Date(year, month, 1, 0, 0, 0, 0, 
time.Local).AddDate(0, 1, 0).Unix()
+}
+
+func Pid() int16 {
+       return int16(os.Getpid())
+}
+
+type TraceBean struct {
+       Topic       string
+       MsgId       string
+       OffsetMsgId string
+       Tags        string
+       Keys        string
+       StoreHost   string
+       ClientHost  string
+       StoreTime   int64
+       RetryTimes  int
+       BodyLength  int
+       MsgType     primitive.MessageType
+}
+
+type TraceTransferBean struct {
+       transData string
+       // not duplicate
+       transKey []string
+}
+
+type TraceType string
+
+const (
+       Pub       TraceType = "Pub"
+       SubBefore TraceType = "SubBefore"
+       SubAfter  TraceType = "SubAfter"
+
+       contentSplitter = '\001'
+       fieldSplitter   = '\002'
+)
+
+type TraceContext struct {
+       TraceType   TraceType
+       TimeStamp   int64
+       RegionId    string
+       RegionName  string
+       GroupName   string
+       CostTime    int64
+       IsSuccess   bool
+       RequestId   string
+       ContextCode int
+       TraceBeans  []TraceBean
+}
+
+func (ctx *TraceContext) marshal2Bean() *TraceTransferBean {
+       buffer := bytes.NewBufferString("")
+       switch ctx.TraceType {
+       case Pub:
+               bean := ctx.TraceBeans[0]
+               buffer.WriteString(string(ctx.TraceType))
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(strconv.FormatInt(ctx.TimeStamp, 10))
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(ctx.RegionId)
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(ctx.GroupName)
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(bean.Topic)
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(bean.MsgId)
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(bean.Tags)
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(bean.Keys)
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(bean.StoreHost)
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(strconv.Itoa(bean.BodyLength))
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(strconv.FormatInt(ctx.CostTime, 10))
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(strconv.Itoa(int(bean.MsgType)))
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(bean.OffsetMsgId)
+               buffer.WriteRune(contentSplitter)
+               buffer.WriteString(strconv.FormatBool(ctx.IsSuccess))
+               buffer.WriteRune(fieldSplitter)
+       case SubBefore:
+               for _, bean := range ctx.TraceBeans {
+                       buffer.WriteString(string(ctx.TraceType))
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(strconv.FormatInt(ctx.TimeStamp, 10))
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(ctx.RegionId)
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(ctx.GroupName)
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(ctx.RequestId)
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(bean.MsgId)
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(strconv.Itoa(bean.RetryTimes))
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(nullWrap(bean.Keys))
+                       buffer.WriteRune(fieldSplitter)
+               }
+       case SubAfter:
+               for _, bean := range ctx.TraceBeans {
+                       buffer.WriteString(string(ctx.TraceType))
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(ctx.RequestId)
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(bean.MsgId)
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(strconv.FormatInt(ctx.CostTime, 10))
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(strconv.FormatBool(ctx.IsSuccess))
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(nullWrap(bean.Keys))
+                       buffer.WriteRune(contentSplitter)
+                       buffer.WriteString(strconv.Itoa(ctx.ContextCode))
+                       buffer.WriteRune(fieldSplitter)
+               }
+       }
+       transferBean := new(TraceTransferBean)
+       transferBean.transData = buffer.String()
+       for _, bean := range ctx.TraceBeans {
+               transferBean.transKey = append(transferBean.transKey, 
bean.MsgId)
+               if len(bean.Keys) > 0 {
+                       transferBean.transKey = append(transferBean.transKey, 
bean.Keys)
+               }
+       }
+       return transferBean
+}
+
+// compatible with java console.
+func nullWrap(s string) string {
+       if len(s) == 0 {
+               return "null"
+       }
+       return s
+}
+
+type traceDispatcherType int
+
+const (
+       RmqSysTraceTopic = "RMQ_SYS_TRACE_TOPIC"
+
+       ProducerType traceDispatcherType = iota
+       ConsumerType
+
+       maxMsgSize = 128000 - 10*1000
+       batchSize  = 100
+
+       TraceTopicPrefix = SystemTopicPrefix + "TRACE_DATA_"
+       TraceGroupName   = "_INNER_TRACE_PRODUCER"
+)
+
+type TraceDispatcher interface {
+       GetTraceTopicName() string
+
+       Start()
+       Append(ctx TraceContext) bool
+       Close()
+}
+
+type traceDispatcher struct {
+       ctx     context.Context
+       cancel  context.CancelFunc
+       running bool
+
+       traceTopic string
+       access     primitive.AccessChannel
+
+       ticker  *time.Ticker
+       input   chan TraceContext
+       batchCh chan []*TraceContext
+
+       discardCount int64
+
+       // support deliver trace message to other cluster.
+       namesrvs []string
+       // round robin index
+       rrindex int32
+       cli     RMQClient
+}
+
+func NewTraceDispatcher(traceTopic string, access primitive.AccessChannel) 
*traceDispatcher {
+       ctx := context.Background()
+       ctx, cancel := context.WithCancel(ctx)
+
+       t := traceTopic
+       if len(t) == 0 {
+               t = RmqSysTraceTopic
+       }
+
+       if access == primitive.Cloud {
+               t = TraceTopicPrefix + traceTopic
+       }
+
+       cliOp := DefaultClientOptions()
+       cliOp.RetryTimes = 0
+       cli := GetOrNewRocketMQClient(cliOp)
+       return &traceDispatcher{
+               ctx:    ctx,
+               cancel: cancel,
+
+               traceTopic: t,
+               access:     access,
+               input:      make(chan TraceContext, 1024),
+               batchCh:    make(chan []*TraceContext, 2048),
+               cli:        cli,
+       }
+}
+
+func (td *traceDispatcher) GetTraceTopicName() string {
+       return td.traceTopic
+}
+
+func (td *traceDispatcher) Start() {
+       td.running = true
+       td.cli.Start()
+       go td.process()
+}
+
+func (td *traceDispatcher) Close() {
+       td.running = false
+       td.ticker.Stop()
+       td.cancel()
+}
+
+func (td *traceDispatcher) Append(ctx TraceContext) bool {
+       if !td.running {
+               rlog.Error("traceDispatcher is closed.")
+               return false
+       }
+       select {
+       case td.input <- ctx:
+               return true
+       default:
+               rlog.Warnf("buffer full: %d, ctx is %v", 
atomic.AddInt64(&td.discardCount, 1), ctx)
+               return false
+       }
+}
+
+// process
+func (td *traceDispatcher) process() {
+       var count int
+       var batch []TraceContext
+       maxWaitDuration := 5 * time.Millisecond
+       maxWaitTime := maxWaitDuration.Nanoseconds()
+       td.ticker = time.NewTicker(maxWaitDuration)
+       lastput := time.Now()
+       for {
+               select {
+               case ctx := <-td.input:
+                       count++
+                       lastput = time.Now()
+                       batch = append(batch, ctx)
+                       if count == batchSize {
+                               count = 0
+                               go td.batchCommit(batch)
+                               batch = make([]TraceContext, 0)
+                       }
+               case <-td.ticker.C:
+                       delta := time.Since(lastput).Nanoseconds()
+                       if delta > maxWaitTime {
+                               count++
+                               lastput = time.Now()
+                               if len(batch) > 0 {
+                                       go td.batchCommit(batch)
+                                       batch = make([]TraceContext, 0)
+                               }
+                       }
+               case <-td.ctx.Done():
+                       go td.batchCommit(batch)
+                       batch = make([]TraceContext, 0)
+
+                       now := time.Now().UnixNano() / int64(time.Millisecond)
+                       end := now + 500
+                       for now < end {
+                               now = time.Now().UnixNano() / 
int64(time.Millisecond)
+                               runtime.Gosched()
+                       }
+                       rlog.Infof("------end trace send %v %v", td.input, 
td.batchCh)
+               }
+       }
+}
+
+// batchCommit commit slice of TraceContext. convert the ctxs to keyed 
pair(key is Topic + regionid).
+// flush according key one by one.
+func (td *traceDispatcher) batchCommit(ctxs []TraceContext) {
+       keyedCtxs := make(map[string][]TraceTransferBean)
+       for _, ctx := range ctxs {
+               if len(ctx.TraceBeans) == 0 {
+                       return
+               }
+               topic := ctx.TraceBeans[0].Topic
+               regionID := ctx.RegionId
+               key := topic
+               if len(regionID) > 0 {
+                       key = fmt.Sprintf("%s%c%s", topic, contentSplitter, 
regionID)
+               }
+               keyedCtxs[key] = append(keyedCtxs[key], *ctx.marshal2Bean())
+       }
+
+       for k, v := range keyedCtxs {
+               arr := strings.Split(k, string([]byte{contentSplitter}))
+               topic := k
+               regionID := ""
+               if len(arr) > 1 {
+                       topic = arr[0]
+                       regionID = arr[1]
+               }
+               td.flush(topic, regionID, v)
+       }
+}
+
+type Keyset map[string]struct{}
+
+func (ks Keyset) slice() []string {
+       slice := make([]string, len(ks))
+       for k, _ := range ks {
+               slice = append(slice, k)
+       }
+       return slice
+}
+
+// flush data in batch.
+func (td *traceDispatcher) flush(topic, regionID string, data 
[]TraceTransferBean) {
+       if len(data) == 0 {
+               return
+       }
+
+       keyset := make(Keyset)
+       var builder strings.Builder
+       flushed := true
+       for _, bean := range data {
+               for _, k := range bean.transKey {
+                       keyset[k] = struct{}{}
+               }
+               builder.WriteString(bean.transData)
+               flushed = false
+
+               if builder.Len() > maxMsgSize {
+                       td.sendTraceDataByMQ(keyset, regionID, builder.String())
+                       builder.Reset()
+                       keyset = make(Keyset)
+                       flushed = true
+               }
+       }
+       if !flushed {
+               td.sendTraceDataByMQ(keyset, regionID, builder.String())
+       }
+}
+
+func (td *traceDispatcher) sendTraceDataByMQ(keyset Keyset, regionID string, 
data string) {
+       msg := primitive.NewMessage(td.traceTopic, []byte(data))
+       msg.SetKeys(keyset.slice())
+
+       mq, addr := td.findMq()
+       if mq == nil {
+               return
+       }
+
+       var req = td.buildSendRequest(mq, msg)
+       td.cli.InvokeAsync(addr, req, 5000*time.Millisecond, func(command 
*remote.RemotingCommand, e error) {
+               if e != nil {
+                       rlog.Error("send trace data ,the traceData is %v", data)
+               }
+       })
+}
+
+func (td *traceDispatcher) findMq() (*primitive.MessageQueue, string) {
+       mqs, err := FetchPublishMessageQueues(td.traceTopic)
+       if err != nil {
+               rlog.Error("fetch publish message queues failed. err: %v", err)
+               return nil, ""
+       }
+       i := atomic.AddInt32(&td.rrindex, 1)
+       if i < 0 {
+               i = 0
+               atomic.StoreInt32(&td.rrindex, 0)
+       }
+       i %= int32(len(mqs))
+       mq := mqs[i]
+
+       brokerName := mq.BrokerName
+       addr := FindBrokerAddrByName(brokerName)
+
+       return mq, addr
+}
+
+func (td *traceDispatcher) buildSendRequest(mq *primitive.MessageQueue,
+       msg *primitive.Message) *remote.RemotingCommand {
+       req := &SendMessageRequest{
+               ProducerGroup: TraceGroupName,
+               Topic:         mq.Topic,
+               QueueId:       mq.QueueId,
+               BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond),
+               Flag:          msg.Flag,
+               Properties:    primitive.MarshalPropeties(msg.Properties),
+       }
+
+       return remote.NewRemotingCommand(ReqSendMessage, req, msg.Body)
+}
diff --git a/internal/trace_test.go b/internal/trace_test.go
new file mode 100644
index 0000000..23808a3
--- /dev/null
+++ b/internal/trace_test.go
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internal
+
+import (
+       "testing"
+
+       "github.com/apache/rocketmq-client-go/primitive"
+       . "github.com/smartystreets/goconvey/convey"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestMarshal2Bean(t *testing.T) {
+
+       Convey("marshal of TraceContext", t, func() {
+
+               Convey("When marshal producer trace data", func() {
+                       traceCtx := TraceContext{
+                               TraceType: Pub,
+                               TimeStamp: 1563780533299,
+                               RegionId:  "DefaultRegion",
+                               GroupName: "ProducerGroupName",
+                               CostTime:  3572,
+                               IsSuccess: true,
+                               RequestId: "0A5DE93A815518B4AAC26F77F8330001",
+                               TraceBeans: []TraceBean{
+                                       {
+                                               Topic:       "TopicTest",
+                                               MsgId:       
"0A5DE93A833B18B4AAC26F842A2F0000",
+                                               OffsetMsgId: 
"0A5DE93A00002A9F000000000042E322",
+                                               Tags:        "TagA",
+                                               Keys:        "OrderID1882",
+                                               StoreHost:   
"10.93.233.58:10911",
+                                               ClientHost:  "10.93.233.58",
+                                               StoreTime:   1563780535085,
+                                               BodyLength:  11,
+                                               MsgType:     
primitive.NormalMsg,
+                                       },
+                               },
+                       }
+                       bean := traceCtx.marshal2Bean()
+                       assert.Equal(t, 
"Pub1563780533299DefaultRegionProducerGroupNameTopicTest0A5DE93A833B18B4AAC26F842A2F0000TagAOrderID188210.93.233.58:1091111357200A5DE93A00002A9F000000000042E322true\x02",
+                               bean.transData)
+                       assert.Equal(t, 
[]string{"0A5DE93A833B18B4AAC26F842A2F0000", "OrderID1882"}, bean.transKey)
+
+                       // consumer before test
+                       traceCtx = TraceContext{
+                               TraceType: SubBefore,
+                               TimeStamp: 1563789119096,
+                               GroupName: "CID_JODIE_1",
+                               IsSuccess: true,
+                               RequestId: "0A5DE93A96A818B4AAC26FFAFA780007",
+                               TraceBeans: []TraceBean{
+                                       {
+                                               Topic:      "TopicTest",
+                                               MsgId:      
"0A5DE93A973418B4AAC26FFAFA5A0000",
+                                               Tags:       "TagA",
+                                               Keys:       "OrderID1882",
+                                               StoreHost:  "10.93.233.58",
+                                               ClientHost: "10.93.233.58",
+                                               StoreTime:  1563789119092,
+                                               BodyLength: 190,
+                                       },
+                               },
+                       }
+                       bean = traceCtx.marshal2Bean()
+
+                       Convey("transData should equal to expected", func() {
+                               So(bean.transData, ShouldEqual, 
"SubBefore1563789119096CID_JODIE_10A5DE93A96A818B4AAC26FFAFA7800070A5DE93A973418B4AAC26FFAFA5A00000OrderID1882")
+                       })
+
+                       Convey("transkey should equal to expected", func() {
+                               expectedKey := 
[]string{"0A5DE93A973418B4AAC26FFAFA5A0000", "OrderID1882"}
+                               So(bean.transKey[0], ShouldEqual, 
expectedKey[0])
+                               So(bean.transKey[1], ShouldEqual, 
expectedKey[1])
+                       })
+               })
+
+               Convey("When marshal consumer trace data", func() {
+                       traceCtx := TraceContext{
+                               TraceType: SubAfter,
+                               TimeStamp: 1563789119096,
+                               GroupName: "CID_JODIE_1",
+                               IsSuccess: true,
+                               RequestId: "0A5DE93A96A818B4AAC26FFAFA780007",
+                               TraceBeans: []TraceBean{
+                                       {
+                                               Topic:      "TopicTest",
+                                               MsgId:      
"0A5DE93A973418B4AAC26FFAFA5A0000",
+                                               Tags:       "TagA",
+                                               Keys:       "OrderID1882",
+                                               StoreHost:  "10.93.233.58",
+                                               ClientHost: "10.93.233.58",
+                                               StoreTime:  1563789119092,
+                                               BodyLength: 190,
+                                       },
+                               },
+                       }
+                       bean := traceCtx.marshal2Bean()
+                       Convey("transData should equal to expected", func() {
+                               So(bean.transData, ShouldEqual, 
"SubAfter0A5DE93A96A818B4AAC26FFAFA7800070A5DE93A973418B4AAC26FFAFA5A00000trueOrderID18820")
+                       })
+                       Convey("transkey should equal to expected", func() {
+                               expectedKey := 
[]string{"0A5DE93A973418B4AAC26FFAFA5A0000", "OrderID1882"}
+                               So(bean.transKey[0], ShouldEqual, 
expectedKey[0])
+                               So(bean.transKey[1], ShouldEqual, 
expectedKey[1])
+                       })
+               })
+       })
+}
diff --git a/internal/utils/helper.go b/internal/utils/helper.go
index d8797dd..2b9eb2a 100644
--- a/internal/utils/helper.go
+++ b/internal/utils/helper.go
@@ -17,62 +17,10 @@ limitations under the License.
 
 package utils
 
-import (
-       "bytes"
-       "encoding/binary"
-       "fmt"
-       "os"
-       "sync"
-       "time"
-)
-
-var (
-       counter        int16 = 0
-       startTimestamp int64 = 0
-       nextTimestamp  int64 = 0
-       prefix         string
-       locker         sync.Mutex
-)
-
-func MessageClientID() string {
-       locker.Lock()
-       defer locker.Unlock()
-       if prefix == "" {
-               buf := new(bytes.Buffer)
-               binary.Write(buf, binary.BigEndian, LocalIP())
-               binary.Write(buf, binary.BigEndian, Pid())
-               binary.Write(buf, binary.BigEndian, ClassLoaderID())
-               prefix = fmt.Sprintf("%x", buf.Bytes())
-       }
-       if time.Now().Unix() > nextTimestamp {
-               updateTimestamp()
-       }
-       counter++
-       buf := new(bytes.Buffer)
-       binary.Write(buf, binary.BigEndian, 
int32((time.Now().Unix()-startTimestamp)*1000))
-       binary.Write(buf, binary.BigEndian, counter)
-       return prefix + fmt.Sprintf("%x", buf.Bytes())
-
-}
-
-func updateTimestamp() {
-       year, month := time.Now().Year(), time.Now().Month()
-       startTimestamp = time.Date(year, month, 1, 0, 0, 0, 0, 
time.Local).Unix()
-       nextTimestamp = time.Date(year, month, 1, 0, 0, 0, 0, 
time.Local).AddDate(0, 1, 0).Unix()
-}
-
 func GetAddressByBytes(data []byte) string {
        return "127.0.0.1"
 }
 
-func Pid() int16 {
-       return int16(os.Getpid())
-}
-
-func ClassLoaderID() int32 {
-       return 0
-}
-
 func UnCompress(data []byte) []byte {
        return data
 }
diff --git a/internal/utils/helper_test.go b/internal/utils/helper_test.go
deleted file mode 100644
index ee55cd2..0000000
--- a/internal/utils/helper_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package utils
-
-import (
-       "testing"
-)
-
-func TestClassLoaderID(t *testing.T) {
-       if ClassLoaderID() != 0 {
-               t.Errorf("wrong ClassLoaderID, want=%d, got=%d", 0, 
ClassLoaderID())
-       }
-}
-
-func BenchmarkMessageClientID(b *testing.B) {
-       for i := 0; i < b.N; i++ {
-               MessageClientID()
-       }
-}
diff --git a/internal/utils/net.go b/internal/utils/net.go
index 5da1edc..65f2731 100644
--- a/internal/utils/net.go
+++ b/internal/utils/net.go
@@ -1,20 +1,28 @@
 package utils
 
 import (
+       "bytes"
        "errors"
        "fmt"
        "net"
+       "strconv"
+       "time"
 )
 
-func LocalIP() string {
-       ip, err := clientIP4()
+var (
+       LocalIP string
+)
+
+func init() {
+       ip, err := ClientIP4()
        if err != nil {
-               return ""
+               LocalIP = ""
+       } else {
+               LocalIP = fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])
        }
-       return fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])
 }
 
-func clientIP4() ([]byte, error) {
+func ClientIP4() ([]byte, error) {
        addrs, err := net.InterfaceAddrs()
        if err != nil {
                return nil, errors.New("unexpected IP address")
@@ -28,3 +36,9 @@ func clientIP4() ([]byte, error) {
        }
        return nil, errors.New("unknown IP address")
 }
+
+func FakeIP() []byte {
+       buf := bytes.NewBufferString("")
+       
buf.WriteString(strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond),
 10))
+       return buf.Bytes()[4:8]
+}
diff --git a/internal/utils/net_test.go b/internal/utils/net_test.go
index 9f76062..44454ad 100644
--- a/internal/utils/net_test.go
+++ b/internal/utils/net_test.go
@@ -3,5 +3,5 @@ package utils
 import "testing"
 
 func TestLocalIP2(t *testing.T) {
-       t.Log(LocalIP())
+       t.Log(LocalIP)
 }
diff --git a/primitive/ctx.go b/primitive/ctx.go
index 6e9b6d3..186521a 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -23,23 +23,55 @@ package primitive
 import (
        "context"
        "math"
+
+       "github.com/apache/rocketmq-client-go/rlog"
 )
 
 type CtxKey int
 
+type CommunicationMode string
+
+type ConsumeReturnType string
+
+func (c ConsumeReturnType) Ordinal() int {
+       switch c {
+       case SuccessReturn:
+               return 0
+       case TimeoutReturn:
+               return 1
+       case ExceptionRetrun:
+               return 2
+       case NullReturn:
+               return 3
+       case FailedReturn:
+               return 4
+       default:
+               rlog.Error("illegal ConsumeReturnType: %v", c)
+               return 0
+       }
+}
+
 const (
        method CtxKey = iota
        msgCtx
        orderlyCtx
        concurrentlyCtx
+       producerCtx
 
        // method name in  producer
-       SendSync   = "SendSync"
-       SendOneway = "SendOneway"
-       SendAsync  = "SendAsync"
+       SendSync   CommunicationMode = "SendSync"
+       SendOneway CommunicationMode = "SendOneway"
+       SendAsync  CommunicationMode = "SendAsync"
        // method name in consumer
        ConsumerPush = "ConsumerPush"
        ConsumerPull = "ConsumerPull"
+
+       PropCtxType                       = "ConsumeContextType"
+       SuccessReturn   ConsumeReturnType = "SUCCESS"
+       TimeoutReturn   ConsumeReturnType = "TIMEOUT"
+       ExceptionRetrun ConsumeReturnType = "EXCEPTION"
+       NullReturn      ConsumeReturnType = "RETURNNULL"
+       FailedReturn    ConsumeReturnType = "FAILED"
 )
 
 type ConsumeMessageContext struct {
@@ -53,13 +85,13 @@ type ConsumeMessageContext struct {
 }
 
 // WithMethod set call method name
-func WithMethod(ctx context.Context, m string) context.Context {
+func WithMethod(ctx context.Context, m CommunicationMode) context.Context {
        return context.WithValue(ctx, method, m)
 }
 
 // GetMethod get call method name
-func GetMethod(ctx context.Context) string {
-       return ctx.Value(method).(string)
+func GetMethod(ctx context.Context) CommunicationMode {
+       return ctx.Value(method).(CommunicationMode)
 }
 
 // WithConsumerCtx set ConsumeMessageContext in PushConsumer
@@ -116,3 +148,24 @@ func GetConcurrentlyCtx(ctx context.Context) 
(*ConsumeConcurrentlyContext, bool)
        c, exist := ctx.Value(concurrentlyCtx).(*ConsumeConcurrentlyContext)
        return c, exist
 }
+
+type ProducerCtx struct {
+       ProducerGroup     string
+       Message           Message
+       MQ                MessageQueue
+       BrokerAddr        string
+       BornHost          string
+       CommunicationMode CommunicationMode
+       SendResult        *SendResult
+       Props             map[string]string
+       MsgType           MessageType
+       Namespace         string
+}
+
+func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context {
+       return context.WithValue(ctx, producerCtx, c)
+}
+
+func GetProducerCtx(ctx context.Context) *ProducerCtx {
+       return ctx.Value(producerCtx).(*ProducerCtx)
+}
diff --git a/primitive/interceptor.go b/primitive/interceptor.go
index 3c4c659..7fd3f83 100644
--- a/primitive/interceptor.go
+++ b/primitive/interceptor.go
@@ -28,3 +28,9 @@ type Invoker func(ctx context.Context, req, reply 
interface{}) error
 // In PushConsumer call, the req is []*MessageExt type and the reply is 
ConsumeResultHolder,
 // use type assert to get real type.
 type Interceptor func(ctx context.Context, req, reply interface{}, next 
Invoker) error
+
+// config for message trace.
+type TraceConfig struct {
+       TraceTopic string
+       Access     AccessChannel
+}
diff --git a/primitive/message.go b/primitive/message.go
index 1da6d5a..28c9eab 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -20,6 +20,7 @@ package primitive
 import (
        "fmt"
        "strconv"
+       "strings"
 
        "github.com/apache/rocketmq-client-go/internal/utils"
 )
@@ -108,6 +109,23 @@ func (msg *Message) RemoveProperty(key string) string {
        return value
 }
 
+func (msg *Message) SetKeys(keys []string) {
+       var sb strings.Builder
+       for _, k := range keys {
+               sb.WriteString(k)
+               sb.WriteString(PropertyKeySeparator)
+       }
+       msg.PutProperty(PropertyKeys, sb.String())
+}
+
+func (msg *Message) GetTags() string {
+       return msg.Properties[PropertyTags]
+}
+
+func (msg *Message) GetKeys() string {
+       return msg.Properties[PropertyKeys]
+}
+
 type MessageExt struct {
        Message
        MsgId                     string
@@ -129,6 +147,14 @@ func (msgExt *MessageExt) GetTags() string {
        return msgExt.Properties[PropertyTags]
 }
 
+func (msgExt *MessageExt) GetRegionID() string {
+       return msgExt.Properties[PropertyMsgRegion]
+}
+
+func (msgExt *MessageExt) IsTraceOn() string {
+       return msgExt.Properties[PropertyTraceSwitch]
+}
+
 func (msgExt *MessageExt) String() string {
        return fmt.Sprintf("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, 
QueueOffset=%d, SysFlag=%d, "+
                "BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, 
StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
@@ -162,3 +188,21 @@ func (mq MessageQueue) Equals(queue *MessageQueue) bool {
        // TODO
        return mq.BrokerName == queue.BrokerName && mq.Topic == queue.Topic && 
mq.QueueId == mq.QueueId
 }
+
+type AccessChannel int
+
+const (
+       // connect to private IDC cluster.
+       Local AccessChannel = iota
+       // connect to Cloud service.
+       Cloud
+)
+
+type MessageType int
+
+const (
+       NormalMsg MessageType = iota
+       TransMsgHalf
+       TransMsgCommit
+       DelayMsg
+)
diff --git a/primitive/result.go b/primitive/result.go
index f21cc2e..4664cf0 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -20,7 +20,9 @@ package primitive
 import (
        "bytes"
        "encoding/binary"
+       "encoding/hex"
        "fmt"
+       "strings"
 
        "github.com/apache/rocketmq-client-go/internal/utils"
 )
@@ -203,7 +205,7 @@ func DecodeMessage(data []byte) []*MessageExt {
                }
                count += 2 + int(propertiesLength)
 
-               msg.MsgId = createMessageId(hostBytes, msg.CommitLogOffset)
+               msg.MsgId = createMessageId(hostBytes, port, 
msg.CommitLogOffset)
                //count += 16
 
                msgs = append(msgs, msg)
@@ -212,8 +214,12 @@ func DecodeMessage(data []byte) []*MessageExt {
        return msgs
 }
 
-func createMessageId(addr []byte, offset int64) string {
-       return "msgID" // TODO
+func createMessageId(addr []byte, port int32, offset int64) string {
+       buffer := new(bytes.Buffer)
+       buffer.Write(addr)
+       binary.Write(buffer, binary.BigEndian, port)
+       binary.Write(buffer, binary.BigEndian, offset)
+       return strings.ToUpper(hex.EncodeToString(buffer.Bytes()))
 }
 
 // unmarshalProperties parse data into property kv pairs.
diff --git a/primitive/result_test.go b/primitive/result_test.go
index 131db8c..a3fd5ab 100644
--- a/primitive/result_test.go
+++ b/primitive/result_test.go
@@ -18,12 +18,14 @@ limitations under the License.
 package primitive
 
 import (
+       "strings"
        "testing"
 
+       . "github.com/smartystreets/goconvey/convey"
        "github.com/stretchr/testify/assert"
 )
 
-func Test(t *testing.T) {
+func TestProperties(t *testing.T) {
        kv := map[string]string{
                "k1": "v1",
                "k2": "v2",
@@ -32,3 +34,17 @@ func Test(t *testing.T) {
        kv2 := unmarshalProperties([]byte(str))
        assert.Equal(t, kv, kv2)
 }
+
+func TestCreateMessageId(t *testing.T) {
+       Convey("MessageId gen", t, func() {
+               b := []byte{10, 93, 233, 58}
+               port := int32(10911)
+               offset := int64(4391252)
+               id := createMessageId(b, port, offset)
+
+               Convey("generated messageId should be equal to expected", 
func() {
+                       assert.Equal(t, 
strings.ToLower("0A5DE93A00002A9F0000000000430154"), id)
+               })
+       })
+
+}
diff --git a/producer/interceptor.go b/producer/interceptor.go
new file mode 100644
index 0000000..46f8042
--- /dev/null
+++ b/producer/interceptor.go
@@ -0,0 +1,97 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/**
+ * builtin interceptor
+ */
+package producer
+
+import (
+       "context"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/internal"
+       "github.com/apache/rocketmq-client-go/internal/utils"
+       "github.com/apache/rocketmq-client-go/primitive"
+)
+
+// WithTrace support rocketmq trace: 
https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
+func WithTrace(traceCfg primitive.TraceConfig) Option {
+       return func(options *producerOptions) {
+
+               ori := options.Interceptors
+               options.Interceptors = make([]primitive.Interceptor, 0)
+               options.Interceptors = append(options.Interceptors, 
newTraceInterceptor(traceCfg))
+               options.Interceptors = append(options.Interceptors, ori...)
+       }
+}
+
+func newTraceInterceptor(traceCfg primitive.TraceConfig) primitive.Interceptor 
{
+       dispatcher := internal.NewTraceDispatcher(traceCfg.TraceTopic, 
traceCfg.Access)
+       dispatcher.Start()
+
+       return func(ctx context.Context, req, reply interface{}, next 
primitive.Invoker) error {
+               beginT := time.Now()
+               err := next(ctx, req, reply)
+
+               producerCtx := primitive.GetProducerCtx(ctx)
+               if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() {
+                       return next(ctx, req, reply)
+               }
+
+               // SendOneway && SendAsync has no reply.
+               if reply == nil {
+                       return err
+               }
+
+               result := reply.(*primitive.SendResult)
+               if result.RegionID == "" || !result.TraceOn {
+                       return err
+               }
+
+               sendSuccess := result.Status == primitive.SendOK
+               costT := time.Since(beginT).Nanoseconds() / 
int64(time.Millisecond)
+               storeT := beginT.UnixNano()/int64(time.Millisecond) + costT/2
+
+               traceBean := internal.TraceBean{
+                       Topic:       producerCtx.Message.Topic,
+                       Tags:        producerCtx.Message.GetTags(),
+                       Keys:        producerCtx.Message.GetKeys(),
+                       StoreHost:   producerCtx.BrokerAddr,
+                       ClientHost:  utils.LocalIP,
+                       BodyLength:  len(producerCtx.Message.Body),
+                       MsgType:     producerCtx.MsgType,
+                       MsgId:       result.MsgID,
+                       OffsetMsgId: result.OffsetMsgID,
+                       StoreTime:   storeT,
+               }
+
+               traceCtx := internal.TraceContext{
+                       RequestId: internal.CreateUniqID(), // set id
+                       TimeStamp: time.Now().UnixNano() / 
int64(time.Millisecond),
+
+                       TraceType:  internal.Pub,
+                       GroupName:  producerCtx.ProducerGroup,
+                       RegionId:   result.RegionID,
+                       TraceBeans: []internal.TraceBean{traceBean},
+                       CostTime:   costT,
+                       IsSuccess:  sendSuccess,
+               }
+               dispatcher.Append(traceCtx)
+               return err
+       }
+}
diff --git a/producer/producer.go b/producer/producer.go
index 12a72b6..13f7952 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/rocketmq-client-go/internal"
        "github.com/apache/rocketmq-client-go/internal/remote"
+       "github.com/apache/rocketmq-client-go/internal/utils"
        "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
        "github.com/pkg/errors"
@@ -129,6 +130,15 @@ func (p *defaultProducer) SendSync(ctx context.Context, 
msg *primitive.Message)
        resp := new(primitive.SendResult)
        if p.interceptor != nil {
                primitive.WithMethod(ctx, primitive.SendSync)
+               producerCtx := &primitive.ProducerCtx{
+                       ProducerGroup: p.group,
+                       CommunicationMode: primitive.SendSync,
+                       BornHost: utils.LocalIP,
+                       Message: *msg,
+                       SendResult: resp,
+               }
+               ctx = primitive.WithProducerCtx(ctx, producerCtx)
+
                err := p.interceptor(ctx, msg, resp, func(ctx context.Context, 
req, reply interface{}) error {
                        var err error
                        realReq := req.(*primitive.Message)
@@ -151,6 +161,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg 
*primitive.Message,
                err error
        )
 
+       var producerCtx *primitive.ProducerCtx
        for retryCount := 0; retryCount < retryTime; retryCount++ {
                mq := p.selectMessageQueue(msg)
                if mq == nil {
@@ -163,6 +174,12 @@ func (p *defaultProducer) sendSync(ctx context.Context, 
msg *primitive.Message,
                        return fmt.Errorf("topic=%s route info not found", 
mq.Topic)
                }
 
+               if p.interceptor != nil {
+                       producerCtx = primitive.GetProducerCtx(ctx)
+                       producerCtx.BrokerAddr = addr
+                       producerCtx.MQ = *mq
+               }
+
                res, _err := p.client.InvokeSync(addr, p.buildSendRequest(mq, 
msg), 3*time.Second)
                if _err != nil {
                        err = _err

Reply via email to