This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d93e29  bugfix rebalance is not paused after suspend and rebalance 
not triggered in resume
     new ade7cfa  Merge pull request #852 from NeonToo/fix/reset_offset
3d93e29 is described below

commit 3d93e293671495a4fe06104f6d9e6a9496e9b11a
Author: 筱瑜 <[email protected]>
AuthorDate: Wed Jun 29 01:27:20 2022 +0800

    bugfix rebalance is not paused after suspend and rebalance not triggered in 
resume
---
 consumer/consumer.go      | 16 +++++++++++++---
 consumer/push_consumer.go | 25 ++++++++++++++-----------
 internal/client.go        | 15 +++++++++++++--
 3 files changed, 40 insertions(+), 16 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 6e53eaf..b605659 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -221,8 +221,8 @@ type PullRequest struct {
 }
 
 func (pr *PullRequest) String() string {
-       return fmt.Sprintf("[ConsumerGroup: %s, Topic: %s, MessageQueue: %d]",
-               pr.consumerGroup, pr.mq.Topic, pr.mq.QueueId)
+       return fmt.Sprintf("[ConsumerGroup: %s, Topic: %s, MessageQueue: 
brokerName=%s, queueId=%d, nextOffset=%d]",
+               pr.consumerGroup, pr.mq.Topic, pr.mq.BrokerName, pr.mq.QueueId, 
pr.nextOffset)
 }
 
 type defaultConsumer struct {
@@ -357,6 +357,16 @@ func (dc *defaultConsumer) 
isSubscribeTopicNeedUpdate(topic string) bool {
        return !exist
 }
 
+func (dc *defaultConsumer) doBalanceIfNotPaused() {
+       if dc.pause {
+               rlog.Info("[BALANCE-SKIP] since consumer paused", 
map[string]interface{}{
+                       rlog.LogKeyConsumerGroup: dc.consumerGroup,
+               })
+               return
+       }
+       dc.doBalance()
+}
+
 func (dc *defaultConsumer) doBalance() {
        dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
                topic := key.(string)
@@ -674,7 +684,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*primitiv
                                if dc.removeUnnecessaryMessageQueue(&mq, pq) {
                                        dc.processQueueTable.Delete(key)
                                        changed = true
-                                       rlog.Debug("remove unnecessary mq 
because pull was paused, prepare to fix it", map[string]interface{}{
+                                       rlog.Debug("remove unnecessary mq 
because pull was expired, prepare to fix it", map[string]interface{}{
                                                rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
                                                rlog.LogKeyMessageQueue:  
mq.String(),
                                        })
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 4ad5ee3..57726cf 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -259,6 +259,10 @@ func (pc *pushConsumer) Rebalance() {
        pc.defaultConsumer.doBalance()
 }
 
+func (pc *pushConsumer) RebalanceIfNotPaused() {
+       pc.defaultConsumer.doBalanceIfNotPaused()
+}
+
 func (pc *pushConsumer) PersistConsumerOffset() error {
        return pc.defaultConsumer.persistConsumerOffset()
 }
@@ -877,6 +881,8 @@ func (pc *pushConsumer) ResetOffset(topic string, table 
map[primitive.MessageQue
        pc.suspend()
        defer pc.resume()
 
+       mqs := make([]*primitive.MessageQueue, 0)
+       copyPc := sync.Map{}
        pc.processQueueTable.Range(func(key, value interface{}) bool {
                mq := key.(primitive.MessageQueue)
                pq := value.(*processQueue)
@@ -884,24 +890,21 @@ func (pc *pushConsumer) ResetOffset(topic string, table 
map[primitive.MessageQue
                        pq.WithDropped(true)
                        pq.clear()
                }
+               mqs = append(mqs, &mq)
+               copyPc.Store(&mq, pq)
                return true
        })
        time.Sleep(10 * time.Second)
-       v, exist := pc.topicSubscribeInfoTable.Load(topic)
-       if !exist {
-               return
-       }
-       queuesOfTopic := v.([]*primitive.MessageQueue)
-       for _, k := range queuesOfTopic {
-               if _, ok := table[*k]; ok {
-                       pc.storage.update(k, table[*k], false)
-                       v, exist := pc.processQueueTable.Load(k)
+       for _, mq := range mqs {
+               if _, ok := table[*mq]; ok {
+                       pc.storage.update(mq, table[*mq], false)
+                       v, exist := copyPc.Load(mq)
                        if !exist {
                                continue
                        }
                        pq := v.(*processQueue)
-                       pc.removeUnnecessaryMessageQueue(k, pq)
-                       pc.processQueueTable.Delete(k)
+                       pc.removeUnnecessaryMessageQueue(mq, pq)
+                       pc.processQueueTable.Delete(mq)
                }
        }
 }
diff --git a/internal/client.go b/internal/client.go
index e8c4803..138dcf3 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -84,6 +84,7 @@ type InnerConsumer interface {
        IsSubscribeTopicNeedUpdate(topic string) bool
        SubscriptionDataList() []*SubscriptionData
        Rebalance()
+       RebalanceIfNotPaused()
        IsUnitMode() bool
        GetConsumerRunningInfo(stack bool) *ConsumerRunningInfo
        ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) 
*ConsumeMessageDirectlyResult
@@ -223,7 +224,7 @@ func GetOrNewRocketMQClient(option ClientOptions, 
callbackCh chan interface{}) R
                        rlog.Info("receive broker's notification to consumer 
group", map[string]interface{}{
                                rlog.LogKeyConsumerGroup: 
req.ExtFields["consumerGroup"],
                        })
-                       client.RebalanceImmediately()
+                       client.RebalanceIfNotPaused()
                        return nil
                })
                
client.remoteClient.RegisterRequestFunc(ReqCheckTransactionState, func(req 
*remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
@@ -492,7 +493,7 @@ func (c *rmqClient) Start() {
                        for {
                                select {
                                case <-ticker.C:
-                                       c.RebalanceImmediately()
+                                       c.RebalanceIfNotPaused()
                                case <-c.done:
                                        rlog.Info("The RMQClient stopping do 
rebalance", map[string]interface{}{
                                                "clientID": c.ClientID(),
@@ -820,6 +821,16 @@ func (c *rmqClient) RebalanceImmediately() {
        })
 }
 
+func (c *rmqClient) RebalanceIfNotPaused() {
+       c.rbMutex.Lock()
+       defer c.rbMutex.Unlock()
+       c.consumerMap.Range(func(key, value interface{}) bool {
+               consumer := value.(InnerConsumer)
+               consumer.RebalanceIfNotPaused()
+               return true
+       })
+}
+
 func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData, 
changed bool) {
        if data == nil {
                return

Reply via email to