This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 92515f7  [ISSUE #926] don't share limit channel in pushConsumer (#923)
92515f7 is described below

commit 92515f7ee3fb9373481191c6cf4e9023cb221234
Author: Xuexue <[email protected]>
AuthorDate: Wed Sep 28 22:20:06 2022 +0800

    [ISSUE #926] don't share limit channel in pushConsumer (#923)
    
    * [feat] don't share goroutine in pushConsumer
---
 consumer/push_consumer.go | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 6080657..e81d4ef 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -72,7 +72,7 @@ type pushConsumer struct {
        queueLock                    *QueueLock
        done                         chan struct{}
        closeOnce                    sync.Once
-       crCh                         chan struct{}
+       crCh                         map[string]chan struct{}
 }
 
 func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
@@ -116,7 +116,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
                queueLock:       newQueueLock(),
                done:            make(chan struct{}, 1),
                consumeFunc:     utils.NewSet(),
-               crCh:            make(chan struct{}, defaultOpts.ConsumeGoroutineNums),
+               crCh:            make(map[string]chan struct{}),
        }
        dc.mqChanged = p.messageQueueChanged
        if p.consumeOrderly {
@@ -260,6 +260,10 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
                return errors2.ErrStartTopic
        }
 
+       if _, ok := pc.crCh[topic]; !ok {
+               pc.crCh[topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
+       }
+
        if pc.option.Namespace != "" {
                topic = pc.option.Namespace + "%" + topic
        }
@@ -1018,6 +1022,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
        if msgs == nil {
                return
        }
+
        for count := 0; count < len(msgs); count++ {
                var subMsgs []*primitive.MessageExt
                if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
@@ -1034,7 +1039,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
                if limiterOn {
                        limiter(utils.WithoutNamespace(mq.Topic))
                } else {
-                       pc.crCh <- struct{}{}
+                       pc.crCh[mq.Topic] <- struct{}{}
                }
 
                go primitive.WithRecover(func() {
@@ -1046,7 +1051,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
                                        })
                                }
                                if !limiterOn {
-                                       <-pc.crCh
+                                       <-pc.crCh[mq.Topic]
                                }
                        }()
                RETRY:

Reply via email to