This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 181eb30  fix the bug caused in HA cluster when master broker node is 
down (#714)
181eb30 is described below

commit 181eb30879c7c037b515d4b6c7a5e7a9b4eaefef
Author: guyinyou <[email protected]>
AuthorDate: Wed Aug 18 10:58:40 2021 +0800

    fix the bug caused in HA cluster when master broker node is down (#714)
---
 consumer/push_consumer.go |  6 +++++-
 internal/client.go        |  3 +++
 internal/namesrv.go       | 15 +++++++++++++++
 internal/route.go         | 28 ++++++++++++++++++++++++++++
 producer/producer.go      |  5 ++++-
 5 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c84ce84..e3e0d32 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -99,8 +99,12 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
                consumeOrderly: defaultOpts.ConsumeOrderly,
                fromWhere:      defaultOpts.FromWhere,
                allocate:       defaultOpts.Strategy,
-               option:         defaultOpts,
                namesrv:        srvs,
+               option:         defaultOpts,
+       }
+       dc.option.ClientOptions.Namesrv, err = 
internal.GetNamesrv(dc.client.ClientID())
+       if err != nil {
+               return nil, err
        }
 
        p := &pushConsumer{
diff --git a/internal/client.go b/internal/client.go
index 3a09ea8..4de3b93 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -186,6 +186,9 @@ func GetOrNewRocketMQClient(option ClientOptions, 
callbackCh chan interface{}) R
                done:         make(chan struct{}),
        }
        actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
+       client.namesrvs = GetOrSetNamesrv(client.ClientID(), client.namesrvs)
+       client.namesrvs.bundleClient = actual.(*rmqClient)
+       client.option.Namesrv = client.namesrvs
        if !loaded {
                
client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req 
*remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
                        rlog.Info("receive broker's notification to consumer 
group", map[string]interface{}{
diff --git a/internal/namesrv.go b/internal/namesrv.go
index a47bbc1..7776651 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -19,6 +19,7 @@ package internal
 
 import (
        "errors"
+       "fmt"
        "regexp"
        "strings"
        "sync"
@@ -76,6 +77,8 @@ type namesrvs struct {
        // brokerName -> *BrokerData
        brokerAddressesMap sync.Map
 
+       bundleClient *rmqClient
+
        // brokerName -> map[string]int32: brokerAddr -> version
        brokerVersionMap map[string]map[string]int32
        // lock for broker version read/write
@@ -92,9 +95,21 @@ type namesrvs struct {
 }
 
 var _ Namesrvs = (*namesrvs)(nil)
+var namesrvMap sync.Map
 
 // NewNamesrv init Namesrv from namesrv addr string.
 // addr primitive.NamesrvAddr
+func GetOrSetNamesrv(clientId string, namesrv *namesrvs) *namesrvs {
+       actual, _ := namesrvMap.LoadOrStore(clientId, namesrv)
+       return actual.(*namesrvs)
+}
+func GetNamesrv(clientId string) (*namesrvs, error) {
+       actual, ok := namesrvMap.Load(clientId)
+       if !ok {
+               return nil, fmt.Errorf("the namesrv in instanceName [%s] not 
found", clientId)
+       }
+       return actual.(*namesrvs), nil
+}
 func NewNamesrv(resolver primitive.NsResolver) (*namesrvs, error) {
        addr := resolver.Resolve()
        if len(addr) == 0 {
diff --git a/internal/route.go b/internal/route.go
index 66be96d..3676cb4 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -161,6 +161,34 @@ func (s *namesrvs) UpdateTopicRouteInfoWithDefault(topic 
string, defaultTopic st
        }
 
        if changed {
+               if s.bundleClient != nil {
+                       s.bundleClient.producerMap.Range(func(key, value 
interface{}) bool {
+                               p := value.(InnerProducer)
+                               updated := changed
+                               if !updated {
+                                       updated = 
p.IsPublishTopicNeedUpdate(topic)
+                               }
+                               if updated {
+                                       publishInfo := 
s.bundleClient.namesrvs.routeData2PublishInfo(topic, routeData)
+                                       publishInfo.HaveTopicRouterInfo = true
+                                       p.UpdateTopicPublishInfo(topic, 
publishInfo)
+                               }
+                               return true
+                       })
+                       s.bundleClient.consumerMap.Range(func(key, value 
interface{}) bool {
+                               consumer := value.(InnerConsumer)
+                               updated := changed
+                               if !updated {
+                                       updated = 
consumer.IsSubscribeTopicNeedUpdate(topic)
+                               }
+                               if updated {
+                                       
consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, 
routeData))
+                               }
+
+                               return true
+                       })
+               }
+
                s.routeDataMap.Store(topic, routeData)
                rlog.Info("the topic route info changed", 
map[string]interface{}{
                        rlog.LogKeyTopic:            topic,
diff --git a/producer/producer.go b/producer/producer.go
index 8ebb660..ef4ea25 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -72,7 +72,10 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, 
error) {
                options:    defaultOpts,
        }
        producer.client = 
internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)
-
+       producer.options.ClientOptions.Namesrv, err = 
internal.GetNamesrv(producer.client.ClientID())
+       if err != nil {
+               return nil, err
+       }
        producer.interceptor = 
primitive.ChainInterceptors(producer.options.Interceptors...)
 
        return producer, nil

Reply via email to