This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 2e1b335 (feat): add user defined topic dimension consumer limiter
(#895)
2e1b335 is described below
commit 2e1b33558788162abe2b871c029993303eb327f1
Author: Robin Han <[email protected]>
AuthorDate: Mon Aug 22 19:27:42 2022 +0800
(feat): add user defined topic dimension consumer limiter (#895)
Co-authored-by: wumu.hx <[email protected]>
---
consumer/limiter.go | 3 +++
consumer/option.go | 8 ++++++++
consumer/push_consumer.go | 17 ++++++++++++++---
internal/utils/namespace.go | 21 +++++++++++++++++++++
4 files changed, 46 insertions(+), 3 deletions(-)
diff --git a/consumer/limiter.go b/consumer/limiter.go
new file mode 100644
index 0000000..1930f8a
--- /dev/null
+++ b/consumer/limiter.go
@@ -0,0 +1,3 @@
+package consumer
+
+type Limiter func(topic string)
diff --git a/consumer/option.go b/consumer/option.go
index 2247aa1..2d51b46 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -111,6 +111,8 @@ type consumerOptions struct {
ConsumeGoroutineNums int
filterMessageHooks []hooks.FilterMessageHook
+
+ Limiter Limiter
}
func defaultPushConsumerOptions() consumerOptions {
@@ -344,3 +346,9 @@ func WithFilterMessageHook(hooks []hooks.FilterMessageHook)
Option {
opts.filterMessageHooks = hooks
}
}
+
+func WithLimiter(limiter Limiter) Option {
+ return func(opts *consumerOptions) {
+ opts.Limiter = limiter
+ }
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 41d7e6d..81c74aa 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1026,7 +1026,14 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
count = next - 1
}
- pc.crCh <- struct{}{}
+ limiter := pc.option.Limiter
+ limiterOn := limiter != nil
+ if limiterOn {
+ limiter(utils.WithoutNamespace(mq.Topic))
+ } else {
+ pc.crCh <- struct{}{}
+ }
+
go primitive.WithRecover(func() {
RETRY:
if pq.IsDroppd() {
@@ -1034,7 +1041,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyConsumerGroup:
pc.consumerGroup,
})
- <-pc.crCh
+ if !limiterOn {
+ <-pc.crCh
+ }
return
}
@@ -1114,7 +1123,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
"message": subMsgs,
})
}
- <-pc.crCh
+ if !limiterOn {
+ <-pc.crCh
+ }
})
}
}
diff --git a/internal/utils/namespace.go b/internal/utils/namespace.go
index 5e98706..754e8c8 100644
--- a/internal/utils/namespace.go
+++ b/internal/utils/namespace.go
@@ -3,6 +3,8 @@ package utils
import "strings"
const namespaceSeparator = "%"
+const retryPrefix = "%RETRY%"
+const dlqPrefix = "%DLQ%"
func WrapNamespace(namespace, resourceWithOutNamespace string) string {
if IsEmpty(namespace) || IsEmpty(resourceWithOutNamespace) {
@@ -22,3 +24,22 @@ func isAlreadyWithNamespace(resource, namespace string) bool
{
}
return strings.Contains(resource, namespace+namespaceSeparator)
}
+
+func WithoutNamespace(resource string) string {
+ if len(resource) == 0 {
+ return resource
+ }
+ resourceWithoutNamespace := ""
+ if strings.HasPrefix(resource, retryPrefix) {
+ resourceWithoutNamespace += retryPrefix
+ } else if strings.HasPrefix(resource, dlqPrefix) {
+ resourceWithoutNamespace += dlqPrefix
+ }
+ index := strings.LastIndex(resource, namespaceSeparator)
+ if index > 0 {
+ resourceWithoutNamespace += resource[index+1:]
+ } else {
+ resourceWithoutNamespace = resource
+ }
+ return resourceWithoutNamespace
+}