This is an automated email from the ASF dual-hosted git repository.
cserwen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c77a95d fix: unsubscribed topic queues not dropped (#1025)
c77a95d is described below
commit c77a95d431bb2fb1e5395ac97dceb6e946599c23
Author: Gordon Wang <[email protected]>
AuthorDate: Thu Apr 27 10:19:15 2023 +0800
fix: unsubscribed topic queues not dropped (#1025)
---
consumer/consumer.go | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 7bd2b02..acce804 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -430,6 +430,25 @@ func (dc *defaultConsumer) doBalance() {
}
return true
})
+ dc.truncateMessageQueueNotMyTopic()
+}
+
+func (dc *defaultConsumer) truncateMessageQueueNotMyTopic() {
+ dc.processQueueTable.Range(func(key, value interface{}) bool {
+ mq := key.(primitive.MessageQueue)
+ pq := value.(*processQueue)
+ if _, ok := dc.subscriptionDataTable.Load(mq.Topic); !ok {
+ pq.WithDropped(true)
+ if dc.removeUnnecessaryMessageQueue(&mq, pq) {
+ dc.processQueueTable.Delete(key)
+ rlog.Info("remove unnecessary mq because
unsubscribed", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
dc.consumerGroup,
+ rlog.LogKeyMessageQueue: mq.String(),
+ })
+ }
+ }
+ return true
+ })
}
func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData
{