This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 7b021b1 fix: lots of incorrect impl (#322)
7b021b1 is described below
commit 7b021b13e42fd912f69b98061ad55872be9258fc
Author: xujianhai666 <[email protected]>
AuthorDate: Tue Dec 17 14:38:47 2019 +0800
fix: lots of incorrect impl (#322)
Closes #321
---
consumer/consumer.go | 10 ++--------
consumer/push_consumer.go | 20 +++++++-------------
2 files changed, 9 insertions(+), 21 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index fcb14d2..6ebe9ff 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -341,13 +341,7 @@ func (dc *defaultConsumer) subscriptionAutomatically(topic string) {
func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
_, exist := dc.subscriptionDataTable.Load(topic)
- // does subscribe, if true, replace it
if exist {
- mqSet := make(map[int]*primitive.MessageQueue, 0)
- for idx := range mqs {
- mq := mqs[idx]
- mqSet[mq.HashCode()] = mq
- }
dc.topicSubscribeInfoTable.Store(topic, mqs)
}
}
@@ -1058,11 +1052,11 @@ var (
)
func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
- pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
+ pullFromWhichNodeTable.Store(*mq, brokerId)
}
func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
- v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
+ v, exist := pullFromWhichNodeTable.Load(*mq)
if exist {
return v.(int64)
}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 6d70393..74251a1 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -764,14 +764,10 @@ func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQue
// return
//}
- set := make(map[int]*primitive.MessageQueue, 0)
- for k := range table {
- set[k.HashCode()] = &k
- }
pc.processQueueTable.Range(func(key, value interface{}) bool {
- mqHash := value.(int)
+ mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
- if set[mqHash] != nil {
+ if _, ok := table[mq]; !ok {
pq.WithDropped(true)
pq.clear()
}
@@ -782,18 +778,16 @@ func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQue
if !exist {
return
}
- queuesOfTopic := v.(map[int]primitive.MessageQueue)
- for k := range queuesOfTopic {
- q := set[k]
- if q != nil {
- pc.storage.update(q, table[*q], false)
+ queuesOfTopic := v.([]primitive.MessageQueue)
+ for _, k := range queuesOfTopic {
+ if _, ok := table[k]; ok {
+ pc.storage.update(&k, table[k], false)
v, exist := pc.processQueueTable.Load(k)
if !exist {
continue
}
pq := v.(*processQueue)
- pc.removeUnnecessaryMessageQueue(q, pq)
- delete(queuesOfTopic, k)
+ pc.removeUnnecessaryMessageQueue(&k, pq)
}
}
}