This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f3349bd [ISSUE #939]Pull or poll message support message trace (#940)
f3349bd is described below
commit f3349bdbb4dbcf9a7481563cb0fc3dd3d3ce3985
Author: zhangjidi2016 <[email protected]>
AuthorDate: Fri Oct 14 15:09:57 2022 +0800
[ISSUE #939]Pull or poll message support message trace (#940)
Co-authored-by: zhangjidi <[email protected]>
---
consumer/pull_consumer.go | 29 +++++++++++++++++++++++++++++
1 file changed, 29 insertions(+)
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 6c24c63..a7e0ec5 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -79,6 +79,7 @@ type defaultPullConsumer struct {
closeOnce sync.Once
consumeRequestCache chan *ConsumeRequest
submitToConsume func(*processQueue, *primitive.MessageQueue)
+ interceptor primitive.Interceptor
}
func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
@@ -115,6 +116,7 @@ func NewPullConsumer(options ...Option)
(*defaultPullConsumer, error) {
}
dc.mqChanged = c.messageQueueChanged
c.submitToConsume = c.consumeMessageCurrently
+ c.interceptor = primitive.ChainInterceptors(c.option.Interceptors...)
return c, nil
}
@@ -251,8 +253,16 @@ RETRY:
if result == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] =
string(primitive.SuccessReturn)
+ msgCtx.Success = true
} else {
msgCtx.Properties[primitive.PropCtxType] =
string(primitive.FailedReturn)
+ msgCtx.Success = false
+ }
+
+ if pc.interceptor != nil {
+ pc.interceptor(ctx, msgList, nil, func(ctx context.Context,
req, reply interface{}) error {
+ return nil
+ })
}
if !pq.IsDroppd() {
@@ -334,6 +344,22 @@ func (pc *defaultPullConsumer) Pull(ctx context.Context,
numbers int) (*primitiv
}
pc.processPullResult(mq, result, data)
+
+ if pc.interceptor != nil {
+ msgCtx := &primitive.ConsumeMessageContext{
+ Properties: make(map[string]string),
+ ConsumerGroup: pc.consumerGroup,
+ MQ: mq,
+ Msgs: result.GetMessageExts(),
+ Success: true,
+ }
+ ctx = primitive.WithConsumerCtx(ctx, msgCtx)
+ ctx = primitive.WithMethod(ctx, primitive.ConsumerPull)
+ pc.interceptor(ctx, result.GetMessageExts(), nil, func(ctx
context.Context, req, reply interface{}) error {
+ return nil
+ })
+ }
+
return result, nil
}
@@ -454,6 +480,9 @@ func (pc *defaultPullConsumer) CurrentOffset(queue
*primitive.MessageQueue) (int
func (pc *defaultPullConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {
+ if pc.option.TraceDispatcher != nil {
+ pc.option.TraceDispatcher.Close()
+ }
close(pc.done)
pc.client.UnregisterConsumer(pc.consumerGroup)