This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new be743f7  fix(route): update route (#339)
be743f7 is described below

commit be743f70415addae8f113861000bc1e079a1d4bd
Author: xujianhai666 <[email protected]>
AuthorDate: Tue Dec 24 10:41:20 2019 +0800

    fix(route): update route (#339)
    
    - fix producer publish update logic
    - fix consumer subscription update logic
    
    Closes #338
---
 internal/client.go       | 47 +++++++++++++++++++++++------------------------
 internal/mock_client.go  |  2 +-
 internal/mock_namesrv.go |  4 ++--
 internal/namesrv.go      |  2 +-
 internal/route.go        |  9 +++++----
 producer/producer.go     |  3 ++-
 6 files changed, 34 insertions(+), 33 deletions(-)

diff --git a/internal/client.go b/internal/client.go
index b1ad3a6..c41417d 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -148,7 +148,7 @@ type RMQClient interface {
        PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
        PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error
        RebalanceImmediately()
-       UpdatePublishInfo(topic string, data *TopicRouteData)
+       UpdatePublishInfo(topic string, data *TopicRouteData, changed bool)
 }
 
 var _ RMQClient = new(rmqClient)
@@ -475,7 +475,8 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
                return true
        })
        for topic := range publishTopicSet {
-               c.UpdatePublishInfo(topic, c.namesrvs.UpdateTopicRouteInfo(topic))
+               data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+               c.UpdatePublishInfo(topic, data, changed)
        }
 
        subscribedTopicSet := make(map[string]bool, 0)
@@ -489,7 +490,8 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
        })
 
        for topic := range subscribedTopicSet {
-               c.updateSubscribeInfo(topic, c.namesrvs.UpdateTopicRouteInfo(topic))
+               data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+               c.updateSubscribeInfo(topic, data, changed)
        }
 }
 
@@ -639,36 +641,27 @@ func (c *rmqClient) RebalanceImmediately() {
        })
 }
 
-func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) {
        if data == nil {
                return
        }
-       if !c.isNeedUpdatePublishInfo(topic) {
-               return
-       }
-       c.producerMap.Range(func(key, value interface{}) bool {
-               p := value.(InnerProducer)
-               publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
-               publishInfo.HaveTopicRouterInfo = true
-               p.UpdateTopicPublishInfo(topic, publishInfo)
-               return true
-       })
-}
 
-func (c *rmqClient) isNeedUpdatePublishInfo(topic string) bool {
-       var result bool
        c.producerMap.Range(func(key, value interface{}) bool {
                p := value.(InnerProducer)
-               if p.IsPublishTopicNeedUpdate(topic) {
-                       result = true
-                       return false
+               updated := changed
+               if !updated {
+                       updated = p.IsPublishTopicNeedUpdate(topic)
+               }
+               if updated {
+                       publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
+                       publishInfo.HaveTopicRouterInfo = true
+                       p.UpdateTopicPublishInfo(topic, publishInfo)
                }
                return true
        })
-       return result
 }
 
-func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData, changed bool) {
        if data == nil {
                return
        }
@@ -677,8 +670,14 @@ func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) {
        }
        c.consumerMap.Range(func(key, value interface{}) bool {
                consumer := value.(InnerConsumer)
-               // TODO
-               consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, data))
+               updated := changed
+               if !updated {
+                       updated = consumer.IsSubscribeTopicNeedUpdate(topic)
+               }
+               if updated {
+                       consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, data))
+               }
+
                return true
        })
 }
diff --git a/internal/mock_client.go b/internal/mock_client.go
index 730c073..244cd0c 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -398,7 +398,7 @@ func (mr *MockRMQClientMockRecorder) RebalanceImmediately() *gomock.Call {
 }
 
 // UpdatePublishInfo mocks base method
-func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) {
        m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
 }
 
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 365e784..0a983bd 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -87,11 +87,11 @@ func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress(nameServer, instance
 }
 
 // UpdateTopicRouteInfo mocks base method
-func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool) {
        m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
        ret0, _ := ret[0].(*TopicRouteData)
-       return ret0
+       return ret0, changed
 }
 
 // UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 3c9cf12..43c5d5d 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -54,7 +54,7 @@ type Namesrvs interface {
 
        cleanOfflineBroker()
 
-       UpdateTopicRouteInfo(topic string) *TopicRouteData
+       UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool)
 
        FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error)
 
diff --git a/internal/route.go b/internal/route.go
index 5c7b114..327706f 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -112,7 +112,7 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
        return int(qIndex) % length
 }
 
-func (s *namesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
        // Todo process lock timeout
        s.lockNamesrv.Lock()
        defer s.lockNamesrv.Unlock()
@@ -124,7 +124,7 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
                        rlog.Warning("query topic route from server error", map[string]interface{}{
                                rlog.LogKeyUnderlayError: err,
                        })
-                       return nil
+                       return nil, false
                }
        }
 
@@ -132,10 +132,11 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
                rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
                        rlog.LogKeyTopic: topic,
                })
-               return nil
+               return nil, false
        }
 
        oldRouteData, exist := s.routeDataMap.Load(topic)
+
        changed := true
        if exist {
                changed = s.topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
@@ -153,7 +154,7 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
                }
        }
 
-       return routeData.clone()
+       return routeData.clone(), changed
 }
 
 func (s *namesrvs) AddBroker(routeData *TopicRouteData) {
diff --git a/producer/producer.go b/producer/producer.go
index 013d3aa..05bcb15 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -340,7 +340,8 @@ func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.
 
        v, exist := p.publishInfo.Load(topic)
        if !exist {
-               p.client.UpdatePublishInfo(topic, p.options.Namesrv.UpdateTopicRouteInfo(topic))
+               data, changed := p.options.Namesrv.UpdateTopicRouteInfo(topic)
+               p.client.UpdatePublishInfo(topic, data, changed)
                v, exist = p.publishInfo.Load(topic)
        }
 

Reply via email to