This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 09ccdd4 add trace feature. resolve #124 (#125)
09ccdd4 is described below
commit 09ccdd463d6c8b1ab6497abf95b7e187be83bc46
Author: xujianhai666 <[email protected]>
AuthorDate: Thu Jul 25 16:20:51 2019 +0800
add trace feature. resolve #124 (#125)
---
consumer/interceptor.go | 109 +++++++++
consumer/push_consumer.go | 20 +-
examples/consumer/trace/main.go | 57 +++++
examples/producer/trace/main.go | 65 ++++++
go.mod | 1 +
go.sum | 9 +
internal/client.go | 2 +-
internal/constants.go | 1 +
internal/route.go | 2 +-
internal/trace.go | 482 ++++++++++++++++++++++++++++++++++++++++
internal/trace_test.go | 125 +++++++++++
internal/utils/helper.go | 52 -----
internal/utils/helper_test.go | 33 ---
internal/utils/net.go | 24 +-
internal/utils/net_test.go | 2 +-
primitive/ctx.go | 65 +++++-
primitive/interceptor.go | 6 +
primitive/message.go | 44 ++++
primitive/result.go | 12 +-
primitive/result_test.go | 18 +-
producer/interceptor.go | 97 ++++++++
producer/producer.go | 17 ++
22 files changed, 1134 insertions(+), 109 deletions(-)
diff --git a/consumer/interceptor.go b/consumer/interceptor.go
new file mode 100644
index 0000000..260ac6f
--- /dev/null
+++ b/consumer/interceptor.go
@@ -0,0 +1,109 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+ "context"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/internal"
+ "github.com/apache/rocketmq-client-go/internal/utils"
+ "github.com/apache/rocketmq-client-go/primitive"
+)
+
+// WithTrace support rocketmq trace:
https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
+func WithTrace(traceCfg primitive.TraceConfig) Option {
+ return func(options *consumerOptions) {
+
+ ori := options.Interceptors
+ options.Interceptors = make([]primitive.Interceptor, 0)
+ options.Interceptors = append(options.Interceptors,
newTraceInterceptor(traceCfg))
+ options.Interceptors = append(options.Interceptors, ori...)
+ }
+}
+
+func newTraceInterceptor(traceCfg primitive.TraceConfig) primitive.Interceptor
{
+ dispatcher := internal.NewTraceDispatcher(traceCfg.TraceTopic,
traceCfg.Access)
+ dispatcher.Start()
+
+ return func(ctx context.Context, req, reply interface{}, next
primitive.Invoker) error {
+ consumerCtx, exist := primitive.GetConsumerCtx(ctx)
+ if !exist || len(consumerCtx.Msgs) == 0 {
+ return next(ctx, req, reply)
+ }
+
+ beginT := time.Now()
+ // before traceCtx
+ traceCx := internal.TraceContext{
+ RequestId: internal.CreateUniqID(),
+ TimeStamp: time.Now().UnixNano() /
int64(time.Millisecond),
+ TraceType: internal.SubBefore,
+ GroupName: consumerCtx.ConsumerGroup,
+ IsSuccess: true,
+ }
+ beans := make([]internal.TraceBean, 0)
+ for _, msg := range consumerCtx.Msgs {
+ if msg == nil {
+ continue
+ }
+ regionID := msg.GetRegionID()
+ traceOn := msg.IsTraceOn()
+ if traceOn == "false" {
+ continue
+ }
+ bean := internal.TraceBean{
+ Topic: msg.Topic,
+ MsgId: msg.MsgId,
+ Tags: msg.GetTags(),
+ Keys: msg.GetKeys(),
+ StoreTime: msg.StoreTimestamp,
+ BodyLength: int(msg.StoreSize),
+ RetryTimes: int(msg.ReconsumeTimes),
+ ClientHost: utils.LocalIP,
+ StoreHost: utils.LocalIP,
+ }
+ beans = append(beans, bean)
+ traceCx.RegionId = regionID
+ }
+ if len(beans) > 0 {
+ traceCx.TraceBeans = beans
+ traceCx.TimeStamp = time.Now().UnixNano() /
int64(time.Millisecond)
+ dispatcher.Append(traceCx)
+ }
+
+ err := next(ctx, req, reply)
+
+ // after traceCtx
+ costTime := time.Since(beginT).Nanoseconds() /
int64(time.Millisecond)
+ ctxType := consumerCtx.Properties[primitive.PropCtxType]
+ afterCtx := internal.TraceContext{
+ TimeStamp: time.Now().UnixNano() /
int64(time.Millisecond),
+
+ TraceType: internal.SubAfter,
+ RegionId: traceCx.RegionId,
+ GroupName: traceCx.GroupName,
+ RequestId: traceCx.RequestId,
+ IsSuccess: consumerCtx.Success,
+ CostTime: costTime,
+ TraceBeans: traceCx.TraceBeans,
+ ContextCode:
primitive.ConsumeReturnType(ctxType).Ordinal(),
+ }
+ dispatcher.Append(afterCtx)
+ return err
+ }
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3b011c3..7b4d65e 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -640,6 +640,9 @@ func (pc *pushConsumer) consumeInner(ctx context.Context,
subMsgs []*primitive.M
realReply := reply.(*ConsumeResultHolder)
realReply.ConsumeResult = r
+
+ msgCtx, _ := primitive.GetConsumerCtx(ctx)
+ msgCtx.Success = realReply.ConsumeResult ==
ConsumeSuccess
return e
})
return container.ConsumeResult, err
@@ -694,6 +697,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
var err error
msgCtx := &primitive.ConsumeMessageContext{
Properties: make(map[string]string),
+ ConsumerGroup: pc.consumerGroup,
+ MQ: mq,
+ Msgs: msgs,
}
ctx := context.Background()
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
@@ -706,16 +712,15 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
consumeRT := time.Now().Sub(beginTime)
if err != nil {
- msgCtx.Properties["ConsumeContextType"] =
"EXCEPTION"
+ msgCtx.Properties[primitive.PropCtxType] =
string(primitive.ExceptionRetrun)
} else if consumeRT >= pc.option.ConsumeTimeout {
- msgCtx.Properties["ConsumeContextType"] =
"TIMEOUT"
+ msgCtx.Properties[primitive.PropCtxType] =
string(primitive.TimeoutReturn)
} else if result == ConsumeSuccess {
- msgCtx.Properties["ConsumeContextType"] =
"SUCCESS"
- } else {
- msgCtx.Properties["ConsumeContextType"] =
"RECONSUME_LATER"
+ msgCtx.Properties[primitive.PropCtxType] =
string(primitive.SuccessReturn)
+ } else if result == ConsumeRetryLater{
+ msgCtx.Properties[primitive.PropCtxType] =
string(primitive.FailedReturn)
}
- // TODO hook
increaseConsumeRT(pc.consumerGroup, mq.Topic,
int64(consumeRT/time.Millisecond))
if !pq.dropped {
@@ -808,6 +813,9 @@ func (pc *pushConsumer) consumeMessageOrderly(pq
*processQueue, mq *primitive.Me
ctx := context.Background()
msgCtx := &primitive.ConsumeMessageContext{
Properties: make(map[string]string),
+ ConsumerGroup: pc.consumerGroup,
+ MQ: mq,
+ Msgs: msgs,
}
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
diff --git a/examples/consumer/trace/main.go b/examples/consumer/trace/main.go
new file mode 100644
index 0000000..9102175
--- /dev/null
+++ b/examples/consumer/trace/main.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/apache/rocketmq-client-go"
+ "github.com/apache/rocketmq-client-go/consumer"
+ "github.com/apache/rocketmq-client-go/primitive"
+)
+
+func main() {
+ namesrvs := []string{"127.0.0.1:9876"}
+ traceCfg := primitive.TraceConfig{
+ Access: primitive.Local,
+ }
+
+ c, _ := rocketmq.NewPushConsumer(
+ consumer.WithGroupName("testGroup"),
+ consumer.WithNameServer(namesrvs),
+ consumer.WithTrace(traceCfg),
+ )
+ err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx
context.Context,
+ msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+ fmt.Printf("subscribe callback: %v \n", msgs)
+ return consumer.ConsumeSuccess, nil
+ })
+ if err != nil {
+ fmt.Println(err.Error())
+ }
+ // Note: start after subscribe
+ err = c.Start()
+ if err != nil {
+ fmt.Println(err.Error())
+ os.Exit(-1)
+ }
+ time.Sleep(time.Hour)
+}
diff --git a/examples/producer/trace/main.go b/examples/producer/trace/main.go
new file mode 100644
index 0000000..266c783
--- /dev/null
+++ b/examples/producer/trace/main.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/apache/rocketmq-client-go"
+ "github.com/apache/rocketmq-client-go/primitive"
+ "github.com/apache/rocketmq-client-go/producer"
+)
+
+func main() {
+ namesrvs := []string{"127.0.0.1:9876"}
+ traceCfg := primitive.TraceConfig{
+ Access: primitive.Local,
+ }
+
+ p, _ := rocketmq.NewProducer(
+ producer.WithNameServer(namesrvs),
+ producer.WithRetry(2),
+ producer.WithTrace(traceCfg))
+ err := p.Start()
+ if err != nil {
+ fmt.Printf("start producer error: %s", err.Error())
+ os.Exit(1)
+ }
+ for i := 0; i < 1; i++ {
+ res, err := p.SendSync(context.Background(), &primitive.Message{
+ Topic: "TopicTest",
+ Body: []byte("Hello RocketMQ Go Client!"),
+ })
+
+ if err != nil {
+ fmt.Printf("send message error: %s\n", err)
+ } else {
+ fmt.Printf("send message success: result=%s\n",
res.String())
+ }
+ }
+
+ time.Sleep(10 * time.Second)
+
+ err = p.Shutdown()
+ if err != nil {
+ fmt.Printf("shundown producer error: %s", err.Error())
+ }
+}
diff --git a/go.mod b/go.mod
index 322c847..e1b0d88 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
github.com/golang/mock v1.3.1
github.com/pkg/errors v0.8.1
github.com/sirupsen/logrus v1.4.1
+ github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945
github.com/stretchr/testify v1.3.0
github.com/tidwall/gjson v1.2.1
github.com/tidwall/match v1.0.1 // indirect
diff --git a/go.sum b/go.sum
index 06d0d34..cc4c4b6 100644
--- a/go.sum
+++ b/go.sum
@@ -5,6 +5,10 @@ github.com/emirpasic/gods v1.12.0
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg
github.com/emirpasic/gods v1.12.0/go.mod
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod
h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1
h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod
h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/jtolds/gls v4.20.0+incompatible
h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
+github.com/jtolds/gls v4.20.0+incompatible/go.mod
h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/konsorten/go-windows-terminal-sequences v1.0.1
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
@@ -13,6 +17,10 @@ github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.1
h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
github.com/sirupsen/logrus v1.4.1/go.mod
h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d
h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod
h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945
h1:N8Bg45zpk/UcpNGnfJt2y/3lRWASHNTUET8owPYCgYI=
+github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945/go.mod
h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -32,4 +40,5 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod
h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod
h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod
h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
diff --git a/internal/client.go b/internal/client.go
index 558c28d..d58efc4 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -88,7 +88,7 @@ func DefaultClientOptions() ClientOptions {
opts := ClientOptions{
InstanceName: "DEFAULT",
RetryTimes: 3,
- ClientIP: utils.LocalIP(),
+ ClientIP: utils.LocalIP,
}
return opts
}
diff --git a/internal/constants.go b/internal/constants.go
index a234c18..e2e911f 100644
--- a/internal/constants.go
+++ b/internal/constants.go
@@ -21,6 +21,7 @@ const (
RetryGroupTopicPrefix = "%RETRY%"
DefaultConsumerGroup = "DEFAULT_CONSUMER"
ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER"
+ SystemTopicPrefix = "rmq_sys_"
)
func GetRetryTopic(group string) string {
diff --git a/internal/route.go b/internal/route.go
index 8b1e713..6053fd8 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -275,7 +275,7 @@ func FetchPublishMessageQueues(topic string)
([]*primitive.MessageQueue, error)
if !exist {
routeData, err = queryTopicRouteInfoFromServer(topic)
if err != nil {
- rlog.Error("queryTopicRouteInfoFromServer failed.
topic: %v", topic)
+ rlog.Error("queryTopicRouteInfoFromServer failed.
topic: ", topic)
return nil, err
}
routeDataMap.Store(topic, routeData)
diff --git a/internal/trace.go b/internal/trace.go
new file mode 100644
index 0000000..dc3b246
--- /dev/null
+++ b/internal/trace.go
@@ -0,0 +1,482 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internal
+
+import (
+ "bytes"
+ "context"
+ "encoding/binary"
+ "encoding/hex"
+ "fmt"
+ "os"
+ "runtime"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/internal/utils"
+ "github.com/apache/rocketmq-client-go/primitive"
+ "github.com/apache/rocketmq-client-go/rlog"
+)
+
+var (
+ counter int16 = 0
+ startTimestamp int64 = 0
+ nextTimestamp int64 = 0
+ prefix string
+ locker sync.Mutex
+ classLoadId int32 = 0
+)
+
+func init() {
+ buf := new(bytes.Buffer)
+
+ ip, err := utils.ClientIP4()
+ if err != nil {
+ ip = utils.FakeIP()
+ }
+ _, _ = buf.Write(ip)
+ _ = binary.Write(buf, binary.BigEndian, Pid())
+ _ = binary.Write(buf, binary.BigEndian, classLoadId)
+ prefix = strings.ToUpper(hex.EncodeToString(buf.Bytes()))
+}
+
+func CreateUniqID() string {
+ locker.Lock()
+ defer locker.Unlock()
+
+ if time.Now().Unix() > nextTimestamp {
+ updateTimestamp()
+ }
+ counter++
+ buf := new(bytes.Buffer)
+ _ = binary.Write(buf, binary.BigEndian,
int32((time.Now().Unix()-startTimestamp)*1000))
+ _ = binary.Write(buf, binary.BigEndian, counter)
+
+ return prefix + hex.EncodeToString(buf.Bytes())
+}
+
+func updateTimestamp() {
+ year, month := time.Now().Year(), time.Now().Month()
+ startTimestamp = time.Date(year, month, 1, 0, 0, 0, 0,
time.Local).Unix()
+ nextTimestamp = time.Date(year, month, 1, 0, 0, 0, 0,
time.Local).AddDate(0, 1, 0).Unix()
+}
+
+func Pid() int16 {
+ return int16(os.Getpid())
+}
+
+type TraceBean struct {
+ Topic string
+ MsgId string
+ OffsetMsgId string
+ Tags string
+ Keys string
+ StoreHost string
+ ClientHost string
+ StoreTime int64
+ RetryTimes int
+ BodyLength int
+ MsgType primitive.MessageType
+}
+
+type TraceTransferBean struct {
+ transData string
+ // not duplicate
+ transKey []string
+}
+
+type TraceType string
+
+const (
+ Pub TraceType = "Pub"
+ SubBefore TraceType = "SubBefore"
+ SubAfter TraceType = "SubAfter"
+
+ contentSplitter = '\001'
+ fieldSplitter = '\002'
+)
+
+type TraceContext struct {
+ TraceType TraceType
+ TimeStamp int64
+ RegionId string
+ RegionName string
+ GroupName string
+ CostTime int64
+ IsSuccess bool
+ RequestId string
+ ContextCode int
+ TraceBeans []TraceBean
+}
+
+func (ctx *TraceContext) marshal2Bean() *TraceTransferBean {
+ buffer := bytes.NewBufferString("")
+ switch ctx.TraceType {
+ case Pub:
+ bean := ctx.TraceBeans[0]
+ buffer.WriteString(string(ctx.TraceType))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(strconv.FormatInt(ctx.TimeStamp, 10))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(ctx.RegionId)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(ctx.GroupName)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(bean.Topic)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(bean.MsgId)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(bean.Tags)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(bean.Keys)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(bean.StoreHost)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(strconv.Itoa(bean.BodyLength))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(strconv.FormatInt(ctx.CostTime, 10))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(strconv.Itoa(int(bean.MsgType)))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(bean.OffsetMsgId)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(strconv.FormatBool(ctx.IsSuccess))
+ buffer.WriteRune(fieldSplitter)
+ case SubBefore:
+ for _, bean := range ctx.TraceBeans {
+ buffer.WriteString(string(ctx.TraceType))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(strconv.FormatInt(ctx.TimeStamp, 10))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(ctx.RegionId)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(ctx.GroupName)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(ctx.RequestId)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(bean.MsgId)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(strconv.Itoa(bean.RetryTimes))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(nullWrap(bean.Keys))
+ buffer.WriteRune(fieldSplitter)
+ }
+ case SubAfter:
+ for _, bean := range ctx.TraceBeans {
+ buffer.WriteString(string(ctx.TraceType))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(ctx.RequestId)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(bean.MsgId)
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(strconv.FormatInt(ctx.CostTime, 10))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(strconv.FormatBool(ctx.IsSuccess))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(nullWrap(bean.Keys))
+ buffer.WriteRune(contentSplitter)
+ buffer.WriteString(strconv.Itoa(ctx.ContextCode))
+ buffer.WriteRune(fieldSplitter)
+ }
+ }
+ transferBean := new(TraceTransferBean)
+ transferBean.transData = buffer.String()
+ for _, bean := range ctx.TraceBeans {
+ transferBean.transKey = append(transferBean.transKey,
bean.MsgId)
+ if len(bean.Keys) > 0 {
+ transferBean.transKey = append(transferBean.transKey,
bean.Keys)
+ }
+ }
+ return transferBean
+}
+
+// compatible with java console.
+func nullWrap(s string) string {
+ if len(s) == 0 {
+ return "null"
+ }
+ return s
+}
+
+type traceDispatcherType int
+
+const (
+ RmqSysTraceTopic = "RMQ_SYS_TRACE_TOPIC"
+
+ ProducerType traceDispatcherType = iota
+ ConsumerType
+
+ maxMsgSize = 128000 - 10*1000
+ batchSize = 100
+
+ TraceTopicPrefix = SystemTopicPrefix + "TRACE_DATA_"
+ TraceGroupName = "_INNER_TRACE_PRODUCER"
+)
+
+type TraceDispatcher interface {
+ GetTraceTopicName() string
+
+ Start()
+ Append(ctx TraceContext) bool
+ Close()
+}
+
+type traceDispatcher struct {
+ ctx context.Context
+ cancel context.CancelFunc
+ running bool
+
+ traceTopic string
+ access primitive.AccessChannel
+
+ ticker *time.Ticker
+ input chan TraceContext
+ batchCh chan []*TraceContext
+
+ discardCount int64
+
+ // support deliver trace message to other cluster.
+ namesrvs []string
+ // round robin index
+ rrindex int32
+ cli RMQClient
+}
+
+func NewTraceDispatcher(traceTopic string, access primitive.AccessChannel)
*traceDispatcher {
+ ctx := context.Background()
+ ctx, cancel := context.WithCancel(ctx)
+
+ t := traceTopic
+ if len(t) == 0 {
+ t = RmqSysTraceTopic
+ }
+
+ if access == primitive.Cloud {
+ t = TraceTopicPrefix + traceTopic
+ }
+
+ cliOp := DefaultClientOptions()
+ cliOp.RetryTimes = 0
+ cli := GetOrNewRocketMQClient(cliOp)
+ return &traceDispatcher{
+ ctx: ctx,
+ cancel: cancel,
+
+ traceTopic: t,
+ access: access,
+ input: make(chan TraceContext, 1024),
+ batchCh: make(chan []*TraceContext, 2048),
+ cli: cli,
+ }
+}
+
+func (td *traceDispatcher) GetTraceTopicName() string {
+ return td.traceTopic
+}
+
+func (td *traceDispatcher) Start() {
+ td.running = true
+ td.cli.Start()
+ go td.process()
+}
+
+func (td *traceDispatcher) Close() {
+ td.running = false
+ td.ticker.Stop()
+ td.cancel()
+}
+
+func (td *traceDispatcher) Append(ctx TraceContext) bool {
+ if !td.running {
+ rlog.Error("traceDispatcher is closed.")
+ return false
+ }
+ select {
+ case td.input <- ctx:
+ return true
+ default:
+ rlog.Warnf("buffer full: %d, ctx is %v",
atomic.AddInt64(&td.discardCount, 1), ctx)
+ return false
+ }
+}
+
+// process
+func (td *traceDispatcher) process() {
+ var count int
+ var batch []TraceContext
+ maxWaitDuration := 5 * time.Millisecond
+ maxWaitTime := maxWaitDuration.Nanoseconds()
+ td.ticker = time.NewTicker(maxWaitDuration)
+ lastput := time.Now()
+ for {
+ select {
+ case ctx := <-td.input:
+ count++
+ lastput = time.Now()
+ batch = append(batch, ctx)
+ if count == batchSize {
+ count = 0
+ go td.batchCommit(batch)
+ batch = make([]TraceContext, 0)
+ }
+ case <-td.ticker.C:
+ delta := time.Since(lastput).Nanoseconds()
+ if delta > maxWaitTime {
+ count++
+ lastput = time.Now()
+ if len(batch) > 0 {
+ go td.batchCommit(batch)
+ batch = make([]TraceContext, 0)
+ }
+ }
+ case <-td.ctx.Done():
+ go td.batchCommit(batch)
+ batch = make([]TraceContext, 0)
+
+ now := time.Now().UnixNano() / int64(time.Millisecond)
+ end := now + 500
+ for now < end {
+ now = time.Now().UnixNano() /
int64(time.Millisecond)
+ runtime.Gosched()
+ }
+ rlog.Infof("------end trace send %v %v", td.input,
td.batchCh)
+ }
+ }
+}
+
+// batchCommit commit slice of TraceContext. convert the ctxs to keyed
pair(key is Topic + regionid).
+// flush according key one by one.
+func (td *traceDispatcher) batchCommit(ctxs []TraceContext) {
+ keyedCtxs := make(map[string][]TraceTransferBean)
+ for _, ctx := range ctxs {
+ if len(ctx.TraceBeans) == 0 {
+ return
+ }
+ topic := ctx.TraceBeans[0].Topic
+ regionID := ctx.RegionId
+ key := topic
+ if len(regionID) > 0 {
+ key = fmt.Sprintf("%s%c%s", topic, contentSplitter,
regionID)
+ }
+ keyedCtxs[key] = append(keyedCtxs[key], *ctx.marshal2Bean())
+ }
+
+ for k, v := range keyedCtxs {
+ arr := strings.Split(k, string([]byte{contentSplitter}))
+ topic := k
+ regionID := ""
+ if len(arr) > 1 {
+ topic = arr[0]
+ regionID = arr[1]
+ }
+ td.flush(topic, regionID, v)
+ }
+}
+
+type Keyset map[string]struct{}
+
+func (ks Keyset) slice() []string {
+ slice := make([]string, len(ks))
+ for k, _ := range ks {
+ slice = append(slice, k)
+ }
+ return slice
+}
+
+// flush data in batch.
+func (td *traceDispatcher) flush(topic, regionID string, data
[]TraceTransferBean) {
+ if len(data) == 0 {
+ return
+ }
+
+ keyset := make(Keyset)
+ var builder strings.Builder
+ flushed := true
+ for _, bean := range data {
+ for _, k := range bean.transKey {
+ keyset[k] = struct{}{}
+ }
+ builder.WriteString(bean.transData)
+ flushed = false
+
+ if builder.Len() > maxMsgSize {
+ td.sendTraceDataByMQ(keyset, regionID, builder.String())
+ builder.Reset()
+ keyset = make(Keyset)
+ flushed = true
+ }
+ }
+ if !flushed {
+ td.sendTraceDataByMQ(keyset, regionID, builder.String())
+ }
+}
+
+func (td *traceDispatcher) sendTraceDataByMQ(keyset Keyset, regionID string,
data string) {
+ msg := primitive.NewMessage(td.traceTopic, []byte(data))
+ msg.SetKeys(keyset.slice())
+
+ mq, addr := td.findMq()
+ if mq == nil {
+ return
+ }
+
+ var req = td.buildSendRequest(mq, msg)
+ td.cli.InvokeAsync(addr, req, 5000*time.Millisecond, func(command
*remote.RemotingCommand, e error) {
+ if e != nil {
+ rlog.Error("send trace data ,the traceData is %v", data)
+ }
+ })
+}
+
+func (td *traceDispatcher) findMq() (*primitive.MessageQueue, string) {
+ mqs, err := FetchPublishMessageQueues(td.traceTopic)
+ if err != nil {
+ rlog.Error("fetch publish message queues failed. err: %v", err)
+ return nil, ""
+ }
+ i := atomic.AddInt32(&td.rrindex, 1)
+ if i < 0 {
+ i = 0
+ atomic.StoreInt32(&td.rrindex, 0)
+ }
+ i %= int32(len(mqs))
+ mq := mqs[i]
+
+ brokerName := mq.BrokerName
+ addr := FindBrokerAddrByName(brokerName)
+
+ return mq, addr
+}
+
+func (td *traceDispatcher) buildSendRequest(mq *primitive.MessageQueue,
+ msg *primitive.Message) *remote.RemotingCommand {
+ req := &SendMessageRequest{
+ ProducerGroup: TraceGroupName,
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond),
+ Flag: msg.Flag,
+ Properties: primitive.MarshalPropeties(msg.Properties),
+ }
+
+ return remote.NewRemotingCommand(ReqSendMessage, req, msg.Body)
+}
diff --git a/internal/trace_test.go b/internal/trace_test.go
new file mode 100644
index 0000000..23808a3
--- /dev/null
+++ b/internal/trace_test.go
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internal
+
+import (
+ "testing"
+
+ "github.com/apache/rocketmq-client-go/primitive"
+ . "github.com/smartystreets/goconvey/convey"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestMarshal2Bean(t *testing.T) {
+
+ Convey("marshal of TraceContext", t, func() {
+
+ Convey("When marshal producer trace data", func() {
+ traceCtx := TraceContext{
+ TraceType: Pub,
+ TimeStamp: 1563780533299,
+ RegionId: "DefaultRegion",
+ GroupName: "ProducerGroupName",
+ CostTime: 3572,
+ IsSuccess: true,
+ RequestId: "0A5DE93A815518B4AAC26F77F8330001",
+ TraceBeans: []TraceBean{
+ {
+ Topic: "TopicTest",
+ MsgId:
"0A5DE93A833B18B4AAC26F842A2F0000",
+ OffsetMsgId:
"0A5DE93A00002A9F000000000042E322",
+ Tags: "TagA",
+ Keys: "OrderID1882",
+ StoreHost:
"10.93.233.58:10911",
+ ClientHost: "10.93.233.58",
+ StoreTime: 1563780535085,
+ BodyLength: 11,
+ MsgType:
primitive.NormalMsg,
+ },
+ },
+ }
+ bean := traceCtx.marshal2Bean()
+ assert.Equal(t,
"Pub1563780533299DefaultRegionProducerGroupNameTopicTest0A5DE93A833B18B4AAC26F842A2F0000TagAOrderID188210.93.233.58:1091111357200A5DE93A00002A9F000000000042E322true\x02",
+ bean.transData)
+ assert.Equal(t,
[]string{"0A5DE93A833B18B4AAC26F842A2F0000", "OrderID1882"}, bean.transKey)
+
+ // consumer before test
+ traceCtx = TraceContext{
+ TraceType: SubBefore,
+ TimeStamp: 1563789119096,
+ GroupName: "CID_JODIE_1",
+ IsSuccess: true,
+ RequestId: "0A5DE93A96A818B4AAC26FFAFA780007",
+ TraceBeans: []TraceBean{
+ {
+ Topic: "TopicTest",
+ MsgId:
"0A5DE93A973418B4AAC26FFAFA5A0000",
+ Tags: "TagA",
+ Keys: "OrderID1882",
+ StoreHost: "10.93.233.58",
+ ClientHost: "10.93.233.58",
+ StoreTime: 1563789119092,
+ BodyLength: 190,
+ },
+ },
+ }
+ bean = traceCtx.marshal2Bean()
+
+ Convey("transData should equal to expected", func() {
+ So(bean.transData, ShouldEqual,
"SubBefore1563789119096CID_JODIE_10A5DE93A96A818B4AAC26FFAFA7800070A5DE93A973418B4AAC26FFAFA5A00000OrderID1882")
+ })
+
+ Convey("transkey should equal to expected", func() {
+ expectedKey :=
[]string{"0A5DE93A973418B4AAC26FFAFA5A0000", "OrderID1882"}
+ So(bean.transKey[0], ShouldEqual,
expectedKey[0])
+ So(bean.transKey[1], ShouldEqual,
expectedKey[1])
+ })
+ })
+
+ Convey("When marshal consumer trace data", func() {
+ traceCtx := TraceContext{
+ TraceType: SubAfter,
+ TimeStamp: 1563789119096,
+ GroupName: "CID_JODIE_1",
+ IsSuccess: true,
+ RequestId: "0A5DE93A96A818B4AAC26FFAFA780007",
+ TraceBeans: []TraceBean{
+ {
+ Topic: "TopicTest",
+ MsgId:
"0A5DE93A973418B4AAC26FFAFA5A0000",
+ Tags: "TagA",
+ Keys: "OrderID1882",
+ StoreHost: "10.93.233.58",
+ ClientHost: "10.93.233.58",
+ StoreTime: 1563789119092,
+ BodyLength: 190,
+ },
+ },
+ }
+ bean := traceCtx.marshal2Bean()
+ Convey("transData should equal to expected", func() {
+ So(bean.transData, ShouldEqual,
"SubAfter0A5DE93A96A818B4AAC26FFAFA7800070A5DE93A973418B4AAC26FFAFA5A00000trueOrderID18820")
+ })
+ Convey("transkey should equal to expected", func() {
+ expectedKey :=
[]string{"0A5DE93A973418B4AAC26FFAFA5A0000", "OrderID1882"}
+ So(bean.transKey[0], ShouldEqual,
expectedKey[0])
+ So(bean.transKey[1], ShouldEqual,
expectedKey[1])
+ })
+ })
+ })
+}
diff --git a/internal/utils/helper.go b/internal/utils/helper.go
index d8797dd..2b9eb2a 100644
--- a/internal/utils/helper.go
+++ b/internal/utils/helper.go
@@ -17,62 +17,10 @@ limitations under the License.
package utils
-import (
- "bytes"
- "encoding/binary"
- "fmt"
- "os"
- "sync"
- "time"
-)
-
-var (
- counter int16 = 0
- startTimestamp int64 = 0
- nextTimestamp int64 = 0
- prefix string
- locker sync.Mutex
-)
-
-func MessageClientID() string {
- locker.Lock()
- defer locker.Unlock()
- if prefix == "" {
- buf := new(bytes.Buffer)
- binary.Write(buf, binary.BigEndian, LocalIP())
- binary.Write(buf, binary.BigEndian, Pid())
- binary.Write(buf, binary.BigEndian, ClassLoaderID())
- prefix = fmt.Sprintf("%x", buf.Bytes())
- }
- if time.Now().Unix() > nextTimestamp {
- updateTimestamp()
- }
- counter++
- buf := new(bytes.Buffer)
- binary.Write(buf, binary.BigEndian,
int32((time.Now().Unix()-startTimestamp)*1000))
- binary.Write(buf, binary.BigEndian, counter)
- return prefix + fmt.Sprintf("%x", buf.Bytes())
-
-}
-
-func updateTimestamp() {
- year, month := time.Now().Year(), time.Now().Month()
- startTimestamp = time.Date(year, month, 1, 0, 0, 0, 0,
time.Local).Unix()
- nextTimestamp = time.Date(year, month, 1, 0, 0, 0, 0,
time.Local).AddDate(0, 1, 0).Unix()
-}
-
func GetAddressByBytes(data []byte) string {
return "127.0.0.1"
}
-func Pid() int16 {
- return int16(os.Getpid())
-}
-
-func ClassLoaderID() int32 {
- return 0
-}
-
func UnCompress(data []byte) []byte {
return data
}
diff --git a/internal/utils/helper_test.go b/internal/utils/helper_test.go
deleted file mode 100644
index ee55cd2..0000000
--- a/internal/utils/helper_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package utils
-
-import (
- "testing"
-)
-
-func TestClassLoaderID(t *testing.T) {
- if ClassLoaderID() != 0 {
- t.Errorf("wrong ClassLoaderID, want=%d, got=%d", 0,
ClassLoaderID())
- }
-}
-
-func BenchmarkMessageClientID(b *testing.B) {
- for i := 0; i < b.N; i++ {
- MessageClientID()
- }
-}
diff --git a/internal/utils/net.go b/internal/utils/net.go
index 5da1edc..65f2731 100644
--- a/internal/utils/net.go
+++ b/internal/utils/net.go
@@ -1,20 +1,28 @@
package utils
import (
+ "bytes"
"errors"
"fmt"
"net"
+ "strconv"
+ "time"
)
-func LocalIP() string {
- ip, err := clientIP4()
+var (
+ LocalIP string
+)
+
+func init() {
+ ip, err := ClientIP4()
if err != nil {
- return ""
+ LocalIP = ""
+ } else {
+ LocalIP = fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])
}
- return fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])
}
-func clientIP4() ([]byte, error) {
+func ClientIP4() ([]byte, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, errors.New("unexpected IP address")
@@ -28,3 +36,9 @@ func clientIP4() ([]byte, error) {
}
return nil, errors.New("unknown IP address")
}
+
+func FakeIP() []byte {
+ buf := bytes.NewBufferString("")
+
buf.WriteString(strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond),
10))
+ return buf.Bytes()[4:8]
+}
diff --git a/internal/utils/net_test.go b/internal/utils/net_test.go
index 9f76062..44454ad 100644
--- a/internal/utils/net_test.go
+++ b/internal/utils/net_test.go
@@ -3,5 +3,5 @@ package utils
import "testing"
func TestLocalIP2(t *testing.T) {
- t.Log(LocalIP())
+ t.Log(LocalIP)
}
diff --git a/primitive/ctx.go b/primitive/ctx.go
index 6e9b6d3..186521a 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -23,23 +23,55 @@ package primitive
import (
"context"
"math"
+
+ "github.com/apache/rocketmq-client-go/rlog"
)
type CtxKey int
+type CommunicationMode string
+
+type ConsumeReturnType string
+
+func (c ConsumeReturnType) Ordinal() int {
+ switch c {
+ case SuccessReturn:
+ return 0
+ case TimeoutReturn:
+ return 1
+ case ExceptionRetrun:
+ return 2
+ case NullReturn:
+ return 3
+ case FailedReturn:
+ return 4
+ default:
+ rlog.Error("illegal ConsumeReturnType: %v", c)
+ return 0
+ }
+}
+
const (
method CtxKey = iota
msgCtx
orderlyCtx
concurrentlyCtx
+ producerCtx
// method name in producer
- SendSync = "SendSync"
- SendOneway = "SendOneway"
- SendAsync = "SendAsync"
+ SendSync CommunicationMode = "SendSync"
+ SendOneway CommunicationMode = "SendOneway"
+ SendAsync CommunicationMode = "SendAsync"
// method name in consumer
ConsumerPush = "ConsumerPush"
ConsumerPull = "ConsumerPull"
+
+ PropCtxType = "ConsumeContextType"
+ SuccessReturn ConsumeReturnType = "SUCCESS"
+ TimeoutReturn ConsumeReturnType = "TIMEOUT"
+ ExceptionRetrun ConsumeReturnType = "EXCEPTION"
+ NullReturn ConsumeReturnType = "RETURNNULL"
+ FailedReturn ConsumeReturnType = "FAILED"
)
type ConsumeMessageContext struct {
@@ -53,13 +85,13 @@ type ConsumeMessageContext struct {
}
// WithMethod set call method name
-func WithMethod(ctx context.Context, m string) context.Context {
+func WithMethod(ctx context.Context, m CommunicationMode) context.Context {
return context.WithValue(ctx, method, m)
}
// GetMethod get call method name
-func GetMethod(ctx context.Context) string {
- return ctx.Value(method).(string)
+func GetMethod(ctx context.Context) CommunicationMode {
+ return ctx.Value(method).(CommunicationMode)
}
// WithConsumerCtx set ConsumeMessageContext in PushConsumer
@@ -116,3 +148,24 @@ func GetConcurrentlyCtx(ctx context.Context)
(*ConsumeConcurrentlyContext, bool)
c, exist := ctx.Value(concurrentlyCtx).(*ConsumeConcurrentlyContext)
return c, exist
}
+
+type ProducerCtx struct {
+ ProducerGroup string
+ Message Message
+ MQ MessageQueue
+ BrokerAddr string
+ BornHost string
+ CommunicationMode CommunicationMode
+ SendResult *SendResult
+ Props map[string]string
+ MsgType MessageType
+ Namespace string
+}
+
+func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context {
+ return context.WithValue(ctx, producerCtx, c)
+}
+
+func GetProducerCtx(ctx context.Context) *ProducerCtx {
+ return ctx.Value(producerCtx).(*ProducerCtx)
+}
diff --git a/primitive/interceptor.go b/primitive/interceptor.go
index 3c4c659..7fd3f83 100644
--- a/primitive/interceptor.go
+++ b/primitive/interceptor.go
@@ -28,3 +28,9 @@ type Invoker func(ctx context.Context, req, reply
interface{}) error
// In PushConsumer call, the req is []*MessageExt type and the reply is
ConsumeResultHolder,
// use type assert to get real type.
type Interceptor func(ctx context.Context, req, reply interface{}, next
Invoker) error
+
+// config for message trace.
+type TraceConfig struct {
+ TraceTopic string
+ Access AccessChannel
+}
diff --git a/primitive/message.go b/primitive/message.go
index 1da6d5a..28c9eab 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -20,6 +20,7 @@ package primitive
import (
"fmt"
"strconv"
+ "strings"
"github.com/apache/rocketmq-client-go/internal/utils"
)
@@ -108,6 +109,23 @@ func (msg *Message) RemoveProperty(key string) string {
return value
}
+func (msg *Message) SetKeys(keys []string) {
+ var sb strings.Builder
+ for _, k := range keys {
+ sb.WriteString(k)
+ sb.WriteString(PropertyKeySeparator)
+ }
+ msg.PutProperty(PropertyKeys, sb.String())
+}
+
+func (msg *Message) GetTags() string {
+ return msg.Properties[PropertyTags]
+}
+
+func (msg *Message) GetKeys() string {
+ return msg.Properties[PropertyKeys]
+}
+
type MessageExt struct {
Message
MsgId string
@@ -129,6 +147,14 @@ func (msgExt *MessageExt) GetTags() string {
return msgExt.Properties[PropertyTags]
}
+func (msgExt *MessageExt) GetRegionID() string {
+ return msgExt.Properties[PropertyMsgRegion]
+}
+
+func (msgExt *MessageExt) IsTraceOn() string {
+ return msgExt.Properties[PropertyTraceSwitch]
+}
+
func (msgExt *MessageExt) String() string {
return fmt.Sprintf("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d,
QueueOffset=%d, SysFlag=%d, "+
"BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d,
StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
@@ -162,3 +188,21 @@ func (mq MessageQueue) Equals(queue *MessageQueue) bool {
// TODO
return mq.BrokerName == queue.BrokerName && mq.Topic == queue.Topic &&
mq.QueueId == mq.QueueId
}
+
+type AccessChannel int
+
+const (
+ // connect to private IDC cluster.
+ Local AccessChannel = iota
+ // connect to Cloud service.
+ Cloud
+)
+
+type MessageType int
+
+const (
+ NormalMsg MessageType = iota
+ TransMsgHalf
+ TransMsgCommit
+ DelayMsg
+)
diff --git a/primitive/result.go b/primitive/result.go
index f21cc2e..4664cf0 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -20,7 +20,9 @@ package primitive
import (
"bytes"
"encoding/binary"
+ "encoding/hex"
"fmt"
+ "strings"
"github.com/apache/rocketmq-client-go/internal/utils"
)
@@ -203,7 +205,7 @@ func DecodeMessage(data []byte) []*MessageExt {
}
count += 2 + int(propertiesLength)
- msg.MsgId = createMessageId(hostBytes, msg.CommitLogOffset)
+ msg.MsgId = createMessageId(hostBytes, port,
msg.CommitLogOffset)
//count += 16
msgs = append(msgs, msg)
@@ -212,8 +214,12 @@ func DecodeMessage(data []byte) []*MessageExt {
return msgs
}
-func createMessageId(addr []byte, offset int64) string {
- return "msgID" // TODO
+func createMessageId(addr []byte, port int32, offset int64) string {
+ buffer := new(bytes.Buffer)
+ buffer.Write(addr)
+ binary.Write(buffer, binary.BigEndian, port)
+ binary.Write(buffer, binary.BigEndian, offset)
+ return strings.ToUpper(hex.EncodeToString(buffer.Bytes()))
}
// unmarshalProperties parse data into property kv pairs.
diff --git a/primitive/result_test.go b/primitive/result_test.go
index 131db8c..a3fd5ab 100644
--- a/primitive/result_test.go
+++ b/primitive/result_test.go
@@ -18,12 +18,14 @@ limitations under the License.
package primitive
import (
+ "strings"
"testing"
+ . "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
)
-func Test(t *testing.T) {
+func TestProperties(t *testing.T) {
kv := map[string]string{
"k1": "v1",
"k2": "v2",
@@ -32,3 +34,17 @@ func Test(t *testing.T) {
kv2 := unmarshalProperties([]byte(str))
assert.Equal(t, kv, kv2)
}
+
+func TestCreateMessageId(t *testing.T) {
+ Convey("MessageId gen", t, func() {
+ b := []byte{10, 93, 233, 58}
+ port := int32(10911)
+ offset := int64(4391252)
+ id := createMessageId(b, port, offset)
+
+ Convey("generated messageId should be equal to expected",
func() {
+ assert.Equal(t,
strings.ToLower("0A5DE93A00002A9F0000000000430154"), id)
+ })
+ })
+
+}
diff --git a/producer/interceptor.go b/producer/interceptor.go
new file mode 100644
index 0000000..46f8042
--- /dev/null
+++ b/producer/interceptor.go
@@ -0,0 +1,97 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/**
+ * builtin interceptor
+ */
+package producer
+
+import (
+ "context"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/internal"
+ "github.com/apache/rocketmq-client-go/internal/utils"
+ "github.com/apache/rocketmq-client-go/primitive"
+)
+
+// WithTrace support rocketmq trace:
https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
+func WithTrace(traceCfg primitive.TraceConfig) Option {
+ return func(options *producerOptions) {
+
+ ori := options.Interceptors
+ options.Interceptors = make([]primitive.Interceptor, 0)
+ options.Interceptors = append(options.Interceptors,
newTraceInterceptor(traceCfg))
+ options.Interceptors = append(options.Interceptors, ori...)
+ }
+}
+
+func newTraceInterceptor(traceCfg primitive.TraceConfig) primitive.Interceptor
{
+ dispatcher := internal.NewTraceDispatcher(traceCfg.TraceTopic,
traceCfg.Access)
+ dispatcher.Start()
+
+ return func(ctx context.Context, req, reply interface{}, next
primitive.Invoker) error {
+ beginT := time.Now()
+ err := next(ctx, req, reply)
+
+ producerCtx := primitive.GetProducerCtx(ctx)
+ if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() {
+ return next(ctx, req, reply)
+ }
+
+ // SendOneway && SendAsync has no reply.
+ if reply == nil {
+ return err
+ }
+
+ result := reply.(*primitive.SendResult)
+ if result.RegionID == "" || !result.TraceOn {
+ return err
+ }
+
+ sendSuccess := result.Status == primitive.SendOK
+ costT := time.Since(beginT).Nanoseconds() /
int64(time.Millisecond)
+ storeT := beginT.UnixNano()/int64(time.Millisecond) + costT/2
+
+ traceBean := internal.TraceBean{
+ Topic: producerCtx.Message.Topic,
+ Tags: producerCtx.Message.GetTags(),
+ Keys: producerCtx.Message.GetKeys(),
+ StoreHost: producerCtx.BrokerAddr,
+ ClientHost: utils.LocalIP,
+ BodyLength: len(producerCtx.Message.Body),
+ MsgType: producerCtx.MsgType,
+ MsgId: result.MsgID,
+ OffsetMsgId: result.OffsetMsgID,
+ StoreTime: storeT,
+ }
+
+ traceCtx := internal.TraceContext{
+ RequestId: internal.CreateUniqID(), // set id
+ TimeStamp: time.Now().UnixNano() /
int64(time.Millisecond),
+
+ TraceType: internal.Pub,
+ GroupName: producerCtx.ProducerGroup,
+ RegionId: result.RegionID,
+ TraceBeans: []internal.TraceBean{traceBean},
+ CostTime: costT,
+ IsSuccess: sendSuccess,
+ }
+ dispatcher.Append(traceCtx)
+ return err
+ }
+}
diff --git a/producer/producer.go b/producer/producer.go
index 12a72b6..13f7952 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/rocketmq-client-go/internal"
"github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/internal/utils"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/pkg/errors"
@@ -129,6 +130,15 @@ func (p *defaultProducer) SendSync(ctx context.Context,
msg *primitive.Message)
resp := new(primitive.SendResult)
if p.interceptor != nil {
primitive.WithMethod(ctx, primitive.SendSync)
+ producerCtx := &primitive.ProducerCtx{
+ ProducerGroup: p.group,
+ CommunicationMode: primitive.SendSync,
+ BornHost: utils.LocalIP,
+ Message: *msg,
+ SendResult: resp,
+ }
+ ctx = primitive.WithProducerCtx(ctx, producerCtx)
+
err := p.interceptor(ctx, msg, resp, func(ctx context.Context,
req, reply interface{}) error {
var err error
realReq := req.(*primitive.Message)
@@ -151,6 +161,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg
*primitive.Message,
err error
)
+ var producerCtx *primitive.ProducerCtx
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
if mq == nil {
@@ -163,6 +174,12 @@ func (p *defaultProducer) sendSync(ctx context.Context,
msg *primitive.Message,
return fmt.Errorf("topic=%s route info not found",
mq.Topic)
}
+ if p.interceptor != nil {
+ producerCtx = primitive.GetProducerCtx(ctx)
+ producerCtx.BrokerAddr = addr
+ producerCtx.MQ = *mq
+ }
+
res, _err := p.client.InvokeSync(addr, p.buildSendRequest(mq,
msg), 3*time.Second)
if _err != nil {
err = _err