This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new e1ddb88 Add param which like consumeThreadMin of java sdk to control
consumption rate (#883)
e1ddb88 is described below
commit e1ddb88fd6f5b77d7abbbf3c9bb3ce9b93ea8181
Author: cserwen <[email protected]>
AuthorDate: Sun Aug 7 15:29:56 2022 +0800
Add param which like consumeThreadMin of java sdk to control consumption
rate (#883)
* add param which like consumeThreadMin of java sdk to control consumption
rate
* make some configs can be set for push_consumer
Co-authored-by: dengzhiwen1 <[email protected]>
---
consumer/option.go | 33 +++++++++++++++++++++++++++++++++
consumer/push_consumer.go | 6 ++++++
2 files changed, 39 insertions(+)
diff --git a/consumer/option.go b/consumer/option.go
index 86f48f1..3b035a3 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -106,6 +106,8 @@ type consumerOptions struct {
RebalanceLockInterval time.Duration
Resolver primitive.NsResolver
+
+ ConsumeGoroutineNums int
}
func defaultPushConsumerOptions() consumerOptions {
@@ -120,6 +122,7 @@ func defaultPushConsumerOptions() consumerOptions {
Resolver: primitive.NewHttpResolver("DEFAULT"),
ConsumeTimestamp: time.Now().Add(time.Minute * (-30)).Format("20060102150405"),
ConsumeTimeout: 15 * time.Minute,
+ ConsumeGoroutineNums: 20,
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
@@ -178,6 +181,30 @@ func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option {
}
}
+func WithPullThresholdForQueue(pullThresholdForQueue int64) Option {
+ return func(options *consumerOptions) {
+ options.PullThresholdForQueue = pullThresholdForQueue
+ }
+}
+
+func WithPullThresholdSizeForQueue(pullThresholdSizeForQueue int) Option {
+ return func(options *consumerOptions) {
+ options.PullThresholdSizeForQueue = pullThresholdSizeForQueue
+ }
+}
+
+func WithPullThresholdForTopic(pullThresholdForTopic int) Option {
+ return func(options *consumerOptions) {
+ options.PullThresholdForTopic = pullThresholdForTopic
+ }
+}
+
+func WithPullThresholdSizeForTopic(pullThresholdSizeForTopic int) Option {
+ return func(options *consumerOptions) {
+ options.PullThresholdSizeForTopic = pullThresholdSizeForTopic
+ }
+}
+
// WithChainConsumerInterceptor returns a ConsumerOption that specifies the chained interceptor for consumer.
// The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper
// around the real call.
@@ -300,3 +327,9 @@ func WithConsumeTimeout(timeout time.Duration) Option {
opts.ConsumeTimeout = timeout
}
}
+
+func WithConsumeGoroutineNums(nums int) Option {
+ return func(opts *consumerOptions) {
+ opts.ConsumeGoroutineNums = nums
+ }
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index a5884d8..41d7e6d 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -72,6 +72,7 @@ type pushConsumer struct {
queueLock *QueueLock
done chan struct{}
closeOnce sync.Once
+ crCh chan struct{}
}
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
@@ -115,6 +116,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
queueLock: newQueueLock(),
done: make(chan struct{}, 1),
consumeFunc: utils.NewSet(),
+ crCh: make(chan struct{}, defaultOpts.ConsumeGoroutineNums),
}
dc.mqChanged = p.messageQueueChanged
if p.consumeOrderly {
@@ -1023,6 +1025,8 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
subMsgs = msgs[count:next]
count = next - 1
}
+
+ pc.crCh <- struct{}{}
go primitive.WithRecover(func() {
RETRY:
if pq.IsDroppd() {
@@ -1030,6 +1034,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
+ <-pc.crCh
return
}
@@ -1109,6 +1114,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
"message": subMsgs,
})
}
+ <-pc.crCh
})
}
}