This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a093bac  [ISSUE #945]init crCh after topic with namespace to avoid 
consumption blocked. (#944)
a093bac is described below

commit a093bacaceb285d008a7eaea899fa4c935c0b791
Author: Kay Du <[email protected]>
AuthorDate: Fri Oct 14 14:01:35 2022 +0800

    [ISSUE #945]init crCh after topic with namespace to avoid consumption 
blocked. (#944)
    
    * init crCh after topic with namespace instead of before which will cause 
consumption blocked; opt on log
    
    * init crCh for retry topic when start consumer
    
    * judge on crCh when consume message and init once if nil; limiter on judge 
before handle messages
---
 consumer/consumer.go      |  6 +++---
 consumer/push_consumer.go | 20 ++++++++++++++------
 internal/client.go        |  4 +++-
 internal/constants.go     |  5 +++++
 4 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 4859168..26e874c 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -420,7 +420,7 @@ func (dc *defaultConsumer) doBalance() {
                        changed := dc.updateProcessQueueTable(topic, 
allocateResult)
                        if changed {
                                dc.mqChanged(topic, mqAll, allocateResult)
-                               rlog.Debug("MessageQueue do balance done", 
map[string]interface{}{
+                               rlog.Info("MessageQueue do balance done", 
map[string]interface{}{
                                        rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
                                        rlog.LogKeyTopic:         topic,
                                        "clientID":               
dc.client.ClientID(),
@@ -676,7 +676,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*primitiv
                                if dc.removeUnnecessaryMessageQueue(&mq, pq) {
                                        dc.processQueueTable.Delete(key)
                                        changed = true
-                                       rlog.Debug("remove unnecessary mq when 
updateProcessQueueTable", map[string]interface{}{
+                                       rlog.Info("remove unnecessary mq when 
updateProcessQueueTable", map[string]interface{}{
                                                rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
                                                rlog.LogKeyMessageQueue:  
mq.String(),
                                        })
@@ -686,7 +686,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*primitiv
                                if dc.removeUnnecessaryMessageQueue(&mq, pq) {
                                        dc.processQueueTable.Delete(key)
                                        changed = true
-                                       rlog.Debug("remove unnecessary mq 
because pull was expired, prepare to fix it", map[string]interface{}{
+                                       rlog.Warning("remove unnecessary mq 
because pull was expired, prepare to fix it", map[string]interface{}{
                                                rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
                                                rlog.LogKeyMessageQueue:  
mq.String(),
                                        })
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index e81d4ef..d9d0a50 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -155,6 +155,9 @@ func (pc *pushConsumer) Start() error {
                        return
                }
 
+               retryTopic := internal.GetRetryTopic(pc.consumerGroup)
+               pc.crCh[retryTopic] = make(chan struct{}, 
pc.defaultConsumer.option.ConsumeGoroutineNums)
+
                go func() {
                        // todo start clean msg expired
                        for {
@@ -260,13 +263,12 @@ func (pc *pushConsumer) Subscribe(topic string, selector 
MessageSelector,
                return errors2.ErrStartTopic
        }
 
-       if _, ok := pc.crCh[topic]; !ok {
-               pc.crCh[topic] = make(chan struct{}, 
pc.defaultConsumer.option.ConsumeGoroutineNums)
-       }
-
        if pc.option.Namespace != "" {
                topic = pc.option.Namespace + "%" + topic
        }
+       if _, ok := pc.crCh[topic]; !ok {
+               pc.crCh[topic] = make(chan struct{}, 
pc.defaultConsumer.option.ConsumeGoroutineNums)
+       }
        data := buildSubscriptionData(topic, selector)
        pc.subscriptionDataTable.Store(topic, data)
        pc.subscribedTopic[topic] = ""
@@ -1023,6 +1025,14 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                return
        }
 
+       limiter := pc.option.Limiter
+       limiterOn := limiter != nil
+       if !limiterOn {
+               if _, ok := pc.crCh[mq.Topic]; !ok {
+                       pc.crCh[mq.Topic] = make(chan struct{}, 
pc.defaultConsumer.option.ConsumeGoroutineNums)
+               }
+       }
+
        for count := 0; count < len(msgs); count++ {
                var subMsgs []*primitive.MessageExt
                if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
@@ -1034,8 +1044,6 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                        count = next - 1
                }
 
-               limiter := pc.option.Limiter
-               limiterOn := limiter != nil
                if limiterOn {
                        limiter(utils.WithoutNamespace(mq.Topic))
                } else {
diff --git a/internal/client.go b/internal/client.go
index bb7538c..e3c48ed 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -603,7 +603,9 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
                return true
        })
        if hbData.ProducerDatas.Len() == 0 && hbData.ConsumerDatas.Len() == 0 {
-               rlog.Info("sending heartbeat, but no producer and no consumer", 
nil)
+               rlog.Info("sending heartbeat, but no producer and no consumer", 
map[string]interface{}{
+                       "clientid": hbData.ClientId,
+               })
                return
        }
        c.GetNameSrv().(*namesrvs).brokerAddressesMap.Range(func(key, value 
interface{}) bool {
diff --git a/internal/constants.go b/internal/constants.go
index 1f15a7e..97411e8 100644
--- a/internal/constants.go
+++ b/internal/constants.go
@@ -17,6 +17,8 @@ limitations under the License.
 
 package internal
 
+import "strings"
+
 const (
        RetryGroupTopicPrefix    = "%RETRY%"
        DefaultConsumerGroup     = "DEFAULT_CONSUMER"
@@ -31,5 +33,8 @@ func GetReplyTopic(clusterName string) string {
 }
 
 func GetRetryTopic(group string) string {
+       if strings.HasPrefix(group, RetryGroupTopicPrefix) {
+               return group
+       }
        return RetryGroupTopicPrefix + group
 }

Reply via email to