This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 97a7a3727 [INLONG-5314][TubeMQ] Reconnect to another address if heartbeat encounters exception (#5326)
97a7a3727 is described below

commit 97a7a37279fe016bf970de1b597aa8db7c12520f
Author: Zijie Lu <[email protected]>
AuthorDate: Thu Aug 4 09:37:42 2022 +0800

    [INLONG-5314][TubeMQ] Reconnect to another address if heartbeat encounters exception (#5326)
---
 .../tubemq-client-go/client/consumer_impl.go       |  1 +
 .../tubemq-client-go/client/heartbeat.go           | 77 ++++++++++++----------
 .../tubemq-client-go/client/version.go             |  2 +-
 .../tubemq-client-go/log/config.go                 |  2 +-
 .../tubemq-client-go/rpc/client.go                 |  3 +
 .../tubemq-client-go/selector/ip_selector.go       | 55 ++++++++--------
 .../tubemq-client-go/selector/selector.go          |  4 +-
 7 files changed, 77 insertions(+), 67 deletions(-)

diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 8fad3cf29..6cc017483 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -122,6 +122,7 @@ func NewConsumer(config *config.Config) (Consumer, error) {
 
 func (c *consumer) register2Master(needChange bool) error {
        if needChange {
+               c.selector.Refresh(c.config.Consumer.Masters)
                node, err := c.selector.Select(c.config.Consumer.Masters)
                if err != nil {
                        return err
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 094222087..7bd9bfa82 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -52,27 +52,43 @@ func newHBManager(consumer *consumer) *heartbeatManager {
 func (h *heartbeatManager) registerMaster(address string) {
        h.mu.Lock()
        defer h.mu.Unlock()
-       if _, ok := h.heartbeats[address]; !ok {
+       hm, ok := h.heartbeats[address]
+       log.Infof("register master heartbeat address:%v, heartbeat:%+v", 
address, hm)
+       if !ok {
                h.heartbeats[address] = &heartbeatMetadata{
                        numConnections: 1,
                        timer:          
time.AfterFunc(h.consumer.config.Heartbeat.Interval/2, h.consumerHB2Master),
                }
+               return
        }
-       hm := h.heartbeats[address]
        hm.numConnections++
 }
 
-func (h *heartbeatManager) registerBroker(broker *metadata.Node) {
+// deleteHeartbeat delete heartbeat of the given address.
+func (h *heartbeatManager) deleteHeartbeat(address string) {
        h.mu.Lock()
        defer h.mu.Unlock()
+       hm, ok := h.heartbeats[address]
+       if !ok {
+               return
+       }
+       hm.numConnections--
+       if hm.numConnections <= 0 {
+               delete(h.heartbeats, address)
+       }
+}
 
-       if _, ok := h.heartbeats[broker.GetAddress()]; !ok {
+func (h *heartbeatManager) registerBroker(broker *metadata.Node) {
+       h.mu.Lock()
+       defer h.mu.Unlock()
+       hm, ok := h.heartbeats[broker.GetAddress()]
+       if !ok {
                h.heartbeats[broker.GetAddress()] = &heartbeatMetadata{
                        numConnections: 1,
                        timer:          
time.AfterFunc(h.consumer.config.Heartbeat.Interval, func() { 
h.consumerHB2Broker(broker) }),
                }
+               return
        }
-       hm := h.heartbeats[broker.GetAddress()]
        hm.numConnections++
 }
 
@@ -99,40 +115,31 @@ func (h *heartbeatManager) consumerHB2Master() {
        }
 
        rsp, err := h.sendHeartbeatC2M(m)
-       if err != nil {
-               log.Errorf("consumer hb err %s", err.Error())
-               h.consumer.masterHBRetry++
-       } else {
-               if !rsp.GetSuccess() {
-                       h.consumer.masterHBRetry++
-                       if rsp.GetErrCode() == errs.RetErrHBNoNode || 
strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
-                               log.Warnf("[CONSUMER] hb2master found no-node 
or standby, re-register, client=%s", h.consumer.clientID)
-                               address := h.consumer.master.Address
-                               go func() {
-                                       err := 
h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
-                                       if err != nil {
-                                               return
-                                       }
-                                       h.resetMasterHeartbeat()
-                               }()
-                               if rsp.GetErrCode() != errs.RetErrHBNoNode {
-                                       h.mu.Lock()
-                                       defer h.mu.Unlock()
-                                       hm := h.heartbeats[address]
-                                       hm.numConnections--
-                                       if hm.numConnections == 0 {
-                                               delete(h.heartbeats, address)
-                                       }
-                                       return
-                               }
-                               log.Warnf("[CONSUMER] heartBeat2Master failure 
to (%s) : %s, client=%s", h.consumer.master.Address, rsp.GetErrMsg(), 
h.consumer.clientID)
-                               return
-                       }
-               }
+       if err == nil {
                h.consumer.masterHBRetry = 0
                h.processHBResponseM2C(rsp)
+               h.resetMasterHeartbeat()
+               return
        }
+       h.consumer.masterHBRetry++
        h.resetMasterHeartbeat()
+       hbNoNode := rsp != nil && rsp.GetErrCode() == errs.RetErrHBNoNode
+       standByException := false
+       if e, ok := err.(*errs.Error); ok {
+               standByException = strings.Index(e.Msg, "StandbyException") != 
-1
+       }
+       if (h.consumer.masterHBRetry >= 
h.consumer.config.Heartbeat.MaxRetryTimes) || standByException || hbNoNode {
+               h.deleteHeartbeat(h.consumer.master.Address)
+               go func() {
+                       err := h.consumer.register2Master(!hbNoNode)
+                       if err != nil {
+                               log.Warnf("[CONSUMER] heartBeat2Master failure 
to (%s) : %s, client=%s", h.consumer.master.Address, rsp.GetErrMsg(), 
h.consumer.clientID)
+                               return
+                       }
+                       h.registerMaster(h.consumer.master.Address)
+                       log.Infof("[CONSUMER] heartBeat2Master success to (%s), 
client=%s", h.consumer.master.Address, h.consumer.clientID)
+               }()
+       }
 }
 
 func (h *heartbeatManager) resetMasterHeartbeat() {
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
index a9c996c12..fa56745bb 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -18,5 +18,5 @@
 package client
 
 const (
-       tubeMQClientVersion = "0.1.1"
+       tubeMQClientVersion = "0.1.2"
 )
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log/config.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log/config.go
index 0c3ea36d6..fee8f4a75 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log/config.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log/config.go
@@ -37,5 +37,5 @@ var defaultConfig = &OutputConfig{
        MaxSize:    100,
        MaxBackups: 5,
        MaxAge:     3,
-       Level:      "error",
+       Level:      "warn",
 }
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/client.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 1e2c4df6f..88ef9eb61 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -99,5 +99,8 @@ func (c *rpcClient) doRequest(ctx context.Context, address string,
        }
 
        v := rsp.(*codec.TubeMQRPCResponse)
+       if v.ResponseException != nil {
+               return nil, errs.New(errs.RetResponseException, 
v.ResponseException.String())
+       }
        return v.ResponseBody, nil
 }
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
index c2dd8c100..b1193645e 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
@@ -20,6 +20,7 @@ package selector
 import (
        "errors"
        "strings"
+       "sync"
 )
 
 func init() {
@@ -30,6 +31,7 @@ func init() {
 }
 
 type ipSelector struct {
+       mu       sync.Mutex
        services map[string]*ipServices
 }
 
@@ -42,42 +44,37 @@ type ipServices struct {
 // Select will return the address in the serviceName sequentially.
 // The first address will be returned after reaching the end of the addresses.
 func (s *ipSelector) Select(serviceName string) (*Node, error) {
-       if len(serviceName) == 0 {
+       if serviceName == "" {
                return nil, errors.New("serviceName empty")
        }
-
-       num := strings.Count(serviceName, ",") + 1
-       if num == 1 {
-               return &Node{
-                       ServiceName: serviceName,
-                       Address:     serviceName,
-                       HasNext:     false,
-               }, nil
-       }
-
-       var addresses []string
-       nextIndex := 0
-       if _, ok := s.services[serviceName]; !ok {
-               addresses = strings.Split(serviceName, ",")
-       } else {
-               services := s.services[serviceName]
-               addresses = services.addresses
-               nextIndex = services.nextIndex
-       }
-
-       address := addresses[nextIndex]
-       nextIndex = (nextIndex + 1) % num
-       s.services[serviceName] = &ipServices{
-               addresses: addresses,
-               nextIndex: nextIndex,
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       services, ok := s.services[serviceName]
+       if !ok {
+               services = &ipServices{
+                       addresses: strings.Split(serviceName, ","),
+                       nextIndex: 0,
+               }
+               s.services[serviceName] = services
        }
-
        node := &Node{
                ServiceName: serviceName,
-               Address:     address,
+               Address:     services.addresses[services.nextIndex],
        }
-       if nextIndex > 0 {
+       services.nextIndex = (services.nextIndex + 1) % len(services.addresses)
+       if services.nextIndex > 0 {
                node.HasNext = true
        }
        return node, nil
 }
+
+// Refresh will refresh a service address cache data.
+func (s *ipSelector) Refresh(serviceName string) {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       services, ok := s.services[serviceName]
+       if !ok {
+               return
+       }
+       services.nextIndex = 0
+}
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/selector.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/selector.go
index 51d2716da..63c503c01 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/selector.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/selector.go
@@ -29,6 +29,8 @@ import (
 type Selector interface {
        // Select will return a service node which contains an available 
address.
        Select(serviceName string) (*Node, error)
+       // Refresh will refresh a service address cache data.
+       Refresh(serviceName string)
 }
 
 var (
@@ -54,6 +56,6 @@ type Node struct {
        ServiceName string
        // Address of the node.
        Address string
-       // HasNext indicates whether or not the service has next node.
+       // HasNext indicates whether the service has next node or not.
        HasNext bool
 }

Reply via email to