This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 2d64bb530f302ab97bbc2930197909b10e8152df
Author: Zijie Lu <[email protected]>
AuthorDate: Tue Jun 8 17:40:17 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/client/consumer.go            |   6 +-
 .../tubemq-client-go/client/consumer_impl.go       | 159 ++++++++++++---------
 .../tubemq-client-go/client/heartbeat.go           |  35 +++--
 .../tubemq-client-go/client/version.go             |   5 +
 .../tubemq-client-go/metadata/consumer_event.go    |   7 +
 .../tubemq-client-go/remote/remote.go              |  23 +--
 6 files changed, 130 insertions(+), 105 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index 27a8536..a74dfa5 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -19,10 +19,6 @@
 // which can be exposed to user.
 package client
 
-const (
-       tubeMQClientVersion = "0.1.0"
-)
-
 // ConsumerResult of a consumption.
 type ConsumerResult struct {
 }
@@ -31,7 +27,7 @@ type ConsumerResult struct {
 type ConsumerOffset struct {
 }
 
-var clientIndex uint64
+var clientID uint64
 
 // Consumer is an interface that abstracts behavior of TubeMQ's consumer
 type Consumer interface {
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index b7312bb..674f810 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -61,6 +61,7 @@ type consumer struct {
        masterHBRetry    int
        heartbeatManager *heartbeatManager
        unreportedTimes  int
+       done             chan struct{}
 }
 
 // NewConsumer returns a consumer which is constructed by a given config.
@@ -70,7 +71,7 @@ func NewConsumer(config *config.Config) (Consumer, error) {
                return nil, err
        }
 
-       clientID := newClientID(config.Consumer.Group)
+       clientID := newClient(config.Consumer.Group)
        pool := multiplexing.NewPool()
        opts := &transport.Options{}
        if config.Net.TLS.Enable {
@@ -118,40 +119,18 @@ func (c *consumer) register2Master(needChange bool) error {
                c.master = node
        }
        for c.master.HasNext {
-               ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
-
-               m := &metadata.Metadata{}
-               node := &metadata.Node{}
-               node.SetHost(util.GetLocalHost())
-               node.SetAddress(c.master.Address)
-               m.SetNode(node)
-               sub := &metadata.SubscribeInfo{}
-               sub.SetGroup(c.config.Consumer.Group)
-               m.SetSubscribeInfo(sub)
-
-               auth := &protocol.AuthenticateInfo{}
-               c.genMasterAuthenticateToken(auth, true)
-               mci := &protocol.MasterCertificateInfo{
-                       AuthInfo: auth,
-               }
-               c.subInfo.SetMasterCertificateInfo(mci)
-
-               rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, 
c.rmtDataCache)
+               rsp, err := c.sendRegRequest2Master()
                if err != nil {
-                       cancel()
                        return err
                }
                if rsp.GetSuccess() {
                        c.masterHBRetry = 0
                        c.processRegisterResponseM2C(rsp)
-                       cancel()
                        return nil
                } else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || 
rsp.GetErrCode() == errs.RetConsumeContentForbidden {
-                       cancel()
                        return nil
                } else {
                        c.master, err = 
c.selector.Select(c.config.Consumer.Masters)
-                       cancel()
                        if err != nil {
                                return err
                        }
@@ -160,6 +139,30 @@ func (c *consumer) register2Master(needChange bool) error {
        return nil
 }
 
+func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C, 
error) {
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+
+       m := &metadata.Metadata{}
+       node := &metadata.Node{}
+       node.SetHost(util.GetLocalHost())
+       node.SetAddress(c.master.Address)
+       m.SetNode(node)
+       sub := &metadata.SubscribeInfo{}
+       sub.SetGroup(c.config.Consumer.Group)
+       m.SetSubscribeInfo(sub)
+
+       auth := &protocol.AuthenticateInfo{}
+       c.genMasterAuthenticateToken(auth, true)
+       mci := &protocol.MasterCertificateInfo{
+               AuthInfo: auth,
+       }
+       c.subInfo.SetMasterCertificateInfo(mci)
+
+       rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, 
c.rmtDataCache)
+       return rsp, err
+}
+
 func (c *consumer) processRegisterResponseM2C(rsp 
*protocol.RegisterResponseM2C) {
        if rsp.GetNotAllocated() {
                c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
@@ -202,19 +205,25 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
 
 func (c *consumer) processRebalanceEvent() {
        for {
-               event := c.rmtDataCache.TakeEvent()
-               if event.GetEventStatus() == int32(util.InvalidValue) && 
event.GetRebalanceID() == util.InvalidValue {
+               select {
+               case event, ok := <-c.rmtDataCache.EventCh:
+                       if ok {
+                               if event.GetEventStatus() == 
int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+                                       break
+                               }
+                               c.rmtDataCache.ClearEvent()
+                               switch event.GetEventType() {
+                               case metadata.Disconnect, 
metadata.OnlyDisconnect:
+                                       c.disconnect2Broker(event)
+                                       c.rmtDataCache.OfferEventResult(event)
+                               case metadata.Connect, metadata.OnlyConnect:
+                                       c.connect2Broker(event)
+                                       c.rmtDataCache.OfferEventResult(event)
+                               }
+                       }
+               case <-c.done:
                        break
                }
-               c.rmtDataCache.ClearEvent()
-               switch event.GetEventType() {
-               case 2, 20:
-                       c.disconnect2Broker(event)
-                       c.rmtDataCache.OfferEventResult(event)
-               case 1, 10:
-                       c.connect2Broker(event)
-                       c.rmtDataCache.OfferEventResult(event)
-               }
        }
 }
 
@@ -237,48 +246,41 @@ func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metad
 
        for _, partitions := range unRegPartitions {
                for _, partition := range partitions {
-                       ctx, cancel := 
context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-
-                       m := &metadata.Metadata{}
-                       node := &metadata.Node{}
-                       node.SetHost(util.GetLocalHost())
-                       node.SetAddress(partition.GetBroker().GetAddress())
-                       m.SetNode(node)
-                       m.SetReadStatus(1)
-                       sub := &metadata.SubscribeInfo{}
-                       sub.SetGroup(c.config.Consumer.Group)
-                       sub.SetConsumerID(c.clientID)
-                       sub.SetPartition(partition)
-                       m.SetSubscribeInfo(sub)
-
-                       c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
-                       cancel()
+                       c.sendUnregisterReq2Broker(partition)
                }
        }
 }
 
+func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) {
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+
+       m := &metadata.Metadata{}
+       node := &metadata.Node{}
+       node.SetHost(util.GetLocalHost())
+       node.SetAddress(partition.GetBroker().GetAddress())
+       m.SetNode(node)
+       m.SetReadStatus(1)
+       sub := &metadata.SubscribeInfo{}
+       sub.SetGroup(c.config.Consumer.Group)
+       sub.SetConsumerID(c.clientID)
+       sub.SetPartition(partition)
+       m.SetSubscribeInfo(sub)
+
+       c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
+}
+
 func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
        if len(event.GetSubscribeInfo()) > 0 {
                unsubPartitions := 
c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
                if len(unsubPartitions) > 0 {
                        for _, partition := range unsubPartitions {
-                               m := &metadata.Metadata{}
+
                                node := &metadata.Node{}
                                node.SetHost(util.GetLocalHost())
                                
node.SetAddress(partition.GetBroker().GetAddress())
-                               m.SetNode(node)
-                               sub := &metadata.SubscribeInfo{}
-                               sub.SetGroup(c.config.Consumer.Group)
-                               sub.SetConsumerID(c.clientID)
-                               sub.SetPartition(partition)
-                               m.SetSubscribeInfo(sub)
-                               isFirstRegister := 
c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
-                               
m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
-                               auth := c.genBrokerAuthenticInfo(true)
-                               c.subInfo.SetAuthorizedInfo(auth)
-
-                               ctx, cancel := 
context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-                               rsp, err := c.client.RegisterRequestC2B(ctx, m, 
c.subInfo, c.rmtDataCache)
+
+                               rsp, err := c.sendRegisterReq2Broker(partition, 
node)
                                if err != nil {
                                        //todo add log
                                }
@@ -286,24 +288,39 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
                                        
c.rmtDataCache.AddNewPartition(partition)
                                        c.heartbeatManager.registerBroker(node)
                                }
-                               cancel()
                        }
                }
        }
        c.subInfo.FirstRegistered()
-       event.SetEventStatus(2)
+       event.SetEventStatus(metadata.Disconnect)
 }
 
-func newClientIndex() uint64 {
-       return atomic.AddUint64(&clientIndex, 1)
+func (c *consumer) sendRegisterReq2Broker(partition *metadata.Partition, node 
*metadata.Node) (*protocol.RegisterResponseB2C, error) {
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+
+       m := &metadata.Metadata{}
+       m.SetNode(node)
+       sub := &metadata.SubscribeInfo{}
+       sub.SetGroup(c.config.Consumer.Group)
+       sub.SetConsumerID(c.clientID)
+       sub.SetPartition(partition)
+       m.SetSubscribeInfo(sub)
+       isFirstRegister := 
c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
+       m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
+       auth := c.genBrokerAuthenticInfo(true)
+       c.subInfo.SetAuthorizedInfo(auth)
+
+       rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, 
c.rmtDataCache)
+       return rsp, err
 }
 
-func newClientID(group string) string {
+func newClient(group string) string {
        return group + "_" +
                util.GetLocalHost() + "_" +
                strconv.Itoa(os.Getpid()) + "_" +
                strconv.Itoa(int(time.Now().Unix()*1000)) + "_" +
-               strconv.Itoa(int(newClientIndex())) + "_" +
+               strconv.Itoa(int(atomic.AddUint64(&clientID, 1))) + "_" +
                tubeMQClientVersion
 }
 
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 9c29854..5ec0b5c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -95,17 +95,14 @@ func (h *heartbeatManager) consumerHB2Master() {
 
        retry := 0
        for retry < h.consumer.config.Heartbeat.MaxRetryTimes {
-               ctx, cancel := context.WithTimeout(context.Background(), 
h.consumer.config.Net.ReadTimeout)
-               rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, 
h.consumer.subInfo, h.consumer.rmtDataCache)
+               rsp, err := h.sendHeartbeatC2M(m)
                if err != nil {
-                       cancel()
+                       continue
                }
                if rsp.GetSuccess() {
-                       cancel()
                        h.processHBResponseM2C(rsp)
                        break
                } else if rsp.GetErrCode() == errs.RetErrHBNoNode || 
strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
-                       cancel()
                        h.consumer.masterHBRetry++
                        address := h.consumer.master.Address
                        go h.consumer.register2Master(rsp.GetErrCode() != 
errs.RetErrHBNoNode)
@@ -120,7 +117,6 @@ func (h *heartbeatManager) consumerHB2Master() {
                        }
                        return
                }
-               cancel()
        }
        h.mu.Lock()
        defer h.mu.Unlock()
@@ -128,6 +124,13 @@ func (h *heartbeatManager) consumerHB2Master() {
        hm.timer.Reset(h.nextHeartbeatInterval())
 }
 
+func (h *heartbeatManager) sendHeartbeatC2M(m *metadata.Metadata) 
(*protocol.HeartResponseM2C, error) {
+       ctx, cancel := context.WithTimeout(context.Background(), 
h.consumer.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, 
h.consumer.subInfo, h.consumer.rmtDataCache)
+       return rsp, err
+}
+
 func (h *heartbeatManager) processHBResponseM2C(rsp 
*protocol.HeartResponseM2C) {
        h.consumer.masterHBRetry = 0
        h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
@@ -158,7 +161,7 @@ func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C)
                        subscribeInfo = append(subscribeInfo, s)
                }
                e := metadata.NewEvent(event.GetRebalanceId(), 
event.GetOpType(), subscribeInfo)
-               h.consumer.rmtDataCache.OfferEvent(e)
+               h.consumer.rmtDataCache.OfferEventAndNotify(e)
        }
 }
 
@@ -179,16 +182,12 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
                h.resetBrokerTimer(broker)
                return
        }
-       m := &metadata.Metadata{}
-       m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
-       m.SetNode(broker)
-       ctx, cancel := context.WithTimeout(context.Background(), 
h.consumer.config.Net.ReadTimeout)
-       defer cancel()
 
-       rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, 
h.consumer.subInfo, h.consumer.rmtDataCache)
+       rsp, err := h.sendHeartbeatC2B(broker)
        if err != nil {
                return
        }
+
        if rsp.GetSuccess() {
                if rsp.GetHasPartFailure() {
                        partitionKeys := make([]string, 0, 
len(rsp.GetFailureInfo()))
@@ -217,6 +216,16 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
        h.resetBrokerTimer(broker)
 }
 
+func (h *heartbeatManager) sendHeartbeatC2B(broker *metadata.Node) 
(*protocol.HeartBeatResponseB2C, error) {
+       m := &metadata.Metadata{}
+       m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
+       m.SetNode(broker)
+       ctx, cancel := context.WithTimeout(context.Background(), 
h.consumer.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, 
h.consumer.subInfo, h.consumer.rmtDataCache)
+       return rsp, err
+}
+
 func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) {
        interval := h.consumer.config.Heartbeat.Interval
        partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
diff --git a/tubemq-client-twins/tubemq-client-go/client/version.go b/tubemq-client-twins/tubemq-client-go/client/version.go
new file mode 100644
index 0000000..1828fd7
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -0,0 +1,5 @@
+package client
+
+const (
+       tubeMQClientVersion = "0.1.0"
+)
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
index 6ce2915..08295c5 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
@@ -17,6 +17,13 @@
 
 package metadata
 
+const (
+       Disconnect     = 2
+       OnlyDisconnect = 20
+       Connect        = 1
+       OnlyConnect    = 10
+)
+
 // ConsumerEvent represents the metadata of a consumer event
 type ConsumerEvent struct {
        rebalanceID   int64
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go 
b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 355e09b..bfc63b7 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -39,7 +39,6 @@ type RmtDataCache struct {
        eventReadMu        sync.Mutex
        metaMu             sync.Mutex
        dataBookMu         sync.Mutex
-       eventReadCond      *sync.Cond
        brokerPartitions   map[*metadata.Node]map[string]bool
        qryPriorityID      int32
        partitions         map[string]*metadata.Partition
@@ -48,6 +47,8 @@ type RmtDataCache struct {
        partitionTimeouts  map[string]*time.Timer
        topicPartitions    map[string]map[string]bool
        partitionRegBooked map[string]bool
+       // EventCh is the channel for consumer to consume
+       EventCh chan *metadata.ConsumerEvent
 }
 
 // NewRmtDataCache returns a default rmtDataCache.
@@ -65,8 +66,8 @@ func NewRmtDataCache() *RmtDataCache {
                partitionTimeouts:  make(map[string]*time.Timer),
                topicPartitions:    make(map[string]map[string]bool),
                partitionRegBooked: make(map[string]bool),
+               EventCh:            make(chan *metadata.ConsumerEvent, 1),
        }
-       r.eventReadCond = sync.NewCond(&r.eventReadMu)
        return r
 }
 
@@ -149,24 +150,14 @@ func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i
 
 }
 
-// OfferEvent offers an consumer event and notifies the consumer method.
-func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
+// OfferEventAndNotify offers an consumer event and notifies the consumer 
method and notify the consumer to consume.
+func (r *RmtDataCache) OfferEventAndNotify(event *metadata.ConsumerEvent) {
        r.eventReadMu.Lock()
        defer r.eventReadMu.Unlock()
        r.rebalanceResults = append(r.rebalanceResults, event)
-       r.eventReadCond.Broadcast()
-}
-
-// TakeEvent takes an event from the rebalanceResults.
-func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
-       r.eventReadMu.Lock()
-       defer r.eventReadMu.Unlock()
-       for len(r.rebalanceResults) == 0 {
-               r.eventReadCond.Wait()
-       }
-       event := r.rebalanceResults[0]
+       e := r.rebalanceResults[0]
        r.rebalanceResults = r.rebalanceResults[1:]
-       return event
+       r.EventCh <- e
 }
 
 // ClearEvent clears all the events.

Reply via email to