This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e1ddb88  Add param which like consumeThreadMin of java sdk to control consumption rate (#883)
e1ddb88 is described below

commit e1ddb88fd6f5b77d7abbbf3c9bb3ce9b93ea8181
Author: cserwen <[email protected]>
AuthorDate: Sun Aug 7 15:29:56 2022 +0800

    Add param which like consumeThreadMin of java sdk to control consumption rate (#883)
    
    * add param which like consumeThreadMin of java sdk to control consumption rate
    
    * make some configs can be set for push_consumer
    
    Co-authored-by: dengzhiwen1 <[email protected]>
---
 consumer/option.go        | 33 +++++++++++++++++++++++++++++++++
 consumer/push_consumer.go |  6 ++++++
 2 files changed, 39 insertions(+)

diff --git a/consumer/option.go b/consumer/option.go
index 86f48f1..3b035a3 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -106,6 +106,8 @@ type consumerOptions struct {
        RebalanceLockInterval time.Duration
 
        Resolver primitive.NsResolver
+
+       ConsumeGoroutineNums int
 }
 
 func defaultPushConsumerOptions() consumerOptions {
@@ -120,6 +122,7 @@ func defaultPushConsumerOptions() consumerOptions {
                Resolver:                   primitive.NewHttpResolver("DEFAULT"),
                ConsumeTimestamp:           time.Now().Add(time.Minute * (-30)).Format("20060102150405"),
                ConsumeTimeout:             15 * time.Minute,
+               ConsumeGoroutineNums:       20,
        }
        opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
        return opts
@@ -178,6 +181,30 @@ func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option {
        }
 }
 
+func WithPullThresholdForQueue(pullThresholdForQueue int64) Option {
+       return func(options *consumerOptions) {
+               options.PullThresholdForQueue = pullThresholdForQueue
+       }
+}
+
+func WithPullThresholdSizeForQueue(pullThresholdSizeForQueue int) Option {
+       return func(options *consumerOptions) {
+               options.PullThresholdSizeForQueue = pullThresholdSizeForQueue
+       }
+}
+
+func WithPullThresholdForTopic(pullThresholdForTopic int) Option {
+       return func(options *consumerOptions) {
+               options.PullThresholdForTopic = pullThresholdForTopic
+       }
+}
+
+func WithPullThresholdSizeForTopic(pullThresholdSizeForTopic int) Option {
+       return func(options *consumerOptions) {
+               options.PullThresholdSizeForTopic = pullThresholdSizeForTopic
+       }
+}
+
 // WithChainConsumerInterceptor returns a ConsumerOption that specifies the chained interceptor for consumer.
 // The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper
 // around the real call.
@@ -300,3 +327,9 @@ func WithConsumeTimeout(timeout time.Duration) Option {
                opts.ConsumeTimeout = timeout
        }
 }
+
+func WithConsumeGoroutineNums(nums int) Option {
+       return func(opts *consumerOptions) {
+               opts.ConsumeGoroutineNums = nums
+       }
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index a5884d8..41d7e6d 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -72,6 +72,7 @@ type pushConsumer struct {
        queueLock                    *QueueLock
        done                         chan struct{}
        closeOnce                    sync.Once
+       crCh                         chan struct{}
 }
 
 func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
@@ -115,6 +116,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
                queueLock:       newQueueLock(),
                done:            make(chan struct{}, 1),
                consumeFunc:     utils.NewSet(),
+               crCh:            make(chan struct{}, defaultOpts.ConsumeGoroutineNums),
        }
        dc.mqChanged = p.messageQueueChanged
        if p.consumeOrderly {
@@ -1023,6 +1025,8 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
                        subMsgs = msgs[count:next]
                        count = next - 1
                }
+
+               pc.crCh <- struct{}{}
                go primitive.WithRecover(func() {
                RETRY:
                        if pq.IsDroppd() {
@@ -1030,6 +1034,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
                                        rlog.LogKeyMessageQueue:  mq.String(),
                                        rlog.LogKeyConsumerGroup: pc.consumerGroup,
                                })
+                               <-pc.crCh
                                return
                        }
 
@@ -1109,6 +1114,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
                                        "message":               subMsgs,
                                })
                        }
+                       <-pc.crCh
                })
        }
 }

Reply via email to