This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 2d64bb530f302ab97bbc2930197909b10e8152df Author: Zijie Lu <[email protected]> AuthorDate: Tue Jun 8 17:40:17 2021 +0800 Address review comments Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/client/consumer.go | 6 +- .../tubemq-client-go/client/consumer_impl.go | 159 ++++++++++++--------- .../tubemq-client-go/client/heartbeat.go | 35 +++-- .../tubemq-client-go/client/version.go | 5 + .../tubemq-client-go/metadata/consumer_event.go | 7 + .../tubemq-client-go/remote/remote.go | 23 +-- 6 files changed, 130 insertions(+), 105 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go index 27a8536..a74dfa5 100644 --- a/tubemq-client-twins/tubemq-client-go/client/consumer.go +++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go @@ -19,10 +19,6 @@ // which can be exposed to user. package client -const ( - tubeMQClientVersion = "0.1.0" -) - // ConsumerResult of a consumption. type ConsumerResult struct { } @@ -31,7 +27,7 @@ type ConsumerResult struct { type ConsumerOffset struct { } -var clientIndex uint64 +var clientID uint64 // Consumer is an interface that abstracts behavior of TubeMQ's consumer type Consumer interface { diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go index b7312bb..674f810 100644 --- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go +++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go @@ -61,6 +61,7 @@ type consumer struct { masterHBRetry int heartbeatManager *heartbeatManager unreportedTimes int + done chan struct{} } // NewConsumer returns a consumer which is constructed by a given config. @@ -70,7 +71,7 @@ func NewConsumer(config *config.Config) (Consumer, error) { return nil, err } - clientID := newClientID(config.Consumer.Group) + clientID := newClient(config.Consumer.Group) pool := multiplexing.NewPool() opts := &transport.Options{} if config.Net.TLS.Enable { @@ -118,40 +119,18 @@ func (c *consumer) register2Master(needChange bool) error { c.master = node } for c.master.HasNext { - ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) - - m := &metadata.Metadata{} - node := &metadata.Node{} - node.SetHost(util.GetLocalHost()) - node.SetAddress(c.master.Address) - m.SetNode(node) - sub := &metadata.SubscribeInfo{} - sub.SetGroup(c.config.Consumer.Group) - m.SetSubscribeInfo(sub) - - auth := &protocol.AuthenticateInfo{} - c.genMasterAuthenticateToken(auth, true) - mci := &protocol.MasterCertificateInfo{ - AuthInfo: auth, - } - c.subInfo.SetMasterCertificateInfo(mci) - - rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache) + rsp, err := c.sendRegRequest2Master() if err != nil { - cancel() return err } if rsp.GetSuccess() { c.masterHBRetry = 0 c.processRegisterResponseM2C(rsp) - cancel() return nil } else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden { - cancel() return nil } else { c.master, err = c.selector.Select(c.config.Consumer.Masters) - cancel() if err != nil { return err } @@ -160,6 +139,30 @@ func (c *consumer) register2Master(needChange bool) error { return nil } +func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + defer cancel() + + m := &metadata.Metadata{} + node := &metadata.Node{} + node.SetHost(util.GetLocalHost()) + node.SetAddress(c.master.Address) + m.SetNode(node) + sub := &metadata.SubscribeInfo{} + sub.SetGroup(c.config.Consumer.Group) + m.SetSubscribeInfo(sub) + + auth := &protocol.AuthenticateInfo{} + c.genMasterAuthenticateToken(auth, true) + mci := &protocol.MasterCertificateInfo{ + AuthInfo: auth, + } + c.subInfo.SetMasterCertificateInfo(mci) + + rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache) + return rsp, err +} + func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) { if rsp.GetNotAllocated() { c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated()) @@ -202,19 +205,25 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) { func (c *consumer) processRebalanceEvent() { for { - event := c.rmtDataCache.TakeEvent() - if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue { + select { + case event, ok := <-c.rmtDataCache.EventCh: + if ok { + if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue { + break + } + c.rmtDataCache.ClearEvent() + switch event.GetEventType() { + case metadata.Disconnect, metadata.OnlyDisconnect: + c.disconnect2Broker(event) + c.rmtDataCache.OfferEventResult(event) + case metadata.Connect, metadata.OnlyConnect: + c.connect2Broker(event) + c.rmtDataCache.OfferEventResult(event) + } + } + case <-c.done: break } - c.rmtDataCache.ClearEvent() - switch event.GetEventType() { - case 2, 20: - c.disconnect2Broker(event) - c.rmtDataCache.OfferEventResult(event) - case 1, 10: - c.connect2Broker(event) - c.rmtDataCache.OfferEventResult(event) - } } } @@ -237,48 +246,41 @@ func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metad for _, partitions := range unRegPartitions { for _, partition := range partitions { - ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) - - m := &metadata.Metadata{} - node := &metadata.Node{} - node.SetHost(util.GetLocalHost()) - node.SetAddress(partition.GetBroker().GetAddress()) - m.SetNode(node) - m.SetReadStatus(1) - sub := &metadata.SubscribeInfo{} - sub.SetGroup(c.config.Consumer.Group) - sub.SetConsumerID(c.clientID) - sub.SetPartition(partition) - m.SetSubscribeInfo(sub) - - c.client.UnregisterRequestC2B(ctx, m, c.subInfo) - cancel() + c.sendUnregisterReq2Broker(partition) } } } +func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) { + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + defer cancel() + + m := &metadata.Metadata{} + node := &metadata.Node{} + node.SetHost(util.GetLocalHost()) + node.SetAddress(partition.GetBroker().GetAddress()) + m.SetNode(node) + m.SetReadStatus(1) + sub := &metadata.SubscribeInfo{} + sub.SetGroup(c.config.Consumer.Group) + sub.SetConsumerID(c.clientID) + sub.SetPartition(partition) + m.SetSubscribeInfo(sub) + + c.client.UnregisterRequestC2B(ctx, m, c.subInfo) +} + func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) { if len(event.GetSubscribeInfo()) > 0 { unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo()) if len(unsubPartitions) > 0 { for _, partition := range unsubPartitions { - m := &metadata.Metadata{} + node := &metadata.Node{} node.SetHost(util.GetLocalHost()) node.SetAddress(partition.GetBroker().GetAddress()) - m.SetNode(node) - sub := &metadata.SubscribeInfo{} - sub.SetGroup(c.config.Consumer.Group) - sub.SetConsumerID(c.clientID) - sub.SetPartition(partition) - m.SetSubscribeInfo(sub) - isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey()) - m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister)) - auth := c.genBrokerAuthenticInfo(true) - c.subInfo.SetAuthorizedInfo(auth) - - ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) - rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache) + + rsp, err := c.sendRegisterReq2Broker(partition, node) if err != nil { //todo add log } @@ -286,24 +288,39 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) { c.rmtDataCache.AddNewPartition(partition) c.heartbeatManager.registerBroker(node) } - cancel() } } } c.subInfo.FirstRegistered() - event.SetEventStatus(2) + event.SetEventStatus(metadata.Disconnect) } -func newClientIndex() uint64 { - return atomic.AddUint64(&clientIndex, 1) +func (c *consumer) sendRegisterReq2Broker(partition *metadata.Partition, node *metadata.Node) (*protocol.RegisterResponseB2C, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout) + defer cancel() + + m := &metadata.Metadata{} + m.SetNode(node) + sub := &metadata.SubscribeInfo{} + sub.SetGroup(c.config.Consumer.Group) + sub.SetConsumerID(c.clientID) + sub.SetPartition(partition) + m.SetSubscribeInfo(sub) + isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey()) + m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister)) + auth := c.genBrokerAuthenticInfo(true) + c.subInfo.SetAuthorizedInfo(auth) + + rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache) + return rsp, err } -func newClientID(group string) string { +func newClient(group string) string { return group + "_" + util.GetLocalHost() + "_" + strconv.Itoa(os.Getpid()) + "_" + strconv.Itoa(int(time.Now().Unix()*1000)) + "_" + - strconv.Itoa(int(newClientIndex())) + "_" + + strconv.Itoa(int(atomic.AddUint64(&clientID, 1))) + "_" + tubeMQClientVersion } diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go index 9c29854..5ec0b5c 100644 --- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go +++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go @@ -95,17 +95,14 @@ func (h *heartbeatManager) consumerHB2Master() { retry := 0 for retry < h.consumer.config.Heartbeat.MaxRetryTimes { - ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout) - rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache) + rsp, err := h.sendHeartbeatC2M(m) if err != nil { - cancel() + continue } if rsp.GetSuccess() { - cancel() h.processHBResponseM2C(rsp) break } else if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 { - cancel() h.consumer.masterHBRetry++ address := h.consumer.master.Address go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode) @@ -120,7 +117,6 @@ func (h *heartbeatManager) consumerHB2Master() { } return } - cancel() } h.mu.Lock() defer h.mu.Unlock() @@ -128,6 +124,13 @@ func (h *heartbeatManager) consumerHB2Master() { hm.timer.Reset(h.nextHeartbeatInterval()) } +func (h *heartbeatManager) sendHeartbeatC2M(m *metadata.Metadata) (*protocol.HeartResponseM2C, error) { + ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout) + defer cancel() + rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache) + return rsp, err +} + func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C) { h.consumer.masterHBRetry = 0 h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated()) @@ -158,7 +161,7 @@ func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C) subscribeInfo = append(subscribeInfo, s) } e := metadata.NewEvent(event.GetRebalanceId(), event.GetOpType(), subscribeInfo) - h.consumer.rmtDataCache.OfferEvent(e) + h.consumer.rmtDataCache.OfferEventAndNotify(e) } } @@ -179,16 +182,12 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) { h.resetBrokerTimer(broker) return } - m := &metadata.Metadata{} - m.SetReadStatus(h.consumer.getConsumeReadStatus(false)) - m.SetNode(broker) - ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout) - defer cancel() - rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache) + rsp, err := h.sendHeartbeatC2B(broker) if err != nil { return } + if rsp.GetSuccess() { if rsp.GetHasPartFailure() { partitionKeys := make([]string, 0, len(rsp.GetFailureInfo())) @@ -217,6 +216,16 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) { h.resetBrokerTimer(broker) } +func (h *heartbeatManager) sendHeartbeatC2B(broker *metadata.Node) (*protocol.HeartBeatResponseB2C, error) { + m := &metadata.Metadata{} + m.SetReadStatus(h.consumer.getConsumeReadStatus(false)) + m.SetNode(broker) + ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout) + defer cancel() + rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache) + return rsp, err +} + func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) { interval := h.consumer.config.Heartbeat.Interval partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker) diff --git a/tubemq-client-twins/tubemq-client-go/client/version.go b/tubemq-client-twins/tubemq-client-go/client/version.go new file mode 100644 index 0000000..1828fd7 --- /dev/null +++ b/tubemq-client-twins/tubemq-client-go/client/version.go @@ -0,0 +1,5 @@ +package client + +const ( + tubeMQClientVersion = "0.1.0" +) diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go index 6ce2915..08295c5 100644 --- a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go +++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go @@ -17,6 +17,13 @@ package metadata +const ( + Disconnect = 2 + OnlyDisconnect = 20 + Connect = 1 + OnlyConnect = 10 +) + // ConsumerEvent represents the metadata of a consumer event type ConsumerEvent struct { rebalanceID int64 diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go index 355e09b..bfc63b7 100644 --- a/tubemq-client-twins/tubemq-client-go/remote/remote.go +++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go @@ -39,7 +39,6 @@ type RmtDataCache struct { eventReadMu sync.Mutex metaMu sync.Mutex dataBookMu sync.Mutex - eventReadCond *sync.Cond brokerPartitions map[*metadata.Node]map[string]bool qryPriorityID int32 partitions map[string]*metadata.Partition @@ -48,6 +47,8 @@ type RmtDataCache struct { partitionTimeouts map[string]*time.Timer topicPartitions map[string]map[string]bool partitionRegBooked map[string]bool + // EventCh is the channel for consumer to consume + EventCh chan *metadata.ConsumerEvent } // NewRmtDataCache returns a default rmtDataCache. @@ -65,8 +66,8 @@ func NewRmtDataCache() *RmtDataCache { partitionTimeouts: make(map[string]*time.Timer), topicPartitions: make(map[string]map[string]bool), partitionRegBooked: make(map[string]bool), + EventCh: make(chan *metadata.ConsumerEvent, 1), } - r.eventReadCond = sync.NewCond(&r.eventReadMu) return r } @@ -149,24 +150,14 @@ func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i } -// OfferEvent offers an consumer event and notifies the consumer method. -func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) { +// OfferEventAndNotify offers an consumer event and notifies the consumer method and notify the consumer to consume. +func (r *RmtDataCache) OfferEventAndNotify(event *metadata.ConsumerEvent) { r.eventReadMu.Lock() defer r.eventReadMu.Unlock() r.rebalanceResults = append(r.rebalanceResults, event) - r.eventReadCond.Broadcast() -} - -// TakeEvent takes an event from the rebalanceResults. -func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent { - r.eventReadMu.Lock() - defer r.eventReadMu.Unlock() - for len(r.rebalanceResults) == 0 { - r.eventReadCond.Wait() - } - event := r.rebalanceResults[0] + e := r.rebalanceResults[0] r.rebalanceResults = r.rebalanceResults[1:] - return event + r.EventCh <- e } // ClearEvent clears all the events.
