This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 3d93e29 bugfix rebalance is not paused after suspend and rebalance
not trigged in resume
new ade7cfa Merge pull request #852 from NeonToo/fix/reset_offset
3d93e29 is described below
commit 3d93e293671495a4fe06104f6d9e6a9496e9b11a
Author: 筱瑜 <[email protected]>
AuthorDate: Wed Jun 29 01:27:20 2022 +0800
bugfix rebalance is not paused after suspend and rebalance not trigged in
resume
---
consumer/consumer.go | 16 +++++++++++++---
consumer/push_consumer.go | 25 ++++++++++++++-----------
internal/client.go | 15 +++++++++++++--
3 files changed, 40 insertions(+), 16 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 6e53eaf..b605659 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -221,8 +221,8 @@ type PullRequest struct {
}
func (pr *PullRequest) String() string {
- return fmt.Sprintf("[ConsumerGroup: %s, Topic: %s, MessageQueue: %d]",
- pr.consumerGroup, pr.mq.Topic, pr.mq.QueueId)
+ return fmt.Sprintf("[ConsumerGroup: %s, Topic: %s, MessageQueue:
brokerName=%s, queueId=%d, nextOffset=%d]",
+ pr.consumerGroup, pr.mq.Topic, pr.mq.BrokerName, pr.mq.QueueId,
pr.nextOffset)
}
type defaultConsumer struct {
@@ -357,6 +357,16 @@ func (dc *defaultConsumer)
isSubscribeTopicNeedUpdate(topic string) bool {
return !exist
}
+func (dc *defaultConsumer) doBalanceIfNotPaused() {
+ if dc.pause {
+ rlog.Info("[BALANCE-SKIP] since consumer paused",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: dc.consumerGroup,
+ })
+ return
+ }
+ dc.doBalance()
+}
+
func (dc *defaultConsumer) doBalance() {
dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
topic := key.(string)
@@ -674,7 +684,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic
string, mqs []*primitiv
if dc.removeUnnecessaryMessageQueue(&mq, pq) {
dc.processQueueTable.Delete(key)
changed = true
- rlog.Debug("remove unnecessary mq
because pull was paused, prepare to fix it", map[string]interface{}{
+ rlog.Debug("remove unnecessary mq
because pull was expired, prepare to fix it", map[string]interface{}{
rlog.LogKeyConsumerGroup:
dc.consumerGroup,
rlog.LogKeyMessageQueue:
mq.String(),
})
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 4ad5ee3..57726cf 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -259,6 +259,10 @@ func (pc *pushConsumer) Rebalance() {
pc.defaultConsumer.doBalance()
}
+func (pc *pushConsumer) RebalanceIfNotPaused() {
+ pc.defaultConsumer.doBalanceIfNotPaused()
+}
+
func (pc *pushConsumer) PersistConsumerOffset() error {
return pc.defaultConsumer.persistConsumerOffset()
}
@@ -877,6 +881,8 @@ func (pc *pushConsumer) ResetOffset(topic string, table
map[primitive.MessageQue
pc.suspend()
defer pc.resume()
+ mqs := make([]*primitive.MessageQueue, 0)
+ copyPc := sync.Map{}
pc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
@@ -884,24 +890,21 @@ func (pc *pushConsumer) ResetOffset(topic string, table
map[primitive.MessageQue
pq.WithDropped(true)
pq.clear()
}
+ mqs = append(mqs, &mq)
+ copyPc.Store(&mq, pq)
return true
})
time.Sleep(10 * time.Second)
- v, exist := pc.topicSubscribeInfoTable.Load(topic)
- if !exist {
- return
- }
- queuesOfTopic := v.([]*primitive.MessageQueue)
- for _, k := range queuesOfTopic {
- if _, ok := table[*k]; ok {
- pc.storage.update(k, table[*k], false)
- v, exist := pc.processQueueTable.Load(k)
+ for _, mq := range mqs {
+ if _, ok := table[*mq]; ok {
+ pc.storage.update(mq, table[*mq], false)
+ v, exist := copyPc.Load(mq)
if !exist {
continue
}
pq := v.(*processQueue)
- pc.removeUnnecessaryMessageQueue(k, pq)
- pc.processQueueTable.Delete(k)
+ pc.removeUnnecessaryMessageQueue(mq, pq)
+ pc.processQueueTable.Delete(mq)
}
}
}
diff --git a/internal/client.go b/internal/client.go
index e8c4803..138dcf3 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -84,6 +84,7 @@ type InnerConsumer interface {
IsSubscribeTopicNeedUpdate(topic string) bool
SubscriptionDataList() []*SubscriptionData
Rebalance()
+ RebalanceIfNotPaused()
IsUnitMode() bool
GetConsumerRunningInfo(stack bool) *ConsumerRunningInfo
ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string)
*ConsumeMessageDirectlyResult
@@ -223,7 +224,7 @@ func GetOrNewRocketMQClient(option ClientOptions,
callbackCh chan interface{}) R
rlog.Info("receive broker's notification to consumer
group", map[string]interface{}{
rlog.LogKeyConsumerGroup:
req.ExtFields["consumerGroup"],
})
- client.RebalanceImmediately()
+ client.RebalanceIfNotPaused()
return nil
})
client.remoteClient.RegisterRequestFunc(ReqCheckTransactionState, func(req
*remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
@@ -492,7 +493,7 @@ func (c *rmqClient) Start() {
for {
select {
case <-ticker.C:
- c.RebalanceImmediately()
+ c.RebalanceIfNotPaused()
case <-c.done:
rlog.Info("The RMQClient stopping do
rebalance", map[string]interface{}{
"clientID": c.ClientID(),
@@ -820,6 +821,16 @@ func (c *rmqClient) RebalanceImmediately() {
})
}
+func (c *rmqClient) RebalanceIfNotPaused() {
+ c.rbMutex.Lock()
+ defer c.rbMutex.Unlock()
+ c.consumerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerConsumer)
+ consumer.RebalanceIfNotPaused()
+ return true
+ })
+}
+
func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData,
changed bool) {
if data == nil {
return