This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 97a7a3727 [INLONG-5314][TubeMQ] Reconnect to another address if heartbeat encounters exception (#5326)
97a7a3727 is described below
commit 97a7a37279fe016bf970de1b597aa8db7c12520f
Author: Zijie Lu <[email protected]>
AuthorDate: Thu Aug 4 09:37:42 2022 +0800
[INLONG-5314][TubeMQ] Reconnect to another address if heartbeat encounters exception (#5326)
---
.../tubemq-client-go/client/consumer_impl.go | 1 +
.../tubemq-client-go/client/heartbeat.go | 77 ++++++++++++----------
.../tubemq-client-go/client/version.go | 2 +-
.../tubemq-client-go/log/config.go | 2 +-
.../tubemq-client-go/rpc/client.go | 3 +
.../tubemq-client-go/selector/ip_selector.go | 55 ++++++++--------
.../tubemq-client-go/selector/selector.go | 4 +-
7 files changed, 77 insertions(+), 67 deletions(-)
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 8fad3cf29..6cc017483 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -122,6 +122,7 @@ func NewConsumer(config *config.Config) (Consumer, error) {
func (c *consumer) register2Master(needChange bool) error {
if needChange {
+ c.selector.Refresh(c.config.Consumer.Masters)
node, err := c.selector.Select(c.config.Consumer.Masters)
if err != nil {
return err
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 094222087..7bd9bfa82 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -52,27 +52,43 @@ func newHBManager(consumer *consumer) *heartbeatManager {
func (h *heartbeatManager) registerMaster(address string) {
h.mu.Lock()
defer h.mu.Unlock()
- if _, ok := h.heartbeats[address]; !ok {
+ hm, ok := h.heartbeats[address]
+ log.Infof("register master heartbeat address:%v, heartbeat:%+v",
address, hm)
+ if !ok {
h.heartbeats[address] = &heartbeatMetadata{
numConnections: 1,
timer:
time.AfterFunc(h.consumer.config.Heartbeat.Interval/2, h.consumerHB2Master),
}
+ return
}
- hm := h.heartbeats[address]
hm.numConnections++
}
-func (h *heartbeatManager) registerBroker(broker *metadata.Node) {
+// deleteHeartbeat delete heartbeat of the given address.
+func (h *heartbeatManager) deleteHeartbeat(address string) {
h.mu.Lock()
defer h.mu.Unlock()
+ hm, ok := h.heartbeats[address]
+ if !ok {
+ return
+ }
+ hm.numConnections--
+ if hm.numConnections <= 0 {
+ delete(h.heartbeats, address)
+ }
+}
- if _, ok := h.heartbeats[broker.GetAddress()]; !ok {
+func (h *heartbeatManager) registerBroker(broker *metadata.Node) {
+ h.mu.Lock()
+ defer h.mu.Unlock()
+ hm, ok := h.heartbeats[broker.GetAddress()]
+ if !ok {
h.heartbeats[broker.GetAddress()] = &heartbeatMetadata{
numConnections: 1,
timer:
time.AfterFunc(h.consumer.config.Heartbeat.Interval, func() {
h.consumerHB2Broker(broker) }),
}
+ return
}
- hm := h.heartbeats[broker.GetAddress()]
hm.numConnections++
}
@@ -99,40 +115,31 @@ func (h *heartbeatManager) consumerHB2Master() {
}
rsp, err := h.sendHeartbeatC2M(m)
- if err != nil {
- log.Errorf("consumer hb err %s", err.Error())
- h.consumer.masterHBRetry++
- } else {
- if !rsp.GetSuccess() {
- h.consumer.masterHBRetry++
- if rsp.GetErrCode() == errs.RetErrHBNoNode ||
strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
- log.Warnf("[CONSUMER] hb2master found no-node
or standby, re-register, client=%s", h.consumer.clientID)
- address := h.consumer.master.Address
- go func() {
- err :=
h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
- if err != nil {
- return
- }
- h.resetMasterHeartbeat()
- }()
- if rsp.GetErrCode() != errs.RetErrHBNoNode {
- h.mu.Lock()
- defer h.mu.Unlock()
- hm := h.heartbeats[address]
- hm.numConnections--
- if hm.numConnections == 0 {
- delete(h.heartbeats, address)
- }
- return
- }
- log.Warnf("[CONSUMER] heartBeat2Master failure
to (%s) : %s, client=%s", h.consumer.master.Address, rsp.GetErrMsg(),
h.consumer.clientID)
- return
- }
- }
+ if err == nil {
h.consumer.masterHBRetry = 0
h.processHBResponseM2C(rsp)
+ h.resetMasterHeartbeat()
+ return
}
+ h.consumer.masterHBRetry++
h.resetMasterHeartbeat()
+ hbNoNode := rsp != nil && rsp.GetErrCode() == errs.RetErrHBNoNode
+ standByException := false
+ if e, ok := err.(*errs.Error); ok {
+ standByException = strings.Index(e.Msg, "StandbyException") !=
-1
+ }
+ if (h.consumer.masterHBRetry >=
h.consumer.config.Heartbeat.MaxRetryTimes) || standByException || hbNoNode {
+ h.deleteHeartbeat(h.consumer.master.Address)
+ go func() {
+ err := h.consumer.register2Master(!hbNoNode)
+ if err != nil {
+ log.Warnf("[CONSUMER] heartBeat2Master failure
to (%s) : %s, client=%s", h.consumer.master.Address, rsp.GetErrMsg(),
h.consumer.clientID)
+ return
+ }
+ h.registerMaster(h.consumer.master.Address)
+ log.Infof("[CONSUMER] heartBeat2Master success to (%s),
client=%s", h.consumer.master.Address, h.consumer.clientID)
+ }()
+ }
}
func (h *heartbeatManager) resetMasterHeartbeat() {
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
index a9c996c12..fa56745bb 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -18,5 +18,5 @@
package client
const (
- tubeMQClientVersion = "0.1.1"
+ tubeMQClientVersion = "0.1.2"
)
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log/config.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log/config.go
index 0c3ea36d6..fee8f4a75 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log/config.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log/config.go
@@ -37,5 +37,5 @@ var defaultConfig = &OutputConfig{
MaxSize: 100,
MaxBackups: 5,
MaxAge: 3,
- Level: "error",
+ Level: "warn",
}
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/client.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 1e2c4df6f..88ef9eb61 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -99,5 +99,8 @@ func (c *rpcClient) doRequest(ctx context.Context, address string,
}
v := rsp.(*codec.TubeMQRPCResponse)
+ if v.ResponseException != nil {
+ return nil, errs.New(errs.RetResponseException,
v.ResponseException.String())
+ }
return v.ResponseBody, nil
}
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
index c2dd8c100..b1193645e 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
@@ -20,6 +20,7 @@ package selector
import (
"errors"
"strings"
+ "sync"
)
func init() {
@@ -30,6 +31,7 @@ func init() {
}
type ipSelector struct {
+ mu sync.Mutex
services map[string]*ipServices
}
@@ -42,42 +44,37 @@ type ipServices struct {
// Select will return the address in the serviceName sequentially.
// The first address will be returned after reaching the end of the addresses.
func (s *ipSelector) Select(serviceName string) (*Node, error) {
- if len(serviceName) == 0 {
+ if serviceName == "" {
return nil, errors.New("serviceName empty")
}
-
- num := strings.Count(serviceName, ",") + 1
- if num == 1 {
- return &Node{
- ServiceName: serviceName,
- Address: serviceName,
- HasNext: false,
- }, nil
- }
-
- var addresses []string
- nextIndex := 0
- if _, ok := s.services[serviceName]; !ok {
- addresses = strings.Split(serviceName, ",")
- } else {
- services := s.services[serviceName]
- addresses = services.addresses
- nextIndex = services.nextIndex
- }
-
- address := addresses[nextIndex]
- nextIndex = (nextIndex + 1) % num
- s.services[serviceName] = &ipServices{
- addresses: addresses,
- nextIndex: nextIndex,
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ services, ok := s.services[serviceName]
+ if !ok {
+ services = &ipServices{
+ addresses: strings.Split(serviceName, ","),
+ nextIndex: 0,
+ }
+ s.services[serviceName] = services
}
-
node := &Node{
ServiceName: serviceName,
- Address: address,
+ Address: services.addresses[services.nextIndex],
}
- if nextIndex > 0 {
+ services.nextIndex = (services.nextIndex + 1) % len(services.addresses)
+ if services.nextIndex > 0 {
node.HasNext = true
}
return node, nil
}
+
+// Refresh will refresh a service address cache data.
+func (s *ipSelector) Refresh(serviceName string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ services, ok := s.services[serviceName]
+ if !ok {
+ return
+ }
+ services.nextIndex = 0
+}
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/selector.go b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/selector.go
index 51d2716da..63c503c01 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/selector.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/selector.go
@@ -29,6 +29,8 @@ import (
type Selector interface {
// Select will return a service node which contains an available
address.
Select(serviceName string) (*Node, error)
+ // Refresh will refresh a service address cache data.
+ Refresh(serviceName string)
}
var (
@@ -54,6 +56,6 @@ type Node struct {
ServiceName string
// Address of the node.
Address string
- // HasNext indicates whether or not the service has next node.
+ // HasNext indicates whether the service has next node or not.
HasNext bool
}