This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 92515f7 [ISSUE #926] don't share limit channel in pushConsumer (#923)
92515f7 is described below
commit 92515f7ee3fb9373481191c6cf4e9023cb221234
Author: Xuexue <[email protected]>
AuthorDate: Wed Sep 28 22:20:06 2022 +0800
[ISSUE #926] don't share limit channel in pushConsumer (#923)
* [feat] don't share goroutin in pushConsumer
---
consumer/push_consumer.go | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 6080657..e81d4ef 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -72,7 +72,7 @@ type pushConsumer struct {
queueLock *QueueLock
done chan struct{}
closeOnce sync.Once
- crCh chan struct{}
+ crCh map[string]chan struct{}
}
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
@@ -116,7 +116,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error)
{
queueLock: newQueueLock(),
done: make(chan struct{}, 1),
consumeFunc: utils.NewSet(),
- crCh: make(chan struct{},
defaultOpts.ConsumeGoroutineNums),
+ crCh: make(map[string]chan struct{}),
}
dc.mqChanged = p.messageQueueChanged
if p.consumeOrderly {
@@ -260,6 +260,10 @@ func (pc *pushConsumer) Subscribe(topic string, selector
MessageSelector,
return errors2.ErrStartTopic
}
+ if _, ok := pc.crCh[topic]; !ok {
+ pc.crCh[topic] = make(chan struct{},
pc.defaultConsumer.option.ConsumeGoroutineNums)
+ }
+
if pc.option.Namespace != "" {
topic = pc.option.Namespace + "%" + topic
}
@@ -1018,6 +1022,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
if msgs == nil {
return
}
+
for count := 0; count < len(msgs); count++ {
var subMsgs []*primitive.MessageExt
if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
@@ -1034,7 +1039,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
if limiterOn {
limiter(utils.WithoutNamespace(mq.Topic))
} else {
- pc.crCh <- struct{}{}
+ pc.crCh[mq.Topic] <- struct{}{}
}
go primitive.WithRecover(func() {
@@ -1046,7 +1051,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
})
}
if !limiterOn {
- <-pc.crCh
+ <-pc.crCh[mq.Topic]
}
}()
RETRY: