This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new be743f7 fix(route): update route (#339)
be743f7 is described below
commit be743f70415addae8f113861000bc1e079a1d4bd
Author: xujianhai666 <[email protected]>
AuthorDate: Tue Dec 24 10:41:20 2019 +0800
fix(route): update route (#339)
- fix producer publish update logic
- fix consumer subscription update logic
Closes #338
---
internal/client.go | 47 +++++++++++++++++++++++------------------------
internal/mock_client.go | 2 +-
internal/mock_namesrv.go | 4 ++--
internal/namesrv.go | 2 +-
internal/route.go | 9 +++++----
producer/producer.go | 3 ++-
6 files changed, 34 insertions(+), 33 deletions(-)
diff --git a/internal/client.go b/internal/client.go
index b1ad3a6..c41417d 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -148,7 +148,7 @@ type RMQClient interface {
PullMessage(ctx context.Context, brokerAddrs string, request
*PullMessageRequestHeader) (*primitive.PullResult, error)
PullMessageAsync(ctx context.Context, brokerAddrs string, request
*PullMessageRequestHeader, f func(result *primitive.PullResult)) error
RebalanceImmediately()
- UpdatePublishInfo(topic string, data *TopicRouteData)
+ UpdatePublishInfo(topic string, data *TopicRouteData, changed bool)
}
var _ RMQClient = new(rmqClient)
@@ -475,7 +475,8 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
return true
})
for topic := range publishTopicSet {
- c.UpdatePublishInfo(topic,
c.namesrvs.UpdateTopicRouteInfo(topic))
+ data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+ c.UpdatePublishInfo(topic, data, changed)
}
subscribedTopicSet := make(map[string]bool, 0)
@@ -489,7 +490,8 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
})
for topic := range subscribedTopicSet {
- c.updateSubscribeInfo(topic,
c.namesrvs.UpdateTopicRouteInfo(topic))
+ data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+ c.updateSubscribeInfo(topic, data, changed)
}
}
@@ -639,36 +641,27 @@ func (c *rmqClient) RebalanceImmediately() {
})
}
-func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData,
changed bool) {
if data == nil {
return
}
- if !c.isNeedUpdatePublishInfo(topic) {
- return
- }
- c.producerMap.Range(func(key, value interface{}) bool {
- p := value.(InnerProducer)
- publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
- publishInfo.HaveTopicRouterInfo = true
- p.UpdateTopicPublishInfo(topic, publishInfo)
- return true
- })
-}
-func (c *rmqClient) isNeedUpdatePublishInfo(topic string) bool {
- var result bool
c.producerMap.Range(func(key, value interface{}) bool {
p := value.(InnerProducer)
- if p.IsPublishTopicNeedUpdate(topic) {
- result = true
- return false
+ updated := changed
+ if !updated {
+ updated = p.IsPublishTopicNeedUpdate(topic)
+ }
+ if updated {
+ publishInfo := c.namesrvs.routeData2PublishInfo(topic,
data)
+ publishInfo.HaveTopicRouterInfo = true
+ p.UpdateTopicPublishInfo(topic, publishInfo)
}
return true
})
- return result
}
-func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData,
changed bool) {
if data == nil {
return
}
@@ -677,8 +670,14 @@ func (c *rmqClient) updateSubscribeInfo(topic string, data
*TopicRouteData) {
}
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
- // TODO
- consumer.UpdateTopicSubscribeInfo(topic,
routeData2SubscribeInfo(topic, data))
+ updated := changed
+ if !updated {
+ updated = consumer.IsSubscribeTopicNeedUpdate(topic)
+ }
+ if updated {
+ consumer.UpdateTopicSubscribeInfo(topic,
routeData2SubscribeInfo(topic, data))
+ }
+
return true
})
}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index 730c073..244cd0c 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -398,7 +398,7 @@ func (mr *MockRMQClientMockRecorder) RebalanceImmediately()
*gomock.Call {
}
// UpdatePublishInfo mocks base method
-func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData,
changed bool) {
m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
}
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 365e784..0a983bd 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -87,11 +87,11 @@ func (mr *MockNamesrvsMockRecorder)
UpdateNameServerAddress(nameServer, instance
}
// UpdateTopicRouteInfo mocks base method
-func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (routeData
*TopicRouteData, changed bool) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
ret0, _ := ret[0].(*TopicRouteData)
- return ret0
+ return ret0, changed
}
// UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 3c9cf12..43c5d5d 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -54,7 +54,7 @@ type Namesrvs interface {
cleanOfflineBroker()
- UpdateTopicRouteInfo(topic string) *TopicRouteData
+ UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed
bool)
FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue,
error)
diff --git a/internal/route.go b/internal/route.go
index 5c7b114..327706f 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -112,7 +112,7 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
return int(qIndex) % length
}
-func (s *namesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
// Todo process lock timeout
s.lockNamesrv.Lock()
defer s.lockNamesrv.Unlock()
@@ -124,7 +124,7 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string)
*TopicRouteData {
rlog.Warning("query topic route from server error",
map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
- return nil
+ return nil, false
}
}
@@ -132,10 +132,11 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string)
*TopicRouteData {
rlog.Warning("queryTopicRouteInfoFromServer return nil",
map[string]interface{}{
rlog.LogKeyTopic: topic,
})
- return nil
+ return nil, false
}
oldRouteData, exist := s.routeDataMap.Load(topic)
+
changed := true
if exist {
changed =
s.topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
@@ -153,7 +154,7 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string)
*TopicRouteData {
}
}
- return routeData.clone()
+ return routeData.clone(), changed
}
func (s *namesrvs) AddBroker(routeData *TopicRouteData) {
diff --git a/producer/producer.go b/producer/producer.go
index 013d3aa..05bcb15 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -340,7 +340,8 @@ func (p *defaultProducer) selectMessageQueue(msg
*primitive.Message) *primitive.
v, exist := p.publishInfo.Load(topic)
if !exist {
- p.client.UpdatePublishInfo(topic,
p.options.Namesrv.UpdateTopicRouteInfo(topic))
+ data, changed := p.options.Namesrv.UpdateTopicRouteInfo(topic)
+ p.client.UpdatePublishInfo(topic, data, changed)
v, exist = p.publishInfo.Load(topic)
}