This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 8f76a40  feat(consumer): move updatePullFromWhichNode ahead of result check (#390)
8f76a40 is described below

commit 8f76a40bb4981a947212fcf490b3f61344a19417
Author: xujianhai666 <[email protected]>
AuthorDate: Wed Jan 15 10:42:16 2020 +0800

    feat(consumer): move updatePullFromWhichNode ahead of result check (#390)
    
    - consistent with java
    
    Closes #389
---
 consumer/consumer.go | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 2179e10..cacbdd1 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -268,6 +268,8 @@ type defaultConsumer struct {
        prCh chan PullRequest
 
        namesrv internal.Namesrvs
+
+       pullFromWhichNodeTable sync.Map
 }
 
 func (dc *defaultConsumer) start() error {
@@ -855,7 +857,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa
 
 func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, 
result *primitive.PullResult, data *internal.SubscriptionData) {
 
-       updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+       dc.updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
 
        switch result.Status {
        case primitive.PullFound:
@@ -1041,24 +1043,20 @@ func clearCommitOffsetFlag(sysFlag int32) int32 {
 }
 
 func (dc *defaultConsumer) tryFindBroker(mq *primitive.MessageQueue) 
*internal.FindBrokerResult {
-       result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, 
recalculatePullFromWhichNode(mq), false)
+       result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, 
dc.recalculatePullFromWhichNode(mq), false)
        if result != nil {
                return result
        }
        dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
-       return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, 
recalculatePullFromWhichNode(mq), false)
+       return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, 
dc.recalculatePullFromWhichNode(mq), false)
 }
 
-var (
-       pullFromWhichNodeTable sync.Map
-)
-
-func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
-       pullFromWhichNodeTable.Store(*mq, brokerId)
+func (dc *defaultConsumer) updatePullFromWhichNode(mq *primitive.MessageQueue, 
brokerId int64) {
+       dc.pullFromWhichNodeTable.Store(*mq, brokerId)
 }
 
-func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
-       v, exist := pullFromWhichNodeTable.Load(*mq)
+func (dc *defaultConsumer) recalculatePullFromWhichNode(mq 
*primitive.MessageQueue) int64 {
+       v, exist := dc.pullFromWhichNodeTable.Load(*mq)
        if exist {
                return v.(int64)
        }

Reply via email to