This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 65cfc8fdd9ba9aa5856eb3cba63ed2bd801f5ad2
Author: Zijie Lu <[email protected]>
AuthorDate: Thu Jun 10 10:55:19 2021 +0800

    [INLONG-624]Go SDK Consumer Start API
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/client/consumer.go            |  2 -
 .../tubemq-client-go/client/consumer_impl.go       | 42 ++++++-------
 .../tubemq-client-go/client/heartbeat.go           | 70 +++++++++++-----------
 3 files changed, 55 insertions(+), 59 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index a74dfa5..27a63c6 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -31,8 +31,6 @@ var clientID uint64
 
 // Consumer is an interface that abstracts behavior of TubeMQ's consumer
 type Consumer interface {
-       // Start starts the consumer.
-       Start() error
        // GetMessage receive a single message.
        GetMessage() (*ConsumerResult, error)
        // Confirm the consumption of a message.
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 30b93b4..f4211c5 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -97,18 +97,13 @@ func NewConsumer(config *config.Config) (Consumer, error) {
        c.subInfo.SetClientID(clientID)
        hbm := newHBManager(c)
        c.heartbeatManager = hbm
-       return c, nil
-}
-
-// Start implementation of tubeMQ consumer.
-func (c *consumer) Start() error {
-       err := c.register2Master(false)
+       err = c.register2Master(false)
        if err != nil {
-               return err
+               return nil, err
        }
        c.heartbeatManager.registerMaster(c.master.Address)
        go c.processRebalanceEvent()
-       return nil
+       return c, nil
 }
 
 func (c *consumer) register2Master(needChange bool) error {
@@ -124,18 +119,20 @@ func (c *consumer) register2Master(needChange bool) error 
{
                if err != nil {
                        return err
                }
-               if rsp.GetSuccess() {
-                       c.masterHBRetry = 0
-                       c.processRegisterResponseM2C(rsp)
-                       return nil
-               } else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || 
rsp.GetErrCode() == errs.RetConsumeContentForbidden {
-                       return nil
-               } else {
-                       c.master, err = 
c.selector.Select(c.config.Consumer.Masters)
-                       if err != nil {
+               if !rsp.GetSuccess() {
+                       if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || 
rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+                               return errs.New(rsp.GetErrCode(), 
rsp.GetErrMsg())
+                       }
+
+                       if c.master, err = 
c.selector.Select(c.config.Consumer.Masters); err != nil {
                                return err
                        }
+                       continue
                }
+
+               c.masterHBRetry = 0
+               c.processRegisterResponseM2C(rsp)
+               return nil
        }
        return nil
 }
@@ -276,7 +273,6 @@ func (c *consumer) connect2Broker(event 
*metadata.ConsumerEvent) {
                unsubPartitions := 
c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
                if len(unsubPartitions) > 0 {
                        for _, partition := range unsubPartitions {
-
                                node := &metadata.Node{}
                                node.SetHost(util.GetLocalHost())
                                
node.SetAddress(partition.GetBroker().GetAddress())
@@ -284,11 +280,15 @@ func (c *consumer) connect2Broker(event 
*metadata.ConsumerEvent) {
                                rsp, err := c.sendRegisterReq2Broker(partition, 
node)
                                if err != nil {
                                        //todo add log
+                                       return
                                }
-                               if rsp.GetSuccess() {
-                                       
c.rmtDataCache.AddNewPartition(partition)
-                                       c.heartbeatManager.registerBroker(node)
+                               if !rsp.GetSuccess() {
+                                       //todo add log
+                                       return
                                }
+
+                               c.rmtDataCache.AddNewPartition(partition)
+                               c.heartbeatManager.registerBroker(node)
                        }
                }
        }
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go 
b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 5ec0b5c..fdd0d20 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -99,24 +99,27 @@ func (h *heartbeatManager) consumerHB2Master() {
                if err != nil {
                        continue
                }
-               if rsp.GetSuccess() {
-                       h.processHBResponseM2C(rsp)
-                       break
-               } else if rsp.GetErrCode() == errs.RetErrHBNoNode || 
strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
+
+               if !rsp.GetSuccess() {
                        h.consumer.masterHBRetry++
-                       address := h.consumer.master.Address
-                       go h.consumer.register2Master(rsp.GetErrCode() != 
errs.RetErrHBNoNode)
-                       if rsp.GetErrCode() != errs.RetErrHBNoNode {
-                               hm := h.heartbeats[address]
-                               hm.numConnections--
-                               if hm.numConnections == 0 {
-                                       h.mu.Lock()
-                                       delete(h.heartbeats, address)
-                                       h.mu.Unlock()
+                       if rsp.GetErrCode() == errs.RetErrHBNoNode || 
strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
+                               address := h.consumer.master.Address
+                               go h.consumer.register2Master(rsp.GetErrCode() 
!= errs.RetErrHBNoNode)
+                               if rsp.GetErrCode() != errs.RetErrHBNoNode {
+                                       hm := h.heartbeats[address]
+                                       hm.numConnections--
+                                       if hm.numConnections == 0 {
+                                               h.mu.Lock()
+                                               delete(h.heartbeats, address)
+                                               h.mu.Unlock()
+                                       }
                                }
+                               return
                        }
-                       return
                }
+               h.consumer.masterHBRetry = 0
+               h.processHBResponseM2C(rsp)
+               break
        }
        h.mu.Lock()
        defer h.mu.Unlock()
@@ -187,31 +190,26 @@ func (h *heartbeatManager) consumerHB2Broker(broker 
*metadata.Node) {
        if err != nil {
                return
        }
-
-       if rsp.GetSuccess() {
-               if rsp.GetHasPartFailure() {
-                       partitionKeys := make([]string, 0, 
len(rsp.GetFailureInfo()))
-                       for _, fi := range rsp.GetFailureInfo() {
-                               pos := strings.Index(fi, ":")
-                               if pos == -1 {
-                                       continue
-                               }
-                               partition, err := 
metadata.NewPartition(fi[pos+1:])
-                               if err != nil {
-                                       continue
-                               }
-                               partitionKeys = append(partitionKeys, 
partition.GetPartitionKey())
+       partitionKeys := make([]string, 0, len(partitions))
+       if rsp.GetErrCode() == errs.RetCertificateFailure {
+               for _, partition := range partitions {
+                       partitionKeys = append(partitionKeys, 
partition.GetPartitionKey())
+               }
+               h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+       }
+       if rsp.GetSuccess() && rsp.GetHasPartFailure() {
+               for _, fi := range rsp.GetFailureInfo() {
+                       pos := strings.Index(fi, ":")
+                       if pos == -1 {
+                               continue
                        }
-                       h.consumer.rmtDataCache.RemovePartition(partitionKeys)
-               } else {
-                       if rsp.GetErrCode() == errs.RetCertificateFailure {
-                               partitionKeys := make([]string, 0, 
len(partitions))
-                               for _, partition := range partitions {
-                                       partitionKeys = append(partitionKeys, 
partition.GetPartitionKey())
-                               }
-                               
h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+                       partition, err := metadata.NewPartition(fi[pos+1:])
+                       if err != nil {
+                               continue
                        }
+                       partitionKeys = append(partitionKeys, 
partition.GetPartitionKey())
                }
+               h.consumer.rmtDataCache.RemovePartition(partitionKeys)
        }
        h.resetBrokerTimer(broker)
 }

Reply via email to