This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new c6b08aa  fix: using atomic status for producer/consumer (#320)
c6b08aa is described below

commit c6b08aafd5f8fe0b2d54b3377c5cc70b7bdf8d43
Author: xujianhai666 <[email protected]>
AuthorDate: Thu Dec 5 16:20:06 2019 +0800

    fix: using atomic status for producer/consumer (#320)
    
    - using atomic for status
    
    Closes #318
---
 consumer/consumer.go      | 10 ++++++----
 consumer/pull_consumer.go |  7 ++++---
 consumer/push_consumer.go |  8 ++++----
 internal/model.go         |  2 +-
 producer/producer.go      |  9 +++++----
 5 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 070cd19..a10d3c5 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -25,6 +25,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/pkg/errors"
@@ -247,7 +248,7 @@ type defaultConsumer struct {
        cType     ConsumeType
        client    internal.RMQClient
        mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
-       state     internal.ServiceState
+       state     int32
        pause     bool
        once      sync.Once
        option    consumerOptions
@@ -287,13 +288,14 @@ func (dc *defaultConsumer) start() error {
 
        dc.client.UpdateTopicRouteInfo()
        dc.client.Start()
-       dc.state = internal.StateRunning
+       atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
        dc.consumerStartTimestamp = time.Now().UnixNano() / 
int64(time.Millisecond)
        return nil
 }
 
 func (dc *defaultConsumer) shutdown() error {
-       dc.state = internal.StateShutdown
+       atomic.StoreInt32(&dc.state, int32(internal.StateShutdown))
+
        mqs := make([]*primitive.MessageQueue, 0)
        dc.processQueueTable.Range(func(key, value interface{}) bool {
                k := key.(primitive.MessageQueue)
@@ -435,7 +437,7 @@ func (dc *defaultConsumer) SubscriptionDataList() 
[]*internal.SubscriptionData {
 }
 
 func (dc *defaultConsumer) makeSureStateOK() error {
-       if dc.state != internal.StateRunning {
+       if atomic.LoadInt32(&dc.state) != int32(internal.StateRunning) {
                return fmt.Errorf("state not running, actually: %v", dc.state)
        }
        return nil
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 6a6fa64..f7c8ef4 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -82,9 +82,10 @@ func NewPullConsumer(options ...Option) 
(*defaultPullConsumer, error) {
        }
 
        dc := &defaultConsumer{
+               client:        
internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
                consumerGroup: defaultOpts.GroupName,
                cType:         _PullConsume,
-               state:         internal.StateCreateJust,
+               state:         int32(internal.StateCreateJust),
                prCh:          make(chan PullRequest, 4),
                model:         defaultOpts.ConsumerModel,
                option:        defaultOpts,
@@ -99,7 +100,7 @@ func NewPullConsumer(options ...Option) 
(*defaultPullConsumer, error) {
 }
 
 func (c *defaultPullConsumer) Start() error {
-       c.state = internal.StateRunning
+       atomic.StoreInt32(&c.state, int32(internal.StateRunning))
 
        var err error
        c.once.Do(func() {
@@ -208,7 +209,7 @@ func (c *defaultPullConsumer) pull(ctx context.Context, mq 
*primitive.MessageQue
 }
 
 func (c *defaultPullConsumer) makeSureStateOK() error {
-       if c.state != internal.StateRunning {
+       if atomic.LoadInt32(&c.state) != int32(internal.StateRunning) {
                return fmt.Errorf("the consumer state is [%d], not running", 
c.state)
        }
        return nil
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 98b0033..c915a32 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -23,6 +23,7 @@ import (
        "math"
        "strconv"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/pkg/errors"
@@ -92,7 +93,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
                client:         
internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
                consumerGroup:  defaultOpts.GroupName,
                cType:          _PushConsume,
-               state:          internal.StateCreateJust,
+               state:          int32(internal.StateCreateJust),
                prCh:           make(chan PullRequest, 4),
                model:          defaultOpts.ConsumerModel,
                consumeOrderly: defaultOpts.ConsumeOrderly,
@@ -130,12 +131,11 @@ func (pc *pushConsumer) Start() error {
                        "messageModel":           pc.model,
                        "unitMode":               pc.unitMode,
                })
-               pc.state = internal.StateStartFailed
+               atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
                pc.validate()
 
                err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
                if err != nil {
-                       pc.state = internal.StateStartFailed
                        rlog.Error("the consumer group has been created, 
specify another one", map[string]interface{}{
                                rlog.LogKeyConsumerGroup: pc.consumerGroup,
                        })
@@ -220,7 +220,7 @@ func (pc *pushConsumer) Shutdown() error {
 
 func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
        f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, 
error)) error {
-       if pc.state != internal.StateCreateJust {
+       if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {
                return errors.New("subscribe topic only started before")
        }
        if pc.option.Namespace != "" {
diff --git a/internal/model.go b/internal/model.go
index ea1fdac..61ecbbc 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -39,7 +39,7 @@ type (
        // groupName of consumer
        consumeType string
 
-       ServiceState int
+       ServiceState int32
 )
 
 const (
diff --git a/producer/producer.go b/producer/producer.go
index 78cd8d9..287df58 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "strconv"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/pkg/errors"
@@ -42,7 +43,7 @@ var (
 type defaultProducer struct {
        group       string
        client      internal.RMQClient
-       state       internal.ServiceState
+       state       int32
        options     producerOptions
        publishInfo sync.Map
        callbackCh  chan interface{}
@@ -77,20 +78,20 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, 
error) {
 }
 
 func (p *defaultProducer) Start() error {
-       p.state = internal.StateRunning
+       atomic.StoreInt32(&p.state, int32(internal.StateRunning))
        p.client.RegisterProducer(p.group, p)
        p.client.Start()
        return nil
 }
 
 func (p *defaultProducer) Shutdown() error {
-       p.state = internal.StateShutdown
+       atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
        p.client.Shutdown()
        return nil
 }
 
 func (p *defaultProducer) checkMsg(msg *primitive.Message) error {
-       if p.state != internal.StateRunning {
+       if atomic.LoadInt32(&p.state) != int32(internal.StateRunning) {
                return ErrNotRunning
        }
 

Reply via email to