This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 8f76a40 feat(consumer): move updatePullFromWhichNode ahead of result check (#390)
8f76a40 is described below
commit 8f76a40bb4981a947212fcf490b3f61344a19417
Author: xujianhai666 <[email protected]>
AuthorDate: Wed Jan 15 10:42:16 2020 +0800
feat(consumer): move updatePullFromWhichNode ahead of result check (#390)
- consistent with the Java client
Closes #389
---
consumer/consumer.go | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 2179e10..cacbdd1 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -268,6 +268,8 @@ type defaultConsumer struct {
prCh chan PullRequest
namesrv internal.Namesrvs
+
+ pullFromWhichNodeTable sync.Map
}
func (dc *defaultConsumer) start() error {
@@ -855,7 +857,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context,
queue *primitive.Messa
func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue,
result *primitive.PullResult, data *internal.SubscriptionData) {
- updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+ dc.updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
switch result.Status {
case primitive.PullFound:
@@ -1041,24 +1043,20 @@ func clearCommitOffsetFlag(sysFlag int32) int32 {
}
func (dc *defaultConsumer) tryFindBroker(mq *primitive.MessageQueue)
*internal.FindBrokerResult {
- result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName,
recalculatePullFromWhichNode(mq), false)
+ result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName,
dc.recalculatePullFromWhichNode(mq), false)
if result != nil {
return result
}
dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
- return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName,
recalculatePullFromWhichNode(mq), false)
+ return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName,
dc.recalculatePullFromWhichNode(mq), false)
}
-var (
- pullFromWhichNodeTable sync.Map
-)
-
-func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
- pullFromWhichNodeTable.Store(*mq, brokerId)
+func (dc *defaultConsumer) updatePullFromWhichNode(mq *primitive.MessageQueue,
brokerId int64) {
+ dc.pullFromWhichNodeTable.Store(*mq, brokerId)
}
-func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
- v, exist := pullFromWhichNodeTable.Load(*mq)
+func (dc *defaultConsumer) recalculatePullFromWhichNode(mq
*primitive.MessageQueue) int64 {
+ v, exist := dc.pullFromWhichNodeTable.Load(*mq)
if exist {
return v.(int64)
}