This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit a2af6f05690e750b899cb5e5257010459d693959 Author: Zijie Lu <[email protected]> AuthorDate: Fri Jun 4 10:20:54 2021 +0800 Implement offerEventResult Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/client/consumer_impl.go | 7 ++-- .../tubemq-client-go/remote/remote.go | 39 ++++++++++++++-------- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go index c0f7e9a..b7312bb 100644 --- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go +++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go @@ -203,9 +203,6 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) { func (c *consumer) processRebalanceEvent() { for { event := c.rmtDataCache.TakeEvent() - if event == nil { - continue - } if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue { break } @@ -213,10 +210,10 @@ func (c *consumer) processRebalanceEvent() { switch event.GetEventType() { case 2, 20: c.disconnect2Broker(event) - c.rmtDataCache.OfferEvent(event) + c.rmtDataCache.OfferEventResult(event) case 1, 10: c.connect2Broker(event) - c.rmtDataCache.OfferEvent(event) + c.rmtDataCache.OfferEventResult(event) } } } diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go index aa9a2d1..1d82c89 100644 --- a/tubemq-client-twins/tubemq-client-go/remote/remote.go +++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go @@ -35,9 +35,11 @@ type RmtDataCache struct { groupFlowCtrlID int64 partitionSubInfo map[string]*metadata.SubscribeInfo rebalanceResults []*metadata.ConsumerEvent - eventMu sync.Mutex + eventWriteMu sync.Mutex + eventReadMu sync.Mutex metaMu sync.Mutex dataBookMu sync.Mutex + eventReadCond *sync.Cond brokerPartitions map[*metadata.Node]map[string]bool qryPriorityID int32 partitions map[string]*metadata.Partition @@ -50,7 +52,7 @@ type RmtDataCache struct { // NewRmtDataCache returns a default rmtDataCache. func NewRmtDataCache() *RmtDataCache { - return &RmtDataCache{ + r := &RmtDataCache{ defFlowCtrlID: util.InvalidValue, groupFlowCtrlID: util.InvalidValue, qryPriorityID: int32(util.InvalidValue), @@ -64,6 +66,8 @@ func NewRmtDataCache() *RmtDataCache { topicPartitions: make(map[string]map[string]bool), partitionRegBooked: make(map[string]bool), } + r.eventReadCond = sync.NewCond(&r.eventReadMu) + return r } // GetUnderGroupCtrl returns the underGroupCtrl. @@ -104,8 +108,8 @@ func (r *RmtDataCache) GetQryPriorityID() int32 { // PollEventResult polls the first event result from the rebalanceResults. func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent { - r.eventMu.Lock() - defer r.eventMu.Unlock() + r.eventWriteMu.Lock() + defer r.eventWriteMu.Unlock() if len(r.rebalanceResults) > 0 { event := r.rebalanceResults[0] r.rebalanceResults = r.rebalanceResults[1:] @@ -145,19 +149,20 @@ func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i } -// OfferEvent offers an consumer event. +// OfferEvent offers an consumer event and notifies the consumer method. func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) { - r.eventMu.Lock() - defer r.eventMu.Unlock() + r.eventReadMu.Lock() + defer r.eventReadMu.Unlock() r.rebalanceResults = append(r.rebalanceResults, event) + r.eventReadCond.Broadcast() } // TakeEvent takes an event from the rebalanceResults. func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent { - r.eventMu.Lock() - defer r.eventMu.Unlock() - if len(r.rebalanceResults) == 0 { - return nil + r.eventReadMu.Lock() + defer r.eventReadMu.Unlock() + for len(r.rebalanceResults) == 0 { + r.eventReadCond.Wait() } event := r.rebalanceResults[0] r.rebalanceResults = r.rebalanceResults[1:] @@ -166,11 +171,19 @@ func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent { // ClearEvent clears all the events. func (r *RmtDataCache) ClearEvent() { - r.eventMu.Lock() - defer r.eventMu.Unlock() + r.eventWriteMu.Lock() + defer r.eventWriteMu.Unlock() r.rebalanceResults = r.rebalanceResults[:0] } +// OfferEventResult offers an consumer event. +func (r *RmtDataCache) OfferEventResult(event *metadata.ConsumerEvent) { + r.eventWriteMu.Lock() + defer r.eventWriteMu.Unlock() + + r.rebalanceResults = append(r.rebalanceResults, event) +} + // RemoveAndGetPartition removes the given partitions. func (r *RmtDataCache) RemoveAndGetPartition(subscribeInfos []*metadata.SubscribeInfo, processingRollback bool, partitions map[*metadata.Node][]*metadata.Partition) { if len(subscribeInfos) == 0 {
