This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new c6b08aa fix: using atomic status for producer/consumer (#320)
c6b08aa is described below
commit c6b08aafd5f8fe0b2d54b3377c5cc70b7bdf8d43
Author: xujianhai666 <[email protected]>
AuthorDate: Thu Dec 5 16:20:06 2019 +0800
fix: using atomic status for producer/consumer (#320)
- using atomic for status
Closes #318
---
consumer/consumer.go | 10 ++++++----
consumer/pull_consumer.go | 7 ++++---
consumer/push_consumer.go | 8 ++++----
internal/model.go | 2 +-
producer/producer.go | 9 +++++----
5 files changed, 20 insertions(+), 16 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 070cd19..a10d3c5 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/pkg/errors"
@@ -247,7 +248,7 @@ type defaultConsumer struct {
cType ConsumeType
client internal.RMQClient
mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
- state internal.ServiceState
+ state int32
pause bool
once sync.Once
option consumerOptions
@@ -287,13 +288,14 @@ func (dc *defaultConsumer) start() error {
dc.client.UpdateTopicRouteInfo()
dc.client.Start()
- dc.state = internal.StateRunning
+ atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
return nil
}
func (dc *defaultConsumer) shutdown() error {
- dc.state = internal.StateShutdown
+ atomic.StoreInt32(&dc.state, int32(internal.StateShutdown))
+
mqs := make([]*primitive.MessageQueue, 0)
dc.processQueueTable.Range(func(key, value interface{}) bool {
k := key.(primitive.MessageQueue)
@@ -435,7 +437,7 @@ func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData {
}
func (dc *defaultConsumer) makeSureStateOK() error {
- if dc.state != internal.StateRunning {
+ if atomic.LoadInt32(&dc.state) != int32(internal.StateRunning) {
return fmt.Errorf("state not running, actually: %v", dc.state)
}
return nil
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 6a6fa64..f7c8ef4 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -82,9 +82,10 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
}
dc := &defaultConsumer{
+ client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
consumerGroup: defaultOpts.GroupName,
cType: _PullConsume,
- state: internal.StateCreateJust,
+ state: int32(internal.StateCreateJust),
prCh: make(chan PullRequest, 4),
model: defaultOpts.ConsumerModel,
option: defaultOpts,
@@ -99,7 +100,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
}
func (c *defaultPullConsumer) Start() error {
- c.state = internal.StateRunning
+ atomic.StoreInt32(&c.state, int32(internal.StateRunning))
var err error
c.once.Do(func() {
@@ -208,7 +209,7 @@ func (c *defaultPullConsumer) pull(ctx context.Context, mq *primitive.MessageQue
}
func (c *defaultPullConsumer) makeSureStateOK() error {
- if c.state != internal.StateRunning {
+ if atomic.LoadInt32(&c.state) != int32(internal.StateRunning) {
return fmt.Errorf("the consumer state is [%d], not running", c.state)
}
return nil
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 98b0033..c915a32 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -23,6 +23,7 @@ import (
"math"
"strconv"
"sync"
+ "sync/atomic"
"time"
"github.com/pkg/errors"
@@ -92,7 +93,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
consumerGroup: defaultOpts.GroupName,
cType: _PushConsume,
- state: internal.StateCreateJust,
+ state: int32(internal.StateCreateJust),
prCh: make(chan PullRequest, 4),
model: defaultOpts.ConsumerModel,
consumeOrderly: defaultOpts.ConsumeOrderly,
@@ -130,12 +131,11 @@ func (pc *pushConsumer) Start() error {
"messageModel": pc.model,
"unitMode": pc.unitMode,
})
- pc.state = internal.StateStartFailed
+ atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
pc.validate()
err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
if err != nil {
- pc.state = internal.StateStartFailed
rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
@@ -220,7 +220,7 @@ func (pc *pushConsumer) Shutdown() error {
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
- if pc.state != internal.StateCreateJust {
+ if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {
return errors.New("subscribe topic only started before")
}
if pc.option.Namespace != "" {
diff --git a/internal/model.go b/internal/model.go
index ea1fdac..61ecbbc 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -39,7 +39,7 @@ type (
// groupName of consumer
consumeType string
- ServiceState int
+ ServiceState int32
)
const (
diff --git a/producer/producer.go b/producer/producer.go
index 78cd8d9..287df58 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -22,6 +22,7 @@ import (
"fmt"
"strconv"
"sync"
+ "sync/atomic"
"time"
"github.com/pkg/errors"
@@ -42,7 +43,7 @@ var (
type defaultProducer struct {
group string
client internal.RMQClient
- state internal.ServiceState
+ state int32
options producerOptions
publishInfo sync.Map
callbackCh chan interface{}
@@ -77,20 +78,20 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
}
func (p *defaultProducer) Start() error {
- p.state = internal.StateRunning
+ atomic.StoreInt32(&p.state, int32(internal.StateRunning))
p.client.RegisterProducer(p.group, p)
p.client.Start()
return nil
}
func (p *defaultProducer) Shutdown() error {
- p.state = internal.StateShutdown
+ atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
p.client.Shutdown()
return nil
}
func (p *defaultProducer) checkMsg(msg *primitive.Message) error {
- if p.state != internal.StateRunning {
+ if atomic.LoadInt32(&p.state) != int32(internal.StateRunning) {
return ErrNotRunning
}