This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit a2af6f05690e750b899cb5e5257010459d693959
Author: Zijie Lu <[email protected]>
AuthorDate: Fri Jun 4 10:20:54 2021 +0800

    Implement offerEventResult
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/client/consumer_impl.go       |  7 ++--
 .../tubemq-client-go/remote/remote.go              | 39 ++++++++++++++--------
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index c0f7e9a..b7312bb 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -203,9 +203,6 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
 func (c *consumer) processRebalanceEvent() {
        for {
                event := c.rmtDataCache.TakeEvent()
-               if event == nil {
-                       continue
-               }
                 if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
                        break
                }
@@ -213,10 +210,10 @@ func (c *consumer) processRebalanceEvent() {
                switch event.GetEventType() {
                case 2, 20:
                        c.disconnect2Broker(event)
-                       c.rmtDataCache.OfferEvent(event)
+                       c.rmtDataCache.OfferEventResult(event)
                case 1, 10:
                        c.connect2Broker(event)
-                       c.rmtDataCache.OfferEvent(event)
+                       c.rmtDataCache.OfferEventResult(event)
                }
        }
 }
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index aa9a2d1..1d82c89 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -35,9 +35,11 @@ type RmtDataCache struct {
        groupFlowCtrlID    int64
        partitionSubInfo   map[string]*metadata.SubscribeInfo
        rebalanceResults   []*metadata.ConsumerEvent
-       eventMu            sync.Mutex
+       eventWriteMu       sync.Mutex
+       eventReadMu        sync.Mutex
        metaMu             sync.Mutex
        dataBookMu         sync.Mutex
+       eventReadCond      *sync.Cond
        brokerPartitions   map[*metadata.Node]map[string]bool
        qryPriorityID      int32
        partitions         map[string]*metadata.Partition
@@ -50,7 +52,7 @@ type RmtDataCache struct {
 
 // NewRmtDataCache returns a default rmtDataCache.
 func NewRmtDataCache() *RmtDataCache {
-       return &RmtDataCache{
+       r := &RmtDataCache{
                defFlowCtrlID:      util.InvalidValue,
                groupFlowCtrlID:    util.InvalidValue,
                qryPriorityID:      int32(util.InvalidValue),
@@ -64,6 +66,8 @@ func NewRmtDataCache() *RmtDataCache {
                topicPartitions:    make(map[string]map[string]bool),
                partitionRegBooked: make(map[string]bool),
        }
+       r.eventReadCond = sync.NewCond(&r.eventReadMu)
+       return r
 }
 
 // GetUnderGroupCtrl returns the underGroupCtrl.
@@ -104,8 +108,8 @@ func (r *RmtDataCache) GetQryPriorityID() int32 {
 
 // PollEventResult polls the first event result from the rebalanceResults.
 func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
-       r.eventMu.Lock()
-       defer r.eventMu.Unlock()
+       r.eventWriteMu.Lock()
+       defer r.eventWriteMu.Unlock()
        if len(r.rebalanceResults) > 0 {
                event := r.rebalanceResults[0]
                r.rebalanceResults = r.rebalanceResults[1:]
@@ -145,19 +149,20 @@ func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i
 
 }
 
-// OfferEvent offers an consumer event.
+// OfferEvent offers an consumer event and notifies the consumer method.
 func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
-       r.eventMu.Lock()
-       defer r.eventMu.Unlock()
+       r.eventReadMu.Lock()
+       defer r.eventReadMu.Unlock()
        r.rebalanceResults = append(r.rebalanceResults, event)
+       r.eventReadCond.Broadcast()
 }
 
 // TakeEvent takes an event from the rebalanceResults.
 func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
-       r.eventMu.Lock()
-       defer r.eventMu.Unlock()
-       if len(r.rebalanceResults) == 0 {
-               return nil
+       r.eventReadMu.Lock()
+       defer r.eventReadMu.Unlock()
+       for len(r.rebalanceResults) == 0 {
+               r.eventReadCond.Wait()
        }
        event := r.rebalanceResults[0]
        r.rebalanceResults = r.rebalanceResults[1:]
@@ -166,11 +171,19 @@ func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
 
 // ClearEvent clears all the events.
 func (r *RmtDataCache) ClearEvent() {
-       r.eventMu.Lock()
-       defer r.eventMu.Unlock()
+       r.eventWriteMu.Lock()
+       defer r.eventWriteMu.Unlock()
        r.rebalanceResults = r.rebalanceResults[:0]
 }
 
+// OfferEventResult offers an consumer event.
+func (r *RmtDataCache) OfferEventResult(event *metadata.ConsumerEvent) {
+       r.eventWriteMu.Lock()
+       defer r.eventWriteMu.Unlock()
+
+       r.rebalanceResults = append(r.rebalanceResults, event)
+}
+
 // RemoveAndGetPartition removes the given partitions.
 func (r *RmtDataCache) RemoveAndGetPartition(subscribeInfos []*metadata.SubscribeInfo, processingRollback bool, partitions map[*metadata.Node][]*metadata.Partition) {
        if len(subscribeInfos) == 0 {

Reply via email to