This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2630383  [ISSUE #901]Close the traceDispatcher when producer or 
consumer is shutdown (#902)
2630383 is described below

commit 26303836092a1fadbe4a1239c8abbcf504a36fce
Author: zhangjidi2016 <[email protected]>
AuthorDate: Wed Aug 24 15:44:15 2022 +0800

    [ISSUE #901]Close the traceDispatcher when producer or consumer is shutdown 
(#902)
    
    Co-authored-by: zhangjidi <[email protected]>
---
 consumer/interceptor.go   | 8 ++++----
 consumer/option.go        | 2 ++
 consumer/push_consumer.go | 3 +++
 producer/interceptor.go   | 8 ++++----
 producer/option.go        | 1 +
 producer/producer.go      | 3 +++
 6 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/consumer/interceptor.go b/consumer/interceptor.go
index 05ff94a..7878809 100644
--- a/consumer/interceptor.go
+++ b/consumer/interceptor.go
@@ -30,16 +30,16 @@ import (
 // WithTrace support rocketmq trace: 
https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
 func WithTrace(traceCfg *primitive.TraceConfig) Option {
        return func(options *consumerOptions) {
-
+               dispatcher := internal.NewTraceDispatcher(traceCfg)
+               options.TraceDispatcher = dispatcher
                ori := options.Interceptors
                options.Interceptors = make([]primitive.Interceptor, 0)
-               options.Interceptors = append(options.Interceptors, 
newTraceInterceptor(traceCfg))
+               options.Interceptors = append(options.Interceptors, 
newTraceInterceptor(dispatcher))
                options.Interceptors = append(options.Interceptors, ori...)
        }
 }
 
-func newTraceInterceptor(traceCfg *primitive.TraceConfig) 
primitive.Interceptor {
-       dispatcher := internal.NewTraceDispatcher(traceCfg)
+func newTraceInterceptor(dispatcher internal.TraceDispatcher) 
primitive.Interceptor {
        if dispatcher != nil {
                dispatcher.Start()
        }
diff --git a/consumer/option.go b/consumer/option.go
index 2d51b46..f2cb9e8 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -113,6 +113,8 @@ type consumerOptions struct {
        filterMessageHooks []hooks.FilterMessageHook
 
        Limiter Limiter
+
+       TraceDispatcher internal.TraceDispatcher
 }
 
 func defaultPushConsumerOptions() consumerOptions {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 81c74aa..3ee5470 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -241,6 +241,9 @@ func (pc *pushConsumer) Start() error {
 func (pc *pushConsumer) Shutdown() error {
        var err error
        pc.closeOnce.Do(func() {
+               if pc.option.TraceDispatcher != nil {
+                       pc.option.TraceDispatcher.Close()
+               }
                close(pc.done)
 
                pc.client.UnregisterConsumer(pc.consumerGroup)
diff --git a/producer/interceptor.go b/producer/interceptor.go
index f77b9c6..956bfec 100644
--- a/producer/interceptor.go
+++ b/producer/interceptor.go
@@ -33,16 +33,16 @@ import (
 // WithTrace support rocketmq trace: 
https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
 func WithTrace(traceCfg *primitive.TraceConfig) Option {
        return func(options *producerOptions) {
-
+               dispatcher := internal.NewTraceDispatcher(traceCfg)
+               options.TraceDispatcher = dispatcher
                ori := options.Interceptors
                options.Interceptors = make([]primitive.Interceptor, 0)
-               options.Interceptors = append(options.Interceptors, 
newTraceInterceptor(traceCfg))
+               options.Interceptors = append(options.Interceptors, 
newTraceInterceptor(dispatcher))
                options.Interceptors = append(options.Interceptors, ori...)
        }
 }
 
-func newTraceInterceptor(traceCfg *primitive.TraceConfig) 
primitive.Interceptor {
-       dispatcher := internal.NewTraceDispatcher(traceCfg)
+func newTraceInterceptor(dispatcher internal.TraceDispatcher) 
primitive.Interceptor {
        if dispatcher != nil {
                dispatcher.Start()
        }
diff --git a/producer/option.go b/producer/option.go
index ae76511..9fd8374 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -49,6 +49,7 @@ type producerOptions struct {
        Resolver                   primitive.NsResolver
        CompressMsgBodyOverHowmuch int
        CompressLevel              int
+       TraceDispatcher            internal.TraceDispatcher
 }
 
 type Option func(*producerOptions)
diff --git a/producer/producer.go b/producer/producer.go
index 3a71003..fa2a832 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -100,6 +100,9 @@ func (p *defaultProducer) Start() error {
 
 func (p *defaultProducer) Shutdown() error {
        p.ShutdownOnce.Do(func() {
+               if p.options.TraceDispatcher != nil {
+                       p.options.TraceDispatcher.Close()
+               }
                atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
                p.client.UnregisterProducer(p.group)
                p.client.Shutdown()

Reply via email to