This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a093bac [ISSUE #945]init crCh after topic with namespace to avoid
consumption blocked. (#944)
a093bac is described below
commit a093bacaceb285d008a7eaea899fa4c935c0b791
Author: Kay Du <[email protected]>
AuthorDate: Fri Oct 14 14:01:35 2022 +0800
[ISSUE #945]init crCh after topic with namespace to avoid consumption
blocked. (#944)
* init crCh after topic with namespace instead of before which will cause
consumption blocked; opt on log
* init crCh for retry topic when start consumer
* judge on crCh when consume message and init once if nil; limiter on judge
before handle messages
---
consumer/consumer.go | 6 +++---
consumer/push_consumer.go | 20 ++++++++++++++------
internal/client.go | 4 +++-
internal/constants.go | 5 +++++
4 files changed, 25 insertions(+), 10 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 4859168..26e874c 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -420,7 +420,7 @@ func (dc *defaultConsumer) doBalance() {
changed := dc.updateProcessQueueTable(topic,
allocateResult)
if changed {
dc.mqChanged(topic, mqAll, allocateResult)
- rlog.Debug("MessageQueue do balance done",
map[string]interface{}{
+ rlog.Info("MessageQueue do balance done",
map[string]interface{}{
rlog.LogKeyConsumerGroup:
dc.consumerGroup,
rlog.LogKeyTopic: topic,
"clientID":
dc.client.ClientID(),
@@ -676,7 +676,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic
string, mqs []*primitiv
if dc.removeUnnecessaryMessageQueue(&mq, pq) {
dc.processQueueTable.Delete(key)
changed = true
- rlog.Debug("remove unnecessary mq when
updateProcessQueueTable", map[string]interface{}{
+ rlog.Info("remove unnecessary mq when
updateProcessQueueTable", map[string]interface{}{
rlog.LogKeyConsumerGroup:
dc.consumerGroup,
rlog.LogKeyMessageQueue:
mq.String(),
})
@@ -686,7 +686,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic
string, mqs []*primitiv
if dc.removeUnnecessaryMessageQueue(&mq, pq) {
dc.processQueueTable.Delete(key)
changed = true
- rlog.Debug("remove unnecessary mq
because pull was expired, prepare to fix it", map[string]interface{}{
+ rlog.Warning("remove unnecessary mq
because pull was expired, prepare to fix it", map[string]interface{}{
rlog.LogKeyConsumerGroup:
dc.consumerGroup,
rlog.LogKeyMessageQueue:
mq.String(),
})
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index e81d4ef..d9d0a50 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -155,6 +155,9 @@ func (pc *pushConsumer) Start() error {
return
}
+ retryTopic := internal.GetRetryTopic(pc.consumerGroup)
+ pc.crCh[retryTopic] = make(chan struct{},
pc.defaultConsumer.option.ConsumeGoroutineNums)
+
go func() {
// todo start clean msg expired
for {
@@ -260,13 +263,12 @@ func (pc *pushConsumer) Subscribe(topic string, selector
MessageSelector,
return errors2.ErrStartTopic
}
- if _, ok := pc.crCh[topic]; !ok {
- pc.crCh[topic] = make(chan struct{},
pc.defaultConsumer.option.ConsumeGoroutineNums)
- }
-
if pc.option.Namespace != "" {
topic = pc.option.Namespace + "%" + topic
}
+ if _, ok := pc.crCh[topic]; !ok {
+ pc.crCh[topic] = make(chan struct{},
pc.defaultConsumer.option.ConsumeGoroutineNums)
+ }
data := buildSubscriptionData(topic, selector)
pc.subscriptionDataTable.Store(topic, data)
pc.subscribedTopic[topic] = ""
@@ -1023,6 +1025,14 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
return
}
+ limiter := pc.option.Limiter
+ limiterOn := limiter != nil
+ if !limiterOn {
+ if _, ok := pc.crCh[mq.Topic]; !ok {
+ pc.crCh[mq.Topic] = make(chan struct{},
pc.defaultConsumer.option.ConsumeGoroutineNums)
+ }
+ }
+
for count := 0; count < len(msgs); count++ {
var subMsgs []*primitive.MessageExt
if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
@@ -1034,8 +1044,6 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
count = next - 1
}
- limiter := pc.option.Limiter
- limiterOn := limiter != nil
if limiterOn {
limiter(utils.WithoutNamespace(mq.Topic))
} else {
diff --git a/internal/client.go b/internal/client.go
index bb7538c..e3c48ed 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -603,7 +603,9 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
return true
})
if hbData.ProducerDatas.Len() == 0 && hbData.ConsumerDatas.Len() == 0 {
- rlog.Info("sending heartbeat, but no producer and no consumer",
nil)
+ rlog.Info("sending heartbeat, but no producer and no consumer",
map[string]interface{}{
+ "clientid": hbData.ClientId,
+ })
return
}
c.GetNameSrv().(*namesrvs).brokerAddressesMap.Range(func(key, value
interface{}) bool {
diff --git a/internal/constants.go b/internal/constants.go
index 1f15a7e..97411e8 100644
--- a/internal/constants.go
+++ b/internal/constants.go
@@ -17,6 +17,8 @@ limitations under the License.
package internal
+import "strings"
+
const (
RetryGroupTopicPrefix = "%RETRY%"
DefaultConsumerGroup = "DEFAULT_CONSUMER"
@@ -31,5 +33,8 @@ func GetReplyTopic(clusterName string) string {
}
func GetRetryTopic(group string) string {
+ if strings.HasPrefix(group, RetryGroupTopicPrefix) {
+ return group
+ }
return RetryGroupTopicPrefix + group
}