This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new b4b4df3 feat: fix push consumer pause data race (#1217)
b4b4df3 is described below
commit b4b4df33301eb91a105ec57b2ef1d2ee352df895
Author: WeizhongTu <[email protected]>
AuthorDate: Wed Sep 3 17:01:37 2025 +0800
feat: fix push consumer pause data race (#1217)
---
consumer/consumer.go | 20 ++++++++++----------
consumer/pull_consumer.go | 11 ++++++-----
consumer/push_consumer.go | 16 ++++++++--------
3 files changed, 24 insertions(+), 23 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 0edeef2..98b4447 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -25,11 +25,11 @@ import (
"strconv"
"strings"
"sync"
- "sync/atomic"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/tidwall/gjson"
+ "go.uber.org/atomic"
"github.com/apache/rocketmq-client-go/v2/errors"
"github.com/apache/rocketmq-client-go/v2/hooks"
@@ -250,8 +250,8 @@ type defaultConsumer struct {
cType ConsumeType
client internal.RMQClient
mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
- state int32
- pause bool
+ state *atomic.Int32
+ pause *atomic.Bool
once sync.Once
option consumerOptions
// key: primitive.MessageQueue
@@ -289,14 +289,14 @@ func (dc *defaultConsumer) start() error {
}
dc.client.Start()
- atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
+ dc.state.Store(int32(internal.StateRunning))
dc.consumerStartTimestamp = time.Now().UnixNano() /
int64(time.Millisecond)
dc.stat = NewStatsManager()
return nil
}
func (dc *defaultConsumer) shutdown() error {
- atomic.StoreInt32(&dc.state, int32(internal.StateShutdown))
+ dc.state.Store(int32(internal.StateShutdown))
mqs := make([]*primitive.MessageQueue, 0)
dc.processQueueTable.Range(func(key, value interface{}) bool {
@@ -317,11 +317,11 @@ func (dc *defaultConsumer) shutdown() error {
}
func (dc *defaultConsumer) isRunning() bool {
- return atomic.LoadInt32(&dc.state) == int32(internal.StateRunning)
+ return dc.state.Load() == int32(internal.StateRunning)
}
func (dc *defaultConsumer) isStopped() bool {
- return atomic.LoadInt32(&dc.state) == int32(internal.StateShutdown)
+ return dc.state.Load() == int32(internal.StateShutdown)
}
func (dc *defaultConsumer) persistConsumerOffset() error {
@@ -371,7 +371,7 @@ func (dc *defaultConsumer) isSubscribeTopicNeedUpdate(topic
string) bool {
}
func (dc *defaultConsumer) doBalanceIfNotPaused() {
- if dc.pause {
+ if dc.pause.Load() {
rlog.Info("[BALANCE-SKIP] since consumer paused",
map[string]interface{}{
rlog.LogKeyConsumerGroup: dc.consumerGroup,
})
@@ -483,8 +483,8 @@ func (dc *defaultConsumer) SubscriptionDataList()
[]*internal.SubscriptionData {
}
func (dc *defaultConsumer) makeSureStateOK() error {
- if atomic.LoadInt32(&dc.state) != int32(internal.StateRunning) {
- return fmt.Errorf("state not running, actually: %v", dc.state)
+ if dc.state.Load() != int32(internal.StateRunning) {
+ return fmt.Errorf("state not running, actually: %v",
dc.state.Load())
}
return nil
}
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 9592061..c6af4f7 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -31,6 +31,7 @@ import (
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
+ atomic2 "go.uber.org/atomic"
"github.com/pkg/errors"
@@ -111,7 +112,7 @@ func NewPullConsumer(options ...Option)
(*defaultPullConsumer, error) {
client:
internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
consumerGroup: utils.WrapNamespace(defaultOpts.Namespace,
defaultOpts.GroupName),
cType: _PullConsume,
- state: int32(internal.StateCreateJust),
+ state:
atomic2.NewInt32(int32(internal.StateCreateJust)),
prCh: make(chan PullRequest, 4),
model: defaultOpts.ConsumerModel,
option: defaultOpts,
@@ -149,8 +150,8 @@ func (pc *defaultPullConsumer) GetTopicRouteInfo(topic
string) ([]*primitive.Mes
}
func (pc *defaultPullConsumer) Subscribe(topic string, selector
MessageSelector) error {
- if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
- atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
+ if pc.state.Load() == int32(internal.StateStartFailed) ||
+ pc.state.Load() == int32(internal.StateShutdown) {
return errors2.ErrStartTopic
}
if pc.SubType == Assign {
@@ -247,7 +248,7 @@ func (pc *defaultPullConsumer) Start() error {
if err != nil {
return
}
- atomic.StoreInt32(&pc.state, int32(internal.StateRunning))
+ pc.state.Store(int32(internal.StateRunning))
go func() {
for {
select {
@@ -837,7 +838,7 @@ func (pc *defaultPullConsumer) pullMessage(request
*PullRequest) {
goto NEXT
}
- if pc.pause {
+ if pc.pause.Load() {
rlog.Debug(fmt.Sprintf("defaultPullConsumer [%s] of
[%s] was paused, execute pull request [%s] later",
pc.option.InstanceName, pc.consumerGroup,
request.String()), nil)
sleepTime = _PullDelayTimeWhenSuspend
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index e6ec3e5..0c2a630 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -25,10 +25,10 @@ import (
"strconv"
"strings"
"sync"
- "sync/atomic"
"time"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+ "go.uber.org/atomic"
"github.com/pkg/errors"
@@ -97,7 +97,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
client:
internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
consumerGroup: defaultOpts.GroupName,
cType: _PushConsume,
- state: int32(internal.StateCreateJust),
+ state:
atomic.NewInt32(int32(internal.StateCreateJust)),
prCh: make(chan PullRequest, 4),
model: defaultOpts.ConsumerModel,
consumeOrderly: defaultOpts.ConsumeOrderly,
@@ -138,7 +138,7 @@ func (pc *pushConsumer) Start() error {
"messageModel": pc.model,
"unitMode": pc.unitMode,
})
- atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
+ pc.state.Store(int32(internal.StateStartFailed))
err = pc.validate()
if err != nil {
rlog.Error("the consumer group option validate fail",
map[string]interface{}{
@@ -289,8 +289,8 @@ func (pc *pushConsumer) Shutdown() error {
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult,
error)) error {
- if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
- atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
+ if pc.state.Load() == int32(internal.StateStartFailed) ||
+ pc.state.Load() == int32(internal.StateShutdown) {
return errors2.ErrStartTopic
}
@@ -685,7 +685,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
goto NEXT
}
- if pc.pause {
+ if pc.pause.Load() {
rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was
paused, execute pull request [%s] later",
pc.option.InstanceName, pc.consumerGroup,
request.String()), nil)
sleepTime = _PullDelayTimeWhenSuspend
@@ -945,12 +945,12 @@ func (pc *pushConsumer) buildSendBackRequest(msg
*primitive.MessageExt, delayLev
}
func (pc *pushConsumer) suspend() {
- pc.pause = true
+ pc.pause.Store(true)
rlog.Info(fmt.Sprintf("suspend consumer: %s", pc.consumerGroup), nil)
}
func (pc *pushConsumer) resume() {
- pc.pause = false
+ pc.pause.Store(false)
pc.doBalance()
rlog.Info(fmt.Sprintf("resume consumer: %s", pc.consumerGroup), nil)
}