This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 819e9474ac [INLONG-11583][TubeMQ] Go SDK load-balancing logic is flawed, causing consumption to stop suddenly (#11582)
819e9474ac is described below

commit 819e9474acb16c52486fcd896e39dd1251d4f910
Author: leonwolf <[email protected]>
AuthorDate: Fri Dec 6 14:17:24 2024 +0800

    [INLONG-11583][TubeMQ] Go SDK load-balancing logic is flawed, causing consumption to stop suddenly (#11582)
    
    1. Prevent re-registration from failing forever when the IP address behind the master's domain name changes.
    
    * [fix bug] OfferEventAndNotify should not change rebalanceResults
    
    * [fix bug] Make master registration and heartbeating more idiomatic Go; the old implementation stopped working permanently once a re-registration to the master failed.
    
    * adjust comment
    
    ---------
    
    Co-authored-by: leonyue <[email protected]>
---
 .../tubemq-client-go/client/consumer_impl.go       | 239 +++++++++++++++------
 .../tubemq-client-go/client/heartbeat.go           |  98 +--------
 .../tubemq-client-go/remote/remote.go              |  21 +-
 .../tubemq-client-go/rpc/master.go                 |   2 +
 .../tubemq-client-go/selector/ip_selector.go       |  16 +-
 .../tubemq-client-go/selector/ip_selector_test.go  |  37 ----
 6 files changed, 203 insertions(+), 210 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go 
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 6cc0174835..c78f2469b5 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -68,7 +68,8 @@ type consumer struct {
        masterHBRetry    int
        heartbeatManager *heartbeatManager
        unreportedTimes  int
-       done             chan struct{}
+       cancel           context.CancelFunc
+       routineClosed    chan struct{}
        closeOnce        sync.Once
 }
 
@@ -105,72 +106,92 @@ func NewConsumer(config *config.Config) (Consumer, error) 
{
                client:          client,
                visitToken:      util.InvalidValue,
                unreportedTimes: 0,
-               done:            make(chan struct{}),
+               routineClosed:   make(chan struct{}),
        }
+
+       ctx := context.Background()
+       ctx, c.cancel = context.WithCancel(ctx)
+
        c.subInfo.SetClientID(clientID)
        hbm := newHBManager(c)
        c.heartbeatManager = hbm
-       err = c.register2Master(true)
-       if err != nil {
-               return nil, err
-       }
-       c.heartbeatManager.registerMaster(c.master.Address)
-       go c.processRebalanceEvent()
+
+       go c.routine(ctx)
+       go c.processRebalanceEvent(ctx)
+
        log.Infof("[CONSUMER] start consumer success, client=%s", clientID)
        return c, nil
 }
 
-func (c *consumer) register2Master(needChange bool) error {
-       if needChange {
-               c.selector.Refresh(c.config.Consumer.Masters)
+func (c *consumer) routine(ctx context.Context) {
+       defer close(c.routineClosed)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               default:
+               }
+               // select master
                node, err := c.selector.Select(c.config.Consumer.Masters)
                if err != nil {
-                       return err
+                       log.Errorf("[CONSUMER] select error %v", err)
+                       time.Sleep(time.Second)
+                       continue
                }
                c.master = node
-       }
-       retryCount := 0
-       for {
-               rsp, err := c.sendRegRequest2Master()
-               if err != nil || !rsp.GetSuccess() {
-                       if err != nil {
-                               log.Errorf("[CONSUMER]register2Master error 
%s", err.Error())
-                       } else if rsp.GetErrCode() == 
errs.RetConsumeGroupForbidden ||
-                               rsp.GetErrCode() == 
errs.RetConsumeContentForbidden {
-                               log.Warnf("[CONSUMER] register2master(%s) 
failure exist register, client=%s, error: %s",
-                                       c.master.Address, c.clientID, 
rsp.GetErrMsg())
-                               return errs.New(rsp.GetErrCode(), 
rsp.GetErrMsg())
+               log.Infof("[CONSUMER] master %+v", c.master)
+               // register to master
+               if err := c.register2Master(ctx); err != nil {
+                       log.Errorf("[CONSUMER] register2Master error %v", err)
+                       time.Sleep(time.Second)
+                       continue
+               }
+               c.lastMasterHb = time.Now().UnixMilli()
+               // heartbeat to master
+               time.Sleep(c.config.Heartbeat.Interval / 2)
+               if err := c.heartbeat2Master(ctx); err != nil {
+                       log.Errorf("[CONSUMER] heartbeat2Master error %v", err)
+               } else {
+                       c.lastMasterHb = time.Now().UnixMilli()
+               }
+               heartbeatRetry := 0
+               for {
+                       time.Sleep(c.config.Heartbeat.Interval)
+                       select {
+                       case <-ctx.Done():
+                               return
+                       default:
                        }
-
-                       if !c.master.HasNext {
-                               if err != nil {
-                                       return err
-                               }
-                               if rsp != nil {
-                                       log.Errorf("[CONSUMER] 
register2master(%s) failure exist register, client=%s, error: %s",
-                                               c.master.Address, c.clientID, 
rsp.GetErrMsg())
-                               }
+                       if heartbeatRetry >= c.config.Heartbeat.MaxRetryTimes {
                                break
                        }
-                       retryCount++
-                       log.Warnf("[CONSUMER] register2master(%s) failure, 
client=%s, retry count=%d",
-                               c.master.Address, c.clientID, retryCount)
-                       if c.master, err = 
c.selector.Select(c.config.Consumer.Masters); err != nil {
-                               return err
+                       if err := c.heartbeat2Master(ctx); err != nil {
+                               log.Errorf("[CONSUMER] heartbeat2Master error 
%v", err)
+                               heartbeatRetry++
+                               continue
+                       } else {
+                               heartbeatRetry = 0
+                               c.lastMasterHb = time.Now().UnixMilli()
                        }
-                       continue
                }
-               log.Infof("register2Master response %s", rsp.String())
+       }
+}
 
-               c.masterHBRetry = 0
-               c.processRegisterResponseM2C(rsp)
-               break
+func (c *consumer) register2Master(ctx context.Context) error {
+       rsp, err := c.sendRegRequest2Master(ctx)
+       if err != nil {
+               return err
+       }
+       if !rsp.GetSuccess() {
+               return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
        }
+       c.processRegisterResponseM2C(rsp)
        return nil
 }
 
-func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C, 
error) {
-       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+func (c *consumer) sendRegRequest2Master(ctx context.Context) 
(*protocol.RegisterResponseM2C, error) {
+       ctx, cancel := context.WithTimeout(ctx, c.config.Net.ReadTimeout)
        defer cancel()
 
        m := &metadata.Metadata{}
@@ -207,7 +228,6 @@ func (c *consumer) processRegisterResponseM2C(rsp 
*protocol.RegisterResponseM2C)
        if rsp.GetAuthorizedInfo() != nil {
                c.processAuthorizedToken(rsp.GetAuthorizedInfo())
        }
-       c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
 }
 
 func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) 
{
@@ -215,12 +235,84 @@ func (c *consumer) processAuthorizedToken(info 
*protocol.MasterAuthorizedInfo) {
        c.authorizedInfo = info.GetAuthAuthorizedToken()
 }
 
-// GetMessage implementation of TubeMQ consumer.
-func (c *consumer) GetMessage() (*ConsumerResult, error) {
-       err := c.checkPartitionErr()
+func (c *consumer) heartbeat2Master(ctx context.Context) error {
+       rsp, err := c.sendHeartbeat2Master(ctx)
        if err != nil {
-               return nil, err
+               return err
+       }
+       if !rsp.GetSuccess() {
+               return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
        }
+       c.processHBResponseM2C(rsp)
+       return nil
+}
+
+func (c *consumer) sendHeartbeat2Master(ctx context.Context) 
(*protocol.HeartResponseM2C, error) {
+       if time.Now().UnixNano()/int64(time.Millisecond)-c.lastMasterHb > 30000 
{
+               
c.rmtDataCache.HandleExpiredPartitions(c.config.Consumer.MaxConfirmWait)
+       }
+       m := &metadata.Metadata{}
+       node := &metadata.Node{}
+       node.SetHost(util.GetLocalHost())
+       node.SetAddress(c.master.Address)
+       m.SetNode(node)
+       sub := &metadata.SubscribeInfo{}
+       sub.SetGroup(c.config.Consumer.Group)
+       m.SetSubscribeInfo(sub)
+       auth := &protocol.AuthenticateInfo{}
+       if c.needGenMasterCertificateInfo(true) {
+               util.GenMasterAuthenticateToken(auth, 
c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
+       }
+       c.unreportedTimes++
+       if c.unreportedTimes > c.config.Consumer.MaxSubInfoReportInterval {
+               m.SetReportTimes(true)
+               c.unreportedTimes = 0
+       }
+
+       ctx, cancel := context.WithTimeout(ctx, c.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := c.client.HeartRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+       return rsp, err
+}
+
+func (c *consumer) processHBResponseM2C(rsp *protocol.HeartResponseM2C) {
+       c.masterHBRetry = 0
+       if !rsp.GetNotAllocated() {
+               c.subInfo.CASIsNotAllocated(1, 0)
+       }
+       if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
+               if rsp.GetDefFlowCheckId() != 0 {
+                       
c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), 
rsp.GetDefFlowControlInfo())
+               }
+               qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+               if rsp.GetQryPriorityId() != 0 {
+                       qryPriorityID = rsp.GetQryPriorityId()
+               }
+               c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, 
rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+       }
+       if rsp.GetAuthorizedInfo() != nil {
+               c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+       }
+       if rsp.GetRequireAuth() {
+               atomic.StoreInt32(&c.nextAuth2Master, 1)
+       }
+       if rsp.GetEvent() != nil {
+               event := rsp.GetEvent()
+               subscribeInfo := make([]*metadata.SubscribeInfo, 0, 
len(event.GetSubscribeInfo()))
+               for _, sub := range event.GetSubscribeInfo() {
+                       s, err := metadata.NewSubscribeInfo(sub)
+                       if err != nil {
+                               continue
+                       }
+                       subscribeInfo = append(subscribeInfo, s)
+               }
+               e := metadata.NewEvent(event.GetRebalanceId(), 
event.GetOpType(), subscribeInfo)
+               c.rmtDataCache.OfferEvent(e)
+       }
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
        partition, bookedTime, err := c.rmtDataCache.SelectPartition()
        if err != nil {
                return nil, err
@@ -372,35 +464,36 @@ func (c *consumer) GetClientID() string {
 func (c *consumer) Close() {
        c.closeOnce.Do(func() {
                log.Infof("[CONSUMER]Begin to close consumer, client=%s", 
c.clientID)
-               close(c.done)
+               c.cancel()
                c.heartbeatManager.close()
                c.close2Master()
                c.closeAllBrokers()
                c.client.Close()
+               <-c.routineClosed
                log.Infof("[CONSUMER]Consumer has been closed successfully, 
client=%s", c.clientID)
        })
 }
 
-func (c *consumer) processRebalanceEvent() {
+func (c *consumer) processRebalanceEvent(ctx context.Context) {
        log.Info("[CONSUMER]Rebalance event Handler starts!")
        for {
                select {
+               case <-ctx.Done():
+                       log.Info("[CONSUMER] Rebalance event Handler stopped!")
+                       return
                case event, ok := <-c.rmtDataCache.EventCh:
                        if ok {
+                               log.Infof("%+v", event)
                                c.rmtDataCache.ClearEvent()
                                switch event.GetEventType() {
                                case metadata.Disconnect, 
metadata.OnlyDisconnect:
                                        c.disconnect2Broker(event)
                                        c.rmtDataCache.OfferEventResult(event)
                                case metadata.Connect, metadata.OnlyConnect:
-                                       c.connect2Broker(event)
+                                       c.connect2Broker(ctx, event)
                                        c.rmtDataCache.OfferEventResult(event)
                                }
                        }
-               case <-c.done:
-                       log.Infof("[CONSUMER]Rebalance done, client=%s", 
c.clientID)
-                       log.Info("[CONSUMER] Rebalance event Handler stopped!")
-                       return
                }
        }
 }
@@ -424,16 +517,26 @@ func (c *consumer) unregister2Broker(unRegPartitions 
map[*metadata.Node][]*metad
        if len(unRegPartitions) == 0 {
                return
        }
-
+       var wg sync.WaitGroup
        for _, partitions := range unRegPartitions {
                for _, partition := range partitions {
                        log.Tracef("unregister2Brokers, partition key=%s", 
partition.GetPartitionKey())
-                       c.sendUnregisterReq2Broker(partition)
+                       partition := partition
+                       wg.Add(1)
+                       go func() {
+                               defer wg.Done()
+                               if err := 
c.sendUnregisterReq2Broker(partition); err != nil {
+                                       log.Errorf("[CONSUMER] unregister 
partition %+v failed, error %v", partition, err)
+                               } else {
+                                       log.Infof("[connect2Broker] unregister 
partition %+v success", partition)
+                               }
+                       }()
                }
        }
+       wg.Wait()
 }
 
-func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) {
+func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) 
error {
        ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
        defer cancel()
 
@@ -454,20 +557,29 @@ func (c *consumer) sendUnregisterReq2Broker(partition 
*metadata.Partition) {
        rsp, err := c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
        if err != nil {
                log.Errorf("[CONSUMER] fail to unregister partition %s, error 
%s", partition, err.Error())
-               return
+               return err
        }
        if !rsp.GetSuccess() {
                log.Errorf("[CONSUMER] fail to unregister partition %s, err 
code: %d, error msg %s",
                        partition, rsp.GetErrCode(), rsp.GetErrMsg())
+               return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
        }
+       return nil
 }
 
-func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
+func (c *consumer) connect2Broker(ctx context.Context, event 
*metadata.ConsumerEvent) {
        log.Tracef("[connect2Broker] connect event begin, client=%s", 
c.clientID)
        if len(event.GetSubscribeInfo()) > 0 {
                unsubPartitions := 
c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
+               n := len(unsubPartitions)
                if len(unsubPartitions) > 0 {
-                       for _, partition := range unsubPartitions {
+                       for i, partition := range unsubPartitions {
+                               select {
+                               case <-ctx.Done():
+                                       return
+                               default:
+                               }
+
                                node := &metadata.Node{}
                                node.SetHost(util.GetLocalHost())
                                
node.SetAddress(partition.GetBroker().GetAddress())
@@ -482,6 +594,7 @@ func (c *consumer) connect2Broker(event 
*metadata.ConsumerEvent) {
                                        return
                                }
 
+                               log.Infof("[connect2Broker] %v/%v register 
partition %+v success", i, n, partition)
                                c.rmtDataCache.AddNewPartition(partition)
                                
c.heartbeatManager.registerBroker(partition.GetBroker())
                        }
diff --git 
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go 
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index df6d079445..67e4faa134 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -21,7 +21,6 @@ import (
        "context"
        "strings"
        "sync"
-       "sync/atomic"
        "time"
 
        
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/errs"
@@ -61,9 +60,6 @@ func (h *heartbeatManager) registerMaster(address string) {
        if h.producer != nil {
                heartbeatInterval = h.producer.config.Heartbeat.Interval / 2
                heartbeatFunc = h.producerHB2Master
-       } else if h.consumer != nil {
-               heartbeatInterval = h.consumer.config.Heartbeat.Interval / 2
-               heartbeatFunc = h.consumerHB2Master
        }
 
        if !ok {
@@ -133,58 +129,6 @@ func (h *heartbeatManager) producerHB2Master() {
        h.resetMasterHeartbeat()
 }
 
-func (h *heartbeatManager) consumerHB2Master() {
-       if 
time.Now().UnixNano()/int64(time.Millisecond)-h.consumer.lastMasterHb > 30000 {
-               
h.consumer.rmtDataCache.HandleExpiredPartitions(h.consumer.config.Consumer.MaxConfirmWait)
-       }
-       m := &metadata.Metadata{}
-       node := &metadata.Node{}
-       node.SetHost(util.GetLocalHost())
-       node.SetAddress(h.consumer.master.Address)
-       m.SetNode(node)
-       sub := &metadata.SubscribeInfo{}
-       sub.SetGroup(h.consumer.config.Consumer.Group)
-       m.SetSubscribeInfo(sub)
-       auth := &protocol.AuthenticateInfo{}
-       if h.consumer.needGenMasterCertificateInfo(true) {
-               util.GenMasterAuthenticateToken(auth, 
h.consumer.config.Net.Auth.UserName, h.consumer.config.Net.Auth.Password)
-       }
-       h.consumer.unreportedTimes++
-       if h.consumer.unreportedTimes > 
h.consumer.config.Consumer.MaxSubInfoReportInterval {
-               m.SetReportTimes(true)
-               h.consumer.unreportedTimes = 0
-       }
-
-       rsp, err := h.sendHeartbeatC2M(m)
-       if err == nil {
-               h.consumer.masterHBRetry = 0
-               h.processHBResponseM2C(rsp)
-               h.resetMasterHeartbeat()
-               return
-       }
-       h.consumer.masterHBRetry++
-       h.resetMasterHeartbeat()
-       hbNoNode := rsp != nil && rsp.GetErrCode() == errs.RetErrHBNoNode
-       standByException := false
-       if e, ok := err.(*errs.Error); ok {
-               standByException = strings.Index(e.Msg, "StandbyException") != 
-1
-       }
-       if (h.consumer.masterHBRetry >= 
h.consumer.config.Heartbeat.MaxRetryTimes) || standByException || hbNoNode {
-               h.deleteHeartbeat(h.consumer.master.Address)
-               go func() {
-                       err := h.consumer.register2Master(!hbNoNode)
-                       if err != nil {
-                               log.Warnf("[CONSUMER] heartBeat2Master failure 
to (%s) : %s, client=%s",
-                                       h.consumer.master.Address, 
rsp.GetErrMsg(), h.consumer.clientID)
-                               return
-                       }
-                       h.registerMaster(h.consumer.master.Address)
-                       log.Infof("[CONSUMER] heartBeat2Master success to (%s), 
client=%s",
-                               h.consumer.master.Address, h.consumer.clientID)
-               }()
-       }
-}
-
 func (h *heartbeatManager) resetMasterHeartbeat() {
        h.mu.Lock()
        defer h.mu.Unlock()
@@ -224,42 +168,6 @@ func (h *heartbeatManager) processHBResponseM2P(rsp 
*protocol.HeartResponseM2P)
        h.producer.updateTopicConfigure(topicInfos)
 }
 
-func (h *heartbeatManager) processHBResponseM2C(rsp 
*protocol.HeartResponseM2C) {
-       h.consumer.masterHBRetry = 0
-       if !rsp.GetNotAllocated() {
-               h.consumer.subInfo.CASIsNotAllocated(1, 0)
-       }
-       if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
-               if rsp.GetDefFlowCheckId() != 0 {
-                       
h.consumer.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), 
rsp.GetDefFlowControlInfo())
-               }
-               qryPriorityID := h.consumer.rmtDataCache.GetQryPriorityID()
-               if rsp.GetQryPriorityId() != 0 {
-                       qryPriorityID = rsp.GetQryPriorityId()
-               }
-               h.consumer.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, 
rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
-       }
-       if rsp.GetAuthorizedInfo() != nil {
-               h.consumer.processAuthorizedToken(rsp.GetAuthorizedInfo())
-       }
-       if rsp.GetRequireAuth() {
-               atomic.StoreInt32(&h.consumer.nextAuth2Master, 1)
-       }
-       if rsp.GetEvent() != nil {
-               event := rsp.GetEvent()
-               subscribeInfo := make([]*metadata.SubscribeInfo, 0, 
len(event.GetSubscribeInfo()))
-               for _, sub := range event.GetSubscribeInfo() {
-                       s, err := metadata.NewSubscribeInfo(sub)
-                       if err != nil {
-                               continue
-                       }
-                       subscribeInfo = append(subscribeInfo, s)
-               }
-               e := metadata.NewEvent(event.GetRebalanceId(), 
event.GetOpType(), subscribeInfo)
-               h.consumer.rmtDataCache.OfferEventAndNotify(e)
-       }
-}
-
 func (h *heartbeatManager) nextHeartbeatInterval() time.Duration {
        var interval time.Duration
        if h.producer != nil {
@@ -350,10 +258,6 @@ func (h *heartbeatManager) close() {
        defer h.mu.Unlock()
 
        for _, heartbeat := range h.heartbeats {
-               if !heartbeat.timer.Stop() {
-                       <-heartbeat.timer.C
-               }
-               heartbeat.timer = nil
+               heartbeat.timer.Stop()
        }
-       h.heartbeats = nil
 }
diff --git 
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/remote/remote.go 
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 13b7a15472..0563c32305 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -185,21 +185,20 @@ func (r *RmtDataCache) 
UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i
        }
 }
 
-// OfferEventAndNotify offers a consumer event and notifies the consumer 
method and notify the consumer to consume.
-func (r *RmtDataCache) OfferEventAndNotify(event *metadata.ConsumerEvent) {
-       r.eventReadMu.Lock()
-       defer r.eventReadMu.Unlock()
-       r.rebalanceResults = append(r.rebalanceResults, event)
-       e := r.rebalanceResults[0]
-       r.rebalanceResults = r.rebalanceResults[1:]
-       r.EventCh <- e
+// OfferEvent offers a consumer event and notifies the consumer method and 
notify the consumer to consume.
+func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
+       r.EventCh <- event
 }
 
 // ClearEvent clears all the events.
 func (r *RmtDataCache) ClearEvent() {
-       r.eventWriteMu.Lock()
-       defer r.eventWriteMu.Unlock()
-       r.rebalanceResults = r.rebalanceResults[:0]
+       for {
+               select {
+               case <-r.EventCh:
+               default:
+                       return
+               }
+       }
 }
 
 // OfferEventResult offers a consumer event.
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/master.go 
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 386f619c9e..38274c1460 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -24,6 +24,7 @@ import (
 
        
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/codec"
        
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/errs"
+       
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log"
        
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/metadata"
        
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/protocol"
        
"github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/remote"
@@ -199,6 +200,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, 
metadata *metadata.Meta
                }
        }
        if event != nil {
+               log.Infof("report Event: %v", event)
                ep := &protocol.EventProto{
                        RebalanceId: proto.Int64(event.GetRebalanceID()),
                        OpType:      proto.Int32(event.GetEventType()),
diff --git 
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go 
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
index b1193645ec..a87075e3fb 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
@@ -19,6 +19,8 @@ package selector
 
 import (
        "errors"
+       "fmt"
+       "net"
        "strings"
        "sync"
 )
@@ -57,11 +59,21 @@ func (s *ipSelector) Select(serviceName string) (*Node, 
error) {
                }
                s.services[serviceName] = services
        }
+       address := services.addresses[services.nextIndex]
+       services.nextIndex = (services.nextIndex + 1) % len(services.addresses)
+       host, port, err := net.SplitHostPort(address)
+       if err != nil {
+               return nil, err
+       }
+       ips, err := net.LookupHost(host)
+       if err != nil {
+               return nil, err
+       }
+       address = fmt.Sprintf("%v:%v", ips[0], port)
        node := &Node{
                ServiceName: serviceName,
-               Address:     services.addresses[services.nextIndex],
+               Address:     address,
        }
-       services.nextIndex = (services.nextIndex + 1) % len(services.addresses)
        if services.nextIndex > 0 {
                node.HasNext = true
        }
diff --git 
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
 
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
index c091262f82..1303b6bfd5 100644
--- 
a/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
+++ 
b/inlong-tubemq/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
@@ -34,17 +34,6 @@ func TestSingleIP(t *testing.T) {
        assert.Equal(t, node.ServiceName, "127.0.0.1:9092")
 }
 
-func TestSingleDNS(t *testing.T) {
-       serviceName := "tubemq:8081"
-       selector, err := Get("dns")
-       assert.Nil(t, err)
-       node, err := selector.Select(serviceName)
-       assert.Nil(t, err)
-       assert.Equal(t, node.HasNext, false)
-       assert.Equal(t, node.Address, "tubemq:8081")
-       assert.Equal(t, node.ServiceName, "tubemq:8081")
-}
-
 func TestMultipleIP(t *testing.T) {
        serviceName := 
"127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"
        selector, err := Get("dns")
@@ -71,32 +60,6 @@ func TestMultipleIP(t *testing.T) {
        assert.Equal(t, 
"127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094", node.ServiceName)
 }
 
-func TestMultipleDNS(t *testing.T) {
-       serviceName := "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084"
-       selector, err := Get("dns")
-       assert.Nil(t, err)
-       node, err := selector.Select(serviceName)
-       assert.Nil(t, err)
-       assert.Equal(t, true, node.HasNext)
-       assert.Equal(t, "tubemq:8081", node.Address)
-       assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", 
node.ServiceName)
-
-       node, err = selector.Select(serviceName)
-       assert.Equal(t, true, node.HasNext)
-       assert.Equal(t, "tubemq:8082", node.Address)
-       assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", 
node.ServiceName)
-
-       node, err = selector.Select(serviceName)
-       assert.Equal(t, true, node.HasNext)
-       assert.Equal(t, "tubemq:8083", node.Address)
-       assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", 
node.ServiceName)
-
-       node, err = selector.Select(serviceName)
-       assert.Equal(t, false, node.HasNext)
-       assert.Equal(t, "tubemq:8084", node.Address)
-       assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", 
node.ServiceName)
-}
-
 func TestEmptyService(t *testing.T) {
        serviceName := ""
        selector, err := Get("ip")

Reply via email to