This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 65cfc8fdd9ba9aa5856eb3cba63ed2bd801f5ad2 Author: Zijie Lu <[email protected]> AuthorDate: Thu Jun 10 10:55:19 2021 +0800 [INLONG-624]Go SDK Consumer Start API Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/client/consumer.go | 2 - .../tubemq-client-go/client/consumer_impl.go | 42 ++++++------- .../tubemq-client-go/client/heartbeat.go | 70 +++++++++++----------- 3 files changed, 55 insertions(+), 59 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go index a74dfa5..27a63c6 100644 --- a/tubemq-client-twins/tubemq-client-go/client/consumer.go +++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go @@ -31,8 +31,6 @@ var clientID uint64 // Consumer is an interface that abstracts behavior of TubeMQ's consumer type Consumer interface { - // Start starts the consumer. - Start() error // GetMessage receive a single message. GetMessage() (*ConsumerResult, error) // Confirm the consumption of a message. diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go index 30b93b4..f4211c5 100644 --- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go +++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go @@ -97,18 +97,13 @@ func NewConsumer(config *config.Config) (Consumer, error) { c.subInfo.SetClientID(clientID) hbm := newHBManager(c) c.heartbeatManager = hbm - return c, nil -} - -// Start implementation of tubeMQ consumer. -func (c *consumer) Start() error { - err := c.register2Master(false) + err = c.register2Master(false) if err != nil { - return err + return nil, err } c.heartbeatManager.registerMaster(c.master.Address) go c.processRebalanceEvent() - return nil + return c, nil } func (c *consumer) register2Master(needChange bool) error { @@ -124,18 +119,20 @@ func (c *consumer) register2Master(needChange bool) error { if err != nil { return err } - if rsp.GetSuccess() { - c.masterHBRetry = 0 - c.processRegisterResponseM2C(rsp) - return nil - } else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden { - return nil - } else { - c.master, err = c.selector.Select(c.config.Consumer.Masters) - if err != nil { + if !rsp.GetSuccess() { + if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden { + return errs.New(rsp.GetErrCode(), rsp.GetErrMsg()) + } + + if c.master, err = c.selector.Select(c.config.Consumer.Masters); err != nil { return err } + continue } + + c.masterHBRetry = 0 + c.processRegisterResponseM2C(rsp) + return nil } return nil } @@ -276,7 +273,6 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) { unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo()) if len(unsubPartitions) > 0 { for _, partition := range unsubPartitions { - node := &metadata.Node{} node.SetHost(util.GetLocalHost()) node.SetAddress(partition.GetBroker().GetAddress()) @@ -284,11 +280,15 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) { rsp, err := c.sendRegisterReq2Broker(partition, node) if err != nil { //todo add log + return } - if rsp.GetSuccess() { - c.rmtDataCache.AddNewPartition(partition) - c.heartbeatManager.registerBroker(node) + if !rsp.GetSuccess() { + //todo add log + return } + + c.rmtDataCache.AddNewPartition(partition) + c.heartbeatManager.registerBroker(node) } } } diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go index 5ec0b5c..fdd0d20 100644 --- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go +++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go @@ -99,24 +99,27 @@ func (h *heartbeatManager) consumerHB2Master() { if err != nil { continue } - if rsp.GetSuccess() { - h.processHBResponseM2C(rsp) - break - } else if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 { + + if !rsp.GetSuccess() { h.consumer.masterHBRetry++ - address := h.consumer.master.Address - go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode) - if rsp.GetErrCode() != errs.RetErrHBNoNode { - hm := h.heartbeats[address] - hm.numConnections-- - if hm.numConnections == 0 { - h.mu.Lock() - delete(h.heartbeats, address) - h.mu.Unlock() + if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 { + address := h.consumer.master.Address + go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode) + if rsp.GetErrCode() != errs.RetErrHBNoNode { + hm := h.heartbeats[address] + hm.numConnections-- + if hm.numConnections == 0 { + h.mu.Lock() + delete(h.heartbeats, address) + h.mu.Unlock() + } } + return } - return } + h.consumer.masterHBRetry = 0 + h.processHBResponseM2C(rsp) + break } h.mu.Lock() defer h.mu.Unlock() @@ -187,31 +190,26 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) { if err != nil { return } - - if rsp.GetSuccess() { - if rsp.GetHasPartFailure() { - partitionKeys := make([]string, 0, len(rsp.GetFailureInfo())) - for _, fi := range rsp.GetFailureInfo() { - pos := strings.Index(fi, ":") - if pos == -1 { - continue - } - partition, err := metadata.NewPartition(fi[pos+1:]) - if err != nil { - continue - } - partitionKeys = append(partitionKeys, partition.GetPartitionKey()) + partitionKeys := make([]string, 0, len(partitions)) + if rsp.GetErrCode() == errs.RetCertificateFailure { + for _, partition := range partitions { + partitionKeys = append(partitionKeys, partition.GetPartitionKey()) + } + h.consumer.rmtDataCache.RemovePartition(partitionKeys) + } + if rsp.GetSuccess() && rsp.GetHasPartFailure() { + for _, fi := range rsp.GetFailureInfo() { + pos := strings.Index(fi, ":") + if pos == -1 { + continue } - h.consumer.rmtDataCache.RemovePartition(partitionKeys) - } else { - if rsp.GetErrCode() == errs.RetCertificateFailure { - partitionKeys := make([]string, 0, len(partitions)) - for _, partition := range partitions { - partitionKeys = append(partitionKeys, partition.GetPartitionKey()) - } - h.consumer.rmtDataCache.RemovePartition(partitionKeys) + partition, err := metadata.NewPartition(fi[pos+1:]) + if err != nil { + continue } + partitionKeys = append(partitionKeys, partition.GetPartitionKey()) } + h.consumer.rmtDataCache.RemovePartition(partitionKeys) } h.resetBrokerTimer(broker) }
