This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 819e9474ac [INLONG-11583][TubeMQ] Go SDK load balance logic not
perfect, causing consumption to stop suddenly (#11582)
819e9474ac is described below
commit 819e9474acb16c52486fcd896e39dd1251d4f910
Author: leonwolf <[email protected]>
AuthorDate: Fri Dec 6 14:17:24 2024 +0800
[INLONG-11583][TubeMQ] Go SDK load balance logic not perfect, causing
consumption to stop suddenly (#11582)
1. Prevent re-registration from failing forever when the IP address behind
the master's domain name changes.
* [fix bug] OfferEventAndNotify should not change rebalanceResults
* [fix bug] Make registration and heartbeating to the master more idiomatic
Go. The old approach stopped working permanently if re-registration to the master failed.
* adjust comment
---------
Co-authored-by: leonyue <[email protected]>
---
.../tubemq-client-go/client/consumer_impl.go | 239 +++++++++++++++------
.../tubemq-client-go/client/heartbeat.go | 98 +--------
.../tubemq-client-go/remote/remote.go | 21 +-
.../tubemq-client-go/rpc/master.go | 2 +
.../tubemq-client-go/selector/ip_selector.go | 16 +-
.../tubemq-client-go/selector/ip_selector_test.go | 37 ----
6 files changed, 203 insertions(+), 210 deletions(-)
diff --git
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 6cc0174835..c78f2469b5 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -68,7 +68,8 @@ type consumer struct {
masterHBRetry int
heartbeatManager *heartbeatManager
unreportedTimes int
- done chan struct{}
+ cancel context.CancelFunc
+ routineClosed chan struct{}
closeOnce sync.Once
}
@@ -105,72 +106,92 @@ func NewConsumer(config *config.Config) (Consumer, error)
{
client: client,
visitToken: util.InvalidValue,
unreportedTimes: 0,
- done: make(chan struct{}),
+ routineClosed: make(chan struct{}),
}
+
+ ctx := context.Background()
+ ctx, c.cancel = context.WithCancel(ctx)
+
c.subInfo.SetClientID(clientID)
hbm := newHBManager(c)
c.heartbeatManager = hbm
- err = c.register2Master(true)
- if err != nil {
- return nil, err
- }
- c.heartbeatManager.registerMaster(c.master.Address)
- go c.processRebalanceEvent()
+
+ go c.routine(ctx)
+ go c.processRebalanceEvent(ctx)
+
log.Infof("[CONSUMER] start consumer success, client=%s", clientID)
return c, nil
}
-func (c *consumer) register2Master(needChange bool) error {
- if needChange {
- c.selector.Refresh(c.config.Consumer.Masters)
+func (c *consumer) routine(ctx context.Context) {
+ defer close(c.routineClosed)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ // select master
node, err := c.selector.Select(c.config.Consumer.Masters)
if err != nil {
- return err
+ log.Errorf("[CONSUMER] select error %v", err)
+ time.Sleep(time.Second)
+ continue
}
c.master = node
- }
- retryCount := 0
- for {
- rsp, err := c.sendRegRequest2Master()
- if err != nil || !rsp.GetSuccess() {
- if err != nil {
- log.Errorf("[CONSUMER]register2Master error
%s", err.Error())
- } else if rsp.GetErrCode() ==
errs.RetConsumeGroupForbidden ||
- rsp.GetErrCode() ==
errs.RetConsumeContentForbidden {
- log.Warnf("[CONSUMER] register2master(%s)
failure exist register, client=%s, error: %s",
- c.master.Address, c.clientID,
rsp.GetErrMsg())
- return errs.New(rsp.GetErrCode(),
rsp.GetErrMsg())
+ log.Infof("[CONSUMER] master %+v", c.master)
+ // register to master
+ if err := c.register2Master(ctx); err != nil {
+ log.Errorf("[CONSUMER] register2Master error %v", err)
+ time.Sleep(time.Second)
+ continue
+ }
+ c.lastMasterHb = time.Now().UnixMilli()
+ // heartbeat to master
+ time.Sleep(c.config.Heartbeat.Interval / 2)
+ if err := c.heartbeat2Master(ctx); err != nil {
+ log.Errorf("[CONSUMER] heartbeat2Master error %v", err)
+ } else {
+ c.lastMasterHb = time.Now().UnixMilli()
+ }
+ heartbeatRetry := 0
+ for {
+ time.Sleep(c.config.Heartbeat.Interval)
+ select {
+ case <-ctx.Done():
+ return
+ default:
}
-
- if !c.master.HasNext {
- if err != nil {
- return err
- }
- if rsp != nil {
- log.Errorf("[CONSUMER]
register2master(%s) failure exist register, client=%s, error: %s",
- c.master.Address, c.clientID,
rsp.GetErrMsg())
- }
+ if heartbeatRetry >= c.config.Heartbeat.MaxRetryTimes {
break
}
- retryCount++
- log.Warnf("[CONSUMER] register2master(%s) failure,
client=%s, retry count=%d",
- c.master.Address, c.clientID, retryCount)
- if c.master, err =
c.selector.Select(c.config.Consumer.Masters); err != nil {
- return err
+ if err := c.heartbeat2Master(ctx); err != nil {
+ log.Errorf("[CONSUMER] heartbeat2Master error
%v", err)
+ heartbeatRetry++
+ continue
+ } else {
+ heartbeatRetry = 0
+ c.lastMasterHb = time.Now().UnixMilli()
}
- continue
}
- log.Infof("register2Master response %s", rsp.String())
+ }
+}
- c.masterHBRetry = 0
- c.processRegisterResponseM2C(rsp)
- break
+func (c *consumer) register2Master(ctx context.Context) error {
+ rsp, err := c.sendRegRequest2Master(ctx)
+ if err != nil {
+ return err
+ }
+ if !rsp.GetSuccess() {
+ return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
}
+ c.processRegisterResponseM2C(rsp)
return nil
}
-func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C,
error) {
- ctx, cancel := context.WithTimeout(context.Background(),
c.config.Net.ReadTimeout)
+func (c *consumer) sendRegRequest2Master(ctx context.Context)
(*protocol.RegisterResponseM2C, error) {
+ ctx, cancel := context.WithTimeout(ctx, c.config.Net.ReadTimeout)
defer cancel()
m := &metadata.Metadata{}
@@ -207,7 +228,6 @@ func (c *consumer) processRegisterResponseM2C(rsp
*protocol.RegisterResponseM2C)
if rsp.GetAuthorizedInfo() != nil {
c.processAuthorizedToken(rsp.GetAuthorizedInfo())
}
- c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
}
func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo)
{
@@ -215,12 +235,84 @@ func (c *consumer) processAuthorizedToken(info
*protocol.MasterAuthorizedInfo) {
c.authorizedInfo = info.GetAuthAuthorizedToken()
}
-// GetMessage implementation of TubeMQ consumer.
-func (c *consumer) GetMessage() (*ConsumerResult, error) {
- err := c.checkPartitionErr()
+func (c *consumer) heartbeat2Master(ctx context.Context) error {
+ rsp, err := c.sendHeartbeat2Master(ctx)
if err != nil {
- return nil, err
+ return err
+ }
+ if !rsp.GetSuccess() {
+ return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
}
+ c.processHBResponseM2C(rsp)
+ return nil
+}
+
+func (c *consumer) sendHeartbeat2Master(ctx context.Context)
(*protocol.HeartResponseM2C, error) {
+ if time.Now().UnixNano()/int64(time.Millisecond)-c.lastMasterHb > 30000
{
+
c.rmtDataCache.HandleExpiredPartitions(c.config.Consumer.MaxConfirmWait)
+ }
+ m := &metadata.Metadata{}
+ node := &metadata.Node{}
+ node.SetHost(util.GetLocalHost())
+ node.SetAddress(c.master.Address)
+ m.SetNode(node)
+ sub := &metadata.SubscribeInfo{}
+ sub.SetGroup(c.config.Consumer.Group)
+ m.SetSubscribeInfo(sub)
+ auth := &protocol.AuthenticateInfo{}
+ if c.needGenMasterCertificateInfo(true) {
+ util.GenMasterAuthenticateToken(auth,
c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
+ }
+ c.unreportedTimes++
+ if c.unreportedTimes > c.config.Consumer.MaxSubInfoReportInterval {
+ m.SetReportTimes(true)
+ c.unreportedTimes = 0
+ }
+
+ ctx, cancel := context.WithTimeout(ctx, c.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := c.client.HeartRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+ return rsp, err
+}
+
+func (c *consumer) processHBResponseM2C(rsp *protocol.HeartResponseM2C) {
+ c.masterHBRetry = 0
+ if !rsp.GetNotAllocated() {
+ c.subInfo.CASIsNotAllocated(1, 0)
+ }
+ if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
+ if rsp.GetDefFlowCheckId() != 0 {
+
c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(),
rsp.GetDefFlowControlInfo())
+ }
+ qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+ if rsp.GetQryPriorityId() != 0 {
+ qryPriorityID = rsp.GetQryPriorityId()
+ }
+ c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID,
rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+ }
+ if rsp.GetAuthorizedInfo() != nil {
+ c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+ }
+ if rsp.GetRequireAuth() {
+ atomic.StoreInt32(&c.nextAuth2Master, 1)
+ }
+ if rsp.GetEvent() != nil {
+ event := rsp.GetEvent()
+ subscribeInfo := make([]*metadata.SubscribeInfo, 0,
len(event.GetSubscribeInfo()))
+ for _, sub := range event.GetSubscribeInfo() {
+ s, err := metadata.NewSubscribeInfo(sub)
+ if err != nil {
+ continue
+ }
+ subscribeInfo = append(subscribeInfo, s)
+ }
+ e := metadata.NewEvent(event.GetRebalanceId(),
event.GetOpType(), subscribeInfo)
+ c.rmtDataCache.OfferEvent(e)
+ }
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
partition, bookedTime, err := c.rmtDataCache.SelectPartition()
if err != nil {
return nil, err
@@ -372,35 +464,36 @@ func (c *consumer) GetClientID() string {
func (c *consumer) Close() {
c.closeOnce.Do(func() {
log.Infof("[CONSUMER]Begin to close consumer, client=%s",
c.clientID)
- close(c.done)
+ c.cancel()
c.heartbeatManager.close()
c.close2Master()
c.closeAllBrokers()
c.client.Close()
+ <-c.routineClosed
log.Infof("[CONSUMER]Consumer has been closed successfully,
client=%s", c.clientID)
})
}
-func (c *consumer) processRebalanceEvent() {
+func (c *consumer) processRebalanceEvent(ctx context.Context) {
log.Info("[CONSUMER]Rebalance event Handler starts!")
for {
select {
+ case <-ctx.Done():
+ log.Info("[CONSUMER] Rebalance event Handler stopped!")
+ return
case event, ok := <-c.rmtDataCache.EventCh:
if ok {
+ log.Infof("%+v", event)
c.rmtDataCache.ClearEvent()
switch event.GetEventType() {
case metadata.Disconnect,
metadata.OnlyDisconnect:
c.disconnect2Broker(event)
c.rmtDataCache.OfferEventResult(event)
case metadata.Connect, metadata.OnlyConnect:
- c.connect2Broker(event)
+ c.connect2Broker(ctx, event)
c.rmtDataCache.OfferEventResult(event)
}
}
- case <-c.done:
- log.Infof("[CONSUMER]Rebalance done, client=%s",
c.clientID)
- log.Info("[CONSUMER] Rebalance event Handler stopped!")
- return
}
}
}
@@ -424,16 +517,26 @@ func (c *consumer) unregister2Broker(unRegPartitions
map[*metadata.Node][]*metad
if len(unRegPartitions) == 0 {
return
}
-
+ var wg sync.WaitGroup
for _, partitions := range unRegPartitions {
for _, partition := range partitions {
log.Tracef("unregister2Brokers, partition key=%s",
partition.GetPartitionKey())
- c.sendUnregisterReq2Broker(partition)
+ partition := partition
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if err :=
c.sendUnregisterReq2Broker(partition); err != nil {
+ log.Errorf("[CONSUMER] unregister
partition %+v failed, error %v", partition, err)
+ } else {
+ log.Infof("[connect2Broker] unregister
partition %+v success", partition)
+ }
+ }()
}
}
+ wg.Wait()
}
-func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) {
+func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition)
error {
ctx, cancel := context.WithTimeout(context.Background(),
c.config.Net.ReadTimeout)
defer cancel()
@@ -454,20 +557,29 @@ func (c *consumer) sendUnregisterReq2Broker(partition
*metadata.Partition) {
rsp, err := c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
if err != nil {
log.Errorf("[CONSUMER] fail to unregister partition %s, error
%s", partition, err.Error())
- return
+ return err
}
if !rsp.GetSuccess() {
log.Errorf("[CONSUMER] fail to unregister partition %s, err
code: %d, error msg %s",
partition, rsp.GetErrCode(), rsp.GetErrMsg())
+ return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
}
+ return nil
}
-func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
+func (c *consumer) connect2Broker(ctx context.Context, event
*metadata.ConsumerEvent) {
log.Tracef("[connect2Broker] connect event begin, client=%s",
c.clientID)
if len(event.GetSubscribeInfo()) > 0 {
unsubPartitions :=
c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
+ n := len(unsubPartitions)
if len(unsubPartitions) > 0 {
- for _, partition := range unsubPartitions {
+ for i, partition := range unsubPartitions {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
node := &metadata.Node{}
node.SetHost(util.GetLocalHost())
node.SetAddress(partition.GetBroker().GetAddress())
@@ -482,6 +594,7 @@ func (c *consumer) connect2Broker(event
*metadata.ConsumerEvent) {
return
}
+ log.Infof("[connect2Broker] %v/%v register
partition %+v success", i, n, partition)
c.rmtDataCache.AddNewPartition(partition)
c.heartbeatManager.registerBroker(partition.GetBroker())
}
diff --git
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index df6d079445..67e4faa134 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -21,7 +21,6 @@ import (
"context"
"strings"
"sync"
- "sync/atomic"
"time"
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/errs"
@@ -61,9 +60,6 @@ func (h *heartbeatManager) registerMaster(address string) {
if h.producer != nil {
heartbeatInterval = h.producer.config.Heartbeat.Interval / 2
heartbeatFunc = h.producerHB2Master
- } else if h.consumer != nil {
- heartbeatInterval = h.consumer.config.Heartbeat.Interval / 2
- heartbeatFunc = h.consumerHB2Master
}
if !ok {
@@ -133,58 +129,6 @@ func (h *heartbeatManager) producerHB2Master() {
h.resetMasterHeartbeat()
}
-func (h *heartbeatManager) consumerHB2Master() {
- if
time.Now().UnixNano()/int64(time.Millisecond)-h.consumer.lastMasterHb > 30000 {
-
h.consumer.rmtDataCache.HandleExpiredPartitions(h.consumer.config.Consumer.MaxConfirmWait)
- }
- m := &metadata.Metadata{}
- node := &metadata.Node{}
- node.SetHost(util.GetLocalHost())
- node.SetAddress(h.consumer.master.Address)
- m.SetNode(node)
- sub := &metadata.SubscribeInfo{}
- sub.SetGroup(h.consumer.config.Consumer.Group)
- m.SetSubscribeInfo(sub)
- auth := &protocol.AuthenticateInfo{}
- if h.consumer.needGenMasterCertificateInfo(true) {
- util.GenMasterAuthenticateToken(auth,
h.consumer.config.Net.Auth.UserName, h.consumer.config.Net.Auth.Password)
- }
- h.consumer.unreportedTimes++
- if h.consumer.unreportedTimes >
h.consumer.config.Consumer.MaxSubInfoReportInterval {
- m.SetReportTimes(true)
- h.consumer.unreportedTimes = 0
- }
-
- rsp, err := h.sendHeartbeatC2M(m)
- if err == nil {
- h.consumer.masterHBRetry = 0
- h.processHBResponseM2C(rsp)
- h.resetMasterHeartbeat()
- return
- }
- h.consumer.masterHBRetry++
- h.resetMasterHeartbeat()
- hbNoNode := rsp != nil && rsp.GetErrCode() == errs.RetErrHBNoNode
- standByException := false
- if e, ok := err.(*errs.Error); ok {
- standByException = strings.Index(e.Msg, "StandbyException") !=
-1
- }
- if (h.consumer.masterHBRetry >=
h.consumer.config.Heartbeat.MaxRetryTimes) || standByException || hbNoNode {
- h.deleteHeartbeat(h.consumer.master.Address)
- go func() {
- err := h.consumer.register2Master(!hbNoNode)
- if err != nil {
- log.Warnf("[CONSUMER] heartBeat2Master failure
to (%s) : %s, client=%s",
- h.consumer.master.Address,
rsp.GetErrMsg(), h.consumer.clientID)
- return
- }
- h.registerMaster(h.consumer.master.Address)
- log.Infof("[CONSUMER] heartBeat2Master success to (%s),
client=%s",
- h.consumer.master.Address, h.consumer.clientID)
- }()
- }
-}
-
func (h *heartbeatManager) resetMasterHeartbeat() {
h.mu.Lock()
defer h.mu.Unlock()
@@ -224,42 +168,6 @@ func (h *heartbeatManager) processHBResponseM2P(rsp
*protocol.HeartResponseM2P)
h.producer.updateTopicConfigure(topicInfos)
}
-func (h *heartbeatManager) processHBResponseM2C(rsp
*protocol.HeartResponseM2C) {
- h.consumer.masterHBRetry = 0
- if !rsp.GetNotAllocated() {
- h.consumer.subInfo.CASIsNotAllocated(1, 0)
- }
- if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
- if rsp.GetDefFlowCheckId() != 0 {
-
h.consumer.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(),
rsp.GetDefFlowControlInfo())
- }
- qryPriorityID := h.consumer.rmtDataCache.GetQryPriorityID()
- if rsp.GetQryPriorityId() != 0 {
- qryPriorityID = rsp.GetQryPriorityId()
- }
- h.consumer.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID,
rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
- }
- if rsp.GetAuthorizedInfo() != nil {
- h.consumer.processAuthorizedToken(rsp.GetAuthorizedInfo())
- }
- if rsp.GetRequireAuth() {
- atomic.StoreInt32(&h.consumer.nextAuth2Master, 1)
- }
- if rsp.GetEvent() != nil {
- event := rsp.GetEvent()
- subscribeInfo := make([]*metadata.SubscribeInfo, 0,
len(event.GetSubscribeInfo()))
- for _, sub := range event.GetSubscribeInfo() {
- s, err := metadata.NewSubscribeInfo(sub)
- if err != nil {
- continue
- }
- subscribeInfo = append(subscribeInfo, s)
- }
- e := metadata.NewEvent(event.GetRebalanceId(),
event.GetOpType(), subscribeInfo)
- h.consumer.rmtDataCache.OfferEventAndNotify(e)
- }
-}
-
func (h *heartbeatManager) nextHeartbeatInterval() time.Duration {
var interval time.Duration
if h.producer != nil {
@@ -350,10 +258,6 @@ func (h *heartbeatManager) close() {
defer h.mu.Unlock()
for _, heartbeat := range h.heartbeats {
- if !heartbeat.timer.Stop() {
- <-heartbeat.timer.C
- }
- heartbeat.timer = nil
+ heartbeat.timer.Stop()
}
- h.heartbeats = nil
}
diff --git
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/remote/remote.go
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 13b7a15472..0563c32305 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -185,21 +185,20 @@ func (r *RmtDataCache)
UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i
}
}
-// OfferEventAndNotify offers a consumer event and notifies the consumer
method and notify the consumer to consume.
-func (r *RmtDataCache) OfferEventAndNotify(event *metadata.ConsumerEvent) {
- r.eventReadMu.Lock()
- defer r.eventReadMu.Unlock()
- r.rebalanceResults = append(r.rebalanceResults, event)
- e := r.rebalanceResults[0]
- r.rebalanceResults = r.rebalanceResults[1:]
- r.EventCh <- e
+// OfferEvent offers a consumer event and notifies the consumer method and
notify the consumer to consume.
+func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
+ r.EventCh <- event
}
// ClearEvent clears all the events.
func (r *RmtDataCache) ClearEvent() {
- r.eventWriteMu.Lock()
- defer r.eventWriteMu.Unlock()
- r.rebalanceResults = r.rebalanceResults[:0]
+ for {
+ select {
+ case <-r.EventCh:
+ default:
+ return
+ }
+ }
}
// OfferEventResult offers a consumer event.
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/master.go
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 386f619c9e..38274c1460 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/codec"
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/errs"
+
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log"
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata"
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol"
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/remote"
@@ -199,6 +200,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context,
metadata *metadata.Meta
}
}
if event != nil {
+ log.Infof("report Event: %v", event)
ep := &protocol.EventProto{
RebalanceId: proto.Int64(event.GetRebalanceID()),
OpType: proto.Int32(event.GetEventType()),
diff --git
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
index b1193645ec..a87075e3fb 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
@@ -19,6 +19,8 @@ package selector
import (
"errors"
+ "fmt"
+ "net"
"strings"
"sync"
)
@@ -57,11 +59,21 @@ func (s *ipSelector) Select(serviceName string) (*Node,
error) {
}
s.services[serviceName] = services
}
+ address := services.addresses[services.nextIndex]
+ services.nextIndex = (services.nextIndex + 1) % len(services.addresses)
+ host, port, err := net.SplitHostPort(address)
+ if err != nil {
+ return nil, err
+ }
+ ips, err := net.LookupHost(host)
+ if err != nil {
+ return nil, err
+ }
+ address = fmt.Sprintf("%v:%v", ips[0], port)
node := &Node{
ServiceName: serviceName,
- Address: services.addresses[services.nextIndex],
+ Address: address,
}
- services.nextIndex = (services.nextIndex + 1) % len(services.addresses)
if services.nextIndex > 0 {
node.HasNext = true
}
diff --git
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
index c091262f82..1303b6bfd5 100644
---
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
+++
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
@@ -34,17 +34,6 @@ func TestSingleIP(t *testing.T) {
assert.Equal(t, node.ServiceName, "127.0.0.1:9092")
}
-func TestSingleDNS(t *testing.T) {
- serviceName := "tubemq:8081"
- selector, err := Get("dns")
- assert.Nil(t, err)
- node, err := selector.Select(serviceName)
- assert.Nil(t, err)
- assert.Equal(t, node.HasNext, false)
- assert.Equal(t, node.Address, "tubemq:8081")
- assert.Equal(t, node.ServiceName, "tubemq:8081")
-}
-
func TestMultipleIP(t *testing.T) {
serviceName :=
"127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"
selector, err := Get("dns")
@@ -71,32 +60,6 @@ func TestMultipleIP(t *testing.T) {
assert.Equal(t,
"127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094", node.ServiceName)
}
-func TestMultipleDNS(t *testing.T) {
- serviceName := "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084"
- selector, err := Get("dns")
- assert.Nil(t, err)
- node, err := selector.Select(serviceName)
- assert.Nil(t, err)
- assert.Equal(t, true, node.HasNext)
- assert.Equal(t, "tubemq:8081", node.Address)
- assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084",
node.ServiceName)
-
- node, err = selector.Select(serviceName)
- assert.Equal(t, true, node.HasNext)
- assert.Equal(t, "tubemq:8082", node.Address)
- assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084",
node.ServiceName)
-
- node, err = selector.Select(serviceName)
- assert.Equal(t, true, node.HasNext)
- assert.Equal(t, "tubemq:8083", node.Address)
- assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084",
node.ServiceName)
-
- node, err = selector.Select(serviceName)
- assert.Equal(t, false, node.HasNext)
- assert.Equal(t, "tubemq:8084", node.Address)
- assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084",
node.ServiceName)
-}
-
func TestEmptyService(t *testing.T) {
serviceName := ""
selector, err := Get("ip")