This is an automated email from the ASF dual-hosted git repository.

cserwen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c77a95d  fix: unsubscribed topic queues not dropped (#1025)
c77a95d is described below

commit c77a95d431bb2fb1e5395ac97dceb6e946599c23
Author: Gordon Wang <[email protected]>
AuthorDate: Thu Apr 27 10:19:15 2023 +0800

    fix: unsubscribed topic queues not dropped (#1025)
---
 consumer/consumer.go | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 7bd2b02..acce804 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -430,6 +430,25 @@ func (dc *defaultConsumer) doBalance() {
                }
                return true
        })
+       dc.truncateMessageQueueNotMyTopic()
+}
+
+func (dc *defaultConsumer) truncateMessageQueueNotMyTopic() {
+       dc.processQueueTable.Range(func(key, value interface{}) bool {
+               mq := key.(primitive.MessageQueue)
+               pq := value.(*processQueue)
+               if _, ok := dc.subscriptionDataTable.Load(mq.Topic); !ok {
+                       pq.WithDropped(true)
+                       if dc.removeUnnecessaryMessageQueue(&mq, pq) {
+                               dc.processQueueTable.Delete(key)
+                               rlog.Info("remove unnecessary mq because 
unsubscribed", map[string]interface{}{
+                                       rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
+                                       rlog.LogKeyMessageQueue:  mq.String(),
+                               })
+                       }
+               }
+               return true
+       })
 }
 
 func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData 
{

Reply via email to