This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 76dd050  [ISSUE #631] Support Consuming from Slave
76dd050 is described below

commit 76dd050fbbd06d59c5fb3e8fe5ab187521167436
Author: 张旭 <[email protected]>
AuthorDate: Thu Jul 1 21:14:40 2021 +0800

    [ISSUE #631] Support Consuming from Slave
    
    Co-authored-by: zhangxu16 <[email protected]>
---
 internal/route.go      | 48 ++++++++++++++++++++++++++++++++-------------
 internal/route_test.go | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 87 insertions(+), 14 deletions(-)

diff --git a/internal/route.go b/internal/route.go
index 09b6e53..7b27c9d 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -219,10 +219,12 @@ func (s *namesrvs) FindBrokerAddrByName(brokerName 
string) string {
 func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId 
int64, onlyThisBroker bool) *FindBrokerResult {
        var (
                brokerAddr = ""
-               //slave      = false
-               //found      = false
+               slave      = false
+               found      = false
        )
 
+       rlog.Debug("broker id "+strconv.FormatInt(brokerId, 10), nil)
+
        v, exist := s.brokerAddressesMap.Load(brokerName)
 
        if !exist {
@@ -234,22 +236,40 @@ func (s *namesrvs) 
FindBrokerAddressInSubscribe(brokerName string, brokerId int6
        }
 
        brokerAddr = data.BrokerAddresses[brokerId]
-       //for k, v := range data.BrokerAddresses {
-       //      if v != "" {
-       //              found = true
-       //              if k != MasterId {
-       //                      slave = true
-       //              }
-       //              brokerAddr = v
-       //              break
-       //      }
-       //}
+       slave = brokerId != MasterId
+       if brokerAddr != "" {
+               found = true
+       }
+
+       // not found && read from slave, try again use next brokerId
+       if !found && slave {
+               rlog.Debug("Not found broker addr and slave 
"+strconv.FormatBool(slave), nil)
+               brokerAddr = data.BrokerAddresses[brokerId+1]
+               found = brokerAddr != ""
+       }
+
+       // still not found && cloud use other broker addr, find anyone in 
BrokerAddresses
+       if !found && !onlyThisBroker {
+               rlog.Debug("STILL Not found broker addr", nil)
+               for k, v := range data.BrokerAddresses {
+                       if v != "" {
+                               brokerAddr = v
+                               found = true
+                               slave = k != MasterId
+                               break
+                       }
+               }
+       }
+
+       if found {
+               rlog.Debug("Find broker addr "+brokerAddr, nil)
+       }
 
        var result *FindBrokerResult
-       if brokerAddr != "" {
+       if found {
                result = &FindBrokerResult{
                        BrokerAddr:    brokerAddr,
-                       Slave:         brokerId != 0,
+                       Slave:         slave,
                        BrokerVersion: s.findBrokerVersion(brokerName, 
brokerAddr),
                }
        }
diff --git a/internal/route_test.go b/internal/route_test.go
index c9b65f0..ded7780 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -82,3 +82,56 @@ func TestAddBrokerVersion(t *testing.T) {
        v = s.findBrokerVersion("b1", "addr2")
        assert.Equal(t, v, int32(0))
 }
+
+func TestFindBrokerAddressInSubscribe(t *testing.T) {
+       s := &namesrvs{}
+       s.brokerVersionMap = make(map[string]map[string]int32, 0)
+       s.brokerLock = new(sync.RWMutex)
+
+       brokerDataRaft1 := &BrokerData{
+               Cluster:    "cluster",
+               BrokerName: "raft01",
+               BrokerAddresses: map[int64]string{
+                       0: "127.0.0.1:10911",
+                       1: "127.0.0.1:10912",
+                       2: "127.0.0.1:10913",
+               },
+       }
+       s.brokerAddressesMap.Store(brokerDataRaft1.BrokerName, brokerDataRaft1)
+       brokerDataRaft2 := &BrokerData{
+               Cluster:    "cluster",
+               BrokerName: "raft02",
+               BrokerAddresses: map[int64]string{
+                       0: "127.0.0.1:10911",
+                       2: "127.0.0.1:10912",
+                       3: "127.0.0.1:10913",
+               },
+       }
+       s.brokerAddressesMap.Store(brokerDataRaft2.BrokerName, brokerDataRaft2)
+
+       Convey("Request master broker", t, func() {
+               result := 
s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 0, false)
+               assert.NotNil(t, result)
+               assert.Equal(t, result.BrokerAddr, 
brokerDataRaft1.BrokerAddresses[0])
+               assert.Equal(t, result.Slave, false)
+       })
+
+       Convey("Request slave broker from normal broker group", t, func() {
+               result := 
s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 1, false)
+               assert.NotNil(t, result)
+               assert.Equal(t, result.BrokerAddr, 
brokerDataRaft1.BrokerAddresses[1])
+               assert.Equal(t, result.Slave, true)
+       })
+
+       Convey("Request slave broker from non normal broker group", t, func() {
+               result := 
s.FindBrokerAddressInSubscribe(brokerDataRaft2.BrokerName, 1, false)
+               assert.NotNil(t, result)
+               assert.Equal(t, result.BrokerAddr, 
brokerDataRaft2.BrokerAddresses[2])
+               assert.Equal(t, result.Slave, true)
+       })
+
+       Convey("Request not exist broker", t, func() {
+               result := 
s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 4, false)
+               assert.NotNil(t, result)
+       })
+}

Reply via email to