This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f3349bd  [ISSUE #939]Pull or poll message support message trace (#940)
f3349bd is described below

commit f3349bdbb4dbcf9a7481563cb0fc3dd3d3ce3985
Author: zhangjidi2016 <[email protected]>
AuthorDate: Fri Oct 14 15:09:57 2022 +0800

    [ISSUE #939]Pull or poll message support message trace (#940)
    
    Co-authored-by: zhangjidi <[email protected]>
---
 consumer/pull_consumer.go | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 6c24c63..a7e0ec5 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -79,6 +79,7 @@ type defaultPullConsumer struct {
        closeOnce           sync.Once
        consumeRequestCache chan *ConsumeRequest
        submitToConsume     func(*processQueue, *primitive.MessageQueue)
+       interceptor         primitive.Interceptor
 }
 
 func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
@@ -115,6 +116,7 @@ func NewPullConsumer(options ...Option) 
(*defaultPullConsumer, error) {
        }
        dc.mqChanged = c.messageQueueChanged
        c.submitToConsume = c.consumeMessageCurrently
+       c.interceptor = primitive.ChainInterceptors(c.option.Interceptors...)
        return c, nil
 }
 
@@ -251,8 +253,16 @@ RETRY:
 
        if result == ConsumeSuccess {
                msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.SuccessReturn)
+               msgCtx.Success = true
        } else {
                msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.FailedReturn)
+               msgCtx.Success = false
+       }
+
+       if pc.interceptor != nil {
+               pc.interceptor(ctx, msgList, nil, func(ctx context.Context, 
req, reply interface{}) error {
+                       return nil
+               })
        }
 
        if !pq.IsDroppd() {
@@ -334,6 +344,22 @@ func (pc *defaultPullConsumer) Pull(ctx context.Context, 
numbers int) (*primitiv
        }
 
        pc.processPullResult(mq, result, data)
+
+       if pc.interceptor != nil {
+               msgCtx := &primitive.ConsumeMessageContext{
+                       Properties:    make(map[string]string),
+                       ConsumerGroup: pc.consumerGroup,
+                       MQ:            mq,
+                       Msgs:          result.GetMessageExts(),
+                       Success:       true,
+               }
+               ctx = primitive.WithConsumerCtx(ctx, msgCtx)
+               ctx = primitive.WithMethod(ctx, primitive.ConsumerPull)
+               pc.interceptor(ctx, result.GetMessageExts(), nil, func(ctx 
context.Context, req, reply interface{}) error {
+                       return nil
+               })
+       }
+
        return result, nil
 }
 
@@ -454,6 +480,9 @@ func (pc *defaultPullConsumer) CurrentOffset(queue 
*primitive.MessageQueue) (int
 func (pc *defaultPullConsumer) Shutdown() error {
        var err error
        pc.closeOnce.Do(func() {
+               if pc.option.TraceDispatcher != nil {
+                       pc.option.TraceDispatcher.Close()
+               }
                close(pc.done)
 
                pc.client.UnregisterConsumer(pc.consumerGroup)

Reply via email to