This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 2630383 [ISSUE #901]Close the traceDispatcher when producer or consumer is shutdown (#902)
2630383 is described below
commit 26303836092a1fadbe4a1239c8abbcf504a36fce
Author: zhangjidi2016 <[email protected]>
AuthorDate: Wed Aug 24 15:44:15 2022 +0800
[ISSUE #901]Close the traceDispatcher when producer or consumer is shutdown (#902)
Co-authored-by: zhangjidi <[email protected]>
---
consumer/interceptor.go | 8 ++++----
consumer/option.go | 2 ++
consumer/push_consumer.go | 3 +++
producer/interceptor.go | 8 ++++----
producer/option.go | 1 +
producer/producer.go | 3 +++
6 files changed, 17 insertions(+), 8 deletions(-)
diff --git a/consumer/interceptor.go b/consumer/interceptor.go
index 05ff94a..7878809 100644
--- a/consumer/interceptor.go
+++ b/consumer/interceptor.go
@@ -30,16 +30,16 @@ import (
// WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
func WithTrace(traceCfg *primitive.TraceConfig) Option {
return func(options *consumerOptions) {
-
+ dispatcher := internal.NewTraceDispatcher(traceCfg)
+ options.TraceDispatcher = dispatcher
ori := options.Interceptors
options.Interceptors = make([]primitive.Interceptor, 0)
- options.Interceptors = append(options.Interceptors, newTraceInterceptor(traceCfg))
+ options.Interceptors = append(options.Interceptors, newTraceInterceptor(dispatcher))
options.Interceptors = append(options.Interceptors, ori...)
}
}
-func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {
- dispatcher := internal.NewTraceDispatcher(traceCfg)
+func newTraceInterceptor(dispatcher internal.TraceDispatcher) primitive.Interceptor {
if dispatcher != nil {
dispatcher.Start()
}
diff --git a/consumer/option.go b/consumer/option.go
index 2d51b46..f2cb9e8 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -113,6 +113,8 @@ type consumerOptions struct {
filterMessageHooks []hooks.FilterMessageHook
Limiter Limiter
+
+ TraceDispatcher internal.TraceDispatcher
}
func defaultPushConsumerOptions() consumerOptions {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 81c74aa..3ee5470 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -241,6 +241,9 @@ func (pc *pushConsumer) Start() error {
func (pc *pushConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {
+ if pc.option.TraceDispatcher != nil {
+ pc.option.TraceDispatcher.Close()
+ }
close(pc.done)
pc.client.UnregisterConsumer(pc.consumerGroup)
diff --git a/producer/interceptor.go b/producer/interceptor.go
index f77b9c6..956bfec 100644
--- a/producer/interceptor.go
+++ b/producer/interceptor.go
@@ -33,16 +33,16 @@ import (
// WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
func WithTrace(traceCfg *primitive.TraceConfig) Option {
return func(options *producerOptions) {
-
+ dispatcher := internal.NewTraceDispatcher(traceCfg)
+ options.TraceDispatcher = dispatcher
ori := options.Interceptors
options.Interceptors = make([]primitive.Interceptor, 0)
- options.Interceptors = append(options.Interceptors, newTraceInterceptor(traceCfg))
+ options.Interceptors = append(options.Interceptors, newTraceInterceptor(dispatcher))
options.Interceptors = append(options.Interceptors, ori...)
}
}
-func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {
- dispatcher := internal.NewTraceDispatcher(traceCfg)
+func newTraceInterceptor(dispatcher internal.TraceDispatcher) primitive.Interceptor {
if dispatcher != nil {
dispatcher.Start()
}
diff --git a/producer/option.go b/producer/option.go
index ae76511..9fd8374 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -49,6 +49,7 @@ type producerOptions struct {
Resolver primitive.NsResolver
CompressMsgBodyOverHowmuch int
CompressLevel int
+ TraceDispatcher internal.TraceDispatcher
}
type Option func(*producerOptions)
diff --git a/producer/producer.go b/producer/producer.go
index 3a71003..fa2a832 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -100,6 +100,9 @@ func (p *defaultProducer) Start() error {
func (p *defaultProducer) Shutdown() error {
p.ShutdownOnce.Do(func() {
+ if p.options.TraceDispatcher != nil {
+ p.options.TraceDispatcher.Close()
+ }
atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
p.client.UnregisterProducer(p.group)
p.client.Shutdown()