This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 7b021b1  fix: lots of  incorrect impl (#322)
7b021b1 is described below

commit 7b021b13e42fd912f69b98061ad55872be9258fc
Author: xujianhai666 <[email protected]>
AuthorDate: Tue Dec 17 14:38:47 2019 +0800

    fix: lots of  incorrect impl (#322)
    
    Closes #321
---
 consumer/consumer.go      | 10 ++--------
 consumer/push_consumer.go | 20 +++++++-------------
 2 files changed, 9 insertions(+), 21 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index fcb14d2..6ebe9ff 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -341,13 +341,7 @@ func (dc *defaultConsumer) subscriptionAutomatically(topic string) {
 
 func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
        _, exist := dc.subscriptionDataTable.Load(topic)
-       // does subscribe, if true, replace it
        if exist {
-               mqSet := make(map[int]*primitive.MessageQueue, 0)
-               for idx := range mqs {
-                       mq := mqs[idx]
-                       mqSet[mq.HashCode()] = mq
-               }
                dc.topicSubscribeInfoTable.Store(topic, mqs)
        }
 }
@@ -1058,11 +1052,11 @@ var (
 )
 
 func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
-       pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
+       pullFromWhichNodeTable.Store(*mq, brokerId)
 }
 
 func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
-       v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
+       v, exist := pullFromWhichNodeTable.Load(*mq)
        if exist {
                return v.(int64)
        }
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 6d70393..74251a1 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -764,14 +764,10 @@ func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQue
        //      return
        //}
 
-       set := make(map[int]*primitive.MessageQueue, 0)
-       for k := range table {
-               set[k.HashCode()] = &k
-       }
        pc.processQueueTable.Range(func(key, value interface{}) bool {
-               mqHash := value.(int)
+               mq := key.(primitive.MessageQueue)
                pq := value.(*processQueue)
-               if set[mqHash] != nil {
+               if _, ok := table[mq]; !ok {
                        pq.WithDropped(true)
                        pq.clear()
                }
@@ -782,18 +778,16 @@ func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQue
        if !exist {
                return
        }
-       queuesOfTopic := v.(map[int]primitive.MessageQueue)
-       for k := range queuesOfTopic {
-               q := set[k]
-               if q != nil {
-                       pc.storage.update(q, table[*q], false)
+       queuesOfTopic := v.([]primitive.MessageQueue)
+       for _, k := range queuesOfTopic {
+               if _, ok := table[k]; ok {
+                       pc.storage.update(&k, table[k], false)
                        v, exist := pc.processQueueTable.Load(k)
                        if !exist {
                                continue
                        }
                        pq := v.(*processQueue)
-                       pc.removeUnnecessaryMessageQueue(q, pq)
-                       delete(queuesOfTopic, k)
+                       pc.removeUnnecessaryMessageQueue(&k, pq)
                }
        }
 }

Reply via email to