This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e1b335  (feat): add user defined topic dimension consumer limiter 
(#895)
2e1b335 is described below

commit 2e1b33558788162abe2b871c029993303eb327f1
Author: Robin Han <[email protected]>
AuthorDate: Mon Aug 22 19:27:42 2022 +0800

    (feat): add user defined topic dimension consumer limiter (#895)
    
    Co-authored-by: wumu.hx <[email protected]>
---
 consumer/limiter.go         |  3 +++
 consumer/option.go          |  8 ++++++++
 consumer/push_consumer.go   | 17 ++++++++++++++---
 internal/utils/namespace.go | 21 +++++++++++++++++++++
 4 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/consumer/limiter.go b/consumer/limiter.go
new file mode 100644
index 0000000..1930f8a
--- /dev/null
+++ b/consumer/limiter.go
@@ -0,0 +1,3 @@
+package consumer
+
+type Limiter func(topic string)
diff --git a/consumer/option.go b/consumer/option.go
index 2247aa1..2d51b46 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -111,6 +111,8 @@ type consumerOptions struct {
        ConsumeGoroutineNums int
 
        filterMessageHooks []hooks.FilterMessageHook
+
+       Limiter Limiter
 }
 
 func defaultPushConsumerOptions() consumerOptions {
@@ -344,3 +346,9 @@ func WithFilterMessageHook(hooks []hooks.FilterMessageHook) 
Option {
                opts.filterMessageHooks = hooks
        }
 }
+
+func WithLimiter(limiter Limiter) Option {
+       return func(opts *consumerOptions) {
+               opts.Limiter = limiter
+       }
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 41d7e6d..81c74aa 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1026,7 +1026,14 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                        count = next - 1
                }
 
-               pc.crCh <- struct{}{}
+               limiter := pc.option.Limiter
+               limiterOn := limiter != nil
+               if limiterOn {
+                       limiter(utils.WithoutNamespace(mq.Topic))
+               } else {
+                       pc.crCh <- struct{}{}
+               }
+
                go primitive.WithRecover(func() {
                RETRY:
                        if pq.IsDroppd() {
@@ -1034,7 +1041,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                                        rlog.LogKeyMessageQueue:  mq.String(),
                                        rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
                                })
-                               <-pc.crCh
+                               if !limiterOn {
+                                       <-pc.crCh
+                               }
                                return
                        }
 
@@ -1114,7 +1123,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                                        "message":               subMsgs,
                                })
                        }
-                       <-pc.crCh
+                       if !limiterOn {
+                               <-pc.crCh
+                       }
                })
        }
 }
diff --git a/internal/utils/namespace.go b/internal/utils/namespace.go
index 5e98706..754e8c8 100644
--- a/internal/utils/namespace.go
+++ b/internal/utils/namespace.go
@@ -3,6 +3,8 @@ package utils
 import "strings"
 
 const namespaceSeparator = "%"
+const retryPrefix = "%RETRY%"
+const dlqPrefix = "%DLQ%"
 
 func WrapNamespace(namespace, resourceWithOutNamespace string) string {
        if IsEmpty(namespace) || IsEmpty(resourceWithOutNamespace) {
@@ -22,3 +24,22 @@ func isAlreadyWithNamespace(resource, namespace string) bool 
{
        }
        return strings.Contains(resource, namespace+namespaceSeparator)
 }
+
+func WithoutNamespace(resource string) string {
+       if len(resource) == 0 {
+               return resource
+       }
+       resourceWithoutNamespace := ""
+       if strings.HasPrefix(resource, retryPrefix) {
+               resourceWithoutNamespace += retryPrefix
+       } else if strings.HasPrefix(resource, dlqPrefix) {
+               resourceWithoutNamespace += dlqPrefix
+       }
+       index := strings.LastIndex(resource, namespaceSeparator)
+       if index > 0 {
+               resourceWithoutNamespace += resource[index+1:]
+       } else {
+               resourceWithoutNamespace = resource
+       }
+       return resourceWithoutNamespace
+}

Reply via email to