This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 3493a47 feat: support consumer consume tps option (#1101)
3493a47 is described below
commit 3493a4783ab39d810cffc37757b5b005cfa57bd9
Author: WeizhongTu <[email protected]>
AuthorDate: Mon Oct 16 14:36:08 2023 +0800
feat: support consumer consume tps option (#1101)
* feat: support consumer consume tps option
* feat: support consumer consume tps option
---
consumer/consumer.go | 8 ++++++++
consumer/option.go | 28 ++++++++++++++++++++--------
consumer/pull_consumer.go | 4 ++--
consumer/push_consumer.go | 12 ++++++------
4 files changed, 36 insertions(+), 16 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index acce804..4ed4940 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -414,6 +414,14 @@ func (dc *defaultConsumer) doBalance() {
return (mqAll[i].QueueId - mqAll[j].QueueId) < 0
})
allocateResult := dc.allocate(dc.consumerGroup,
dc.client.ClientID(), mqAll, cidAll)
+
+ // Principle of flow control: pull TPS =
1000ms/PullInterval * BatchSize * len(allocateResult)
+ if consumeTPS := dc.option.ConsumeTPS.Load();
consumeTPS > 0 && len(allocateResult) > 0 {
+ pullBatchSize := dc.option.PullBatchSize.Load()
+ pullTimesPerSecond := float64(consumeTPS) /
float64(pullBatchSize*int32(len(allocateResult)))
+
dc.option.PullInterval.Store(time.Duration(float64(time.Second) /
pullTimesPerSecond))
+ }
+
changed := dc.updateProcessQueueTable(topic,
allocateResult)
if changed {
dc.mqChanged(topic, mqAll, allocateResult)
diff --git a/consumer/option.go b/consumer/option.go
index 3f7e6ca..ac7dd93 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -21,6 +21,8 @@ import (
"strings"
"time"
+ "go.uber.org/atomic"
+
"github.com/apache/rocketmq-client-go/v2/hooks"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -55,8 +57,8 @@ type consumerOptions struct {
// Flow control threshold on topic level, default value is -1(Unlimited)
//
- // The value of {@code pullThresholdForQueue} will be overwrote and
calculated based on
- // {@code pullThresholdForTopic} if it is't unlimited
+ // The value of {@code pullThresholdForQueue} will be overwritten and
calculated based on
+ // {@code pullThresholdForTopic} if it isn't unlimited
//
// For example, if the value of pullThresholdForTopic is 1000 and 10
message queues are assigned to this consumer,
// then pullThresholdForQueue will be set to 100
@@ -64,21 +66,24 @@ type consumerOptions struct {
// Limit the cached message size on topic level, default value is -1
MiB(Unlimited)
//
- // The value of {@code pullThresholdSizeForQueue} will be overwrote and
calculated based on
- // {@code pullThresholdSizeForTopic} if it is't unlimited
+ // The value of {@code pullThresholdSizeForQueue} will be overwritten
and calculated based on
+ // {@code pullThresholdSizeForTopic} if it isn't unlimited
//
// For example, if the value of pullThresholdSizeForTopic is 1000 MiB
and 10 message queues are
// assigned to this consumer, then pullThresholdSizeForQueue will be
set to 100 MiB
PullThresholdSizeForTopic int
// Message pull Interval
- PullInterval time.Duration
+ PullInterval atomic.Duration
+
+ // Message consumer tps
+ ConsumeTPS atomic.Int32
// Batch consumption size
ConsumeMessageBatchMaxSize int
// Batch pull size
- PullBatchSize int32
+ PullBatchSize atomic.Int32
// Whether update subscription relationship when every pull
PostSubscriptionWhenPull bool
@@ -283,7 +288,7 @@ func WithStrategy(strategy AllocateStrategy) Option {
func WithPullBatchSize(batchSize int32) Option {
return func(options *consumerOptions) {
- options.PullBatchSize = batchSize
+ options.PullBatchSize.Store(batchSize)
}
}
@@ -307,7 +312,14 @@ func WithSuspendCurrentQueueTimeMillis(suspendT
time.Duration) Option {
func WithPullInterval(interval time.Duration) Option {
return func(options *consumerOptions) {
- options.PullInterval = interval
+ options.PullInterval.Store(interval)
+ }
+}
+
+// WithConsumeTPS set single-machine consumption tps
+func WithConsumeTPS(tps int32) Option {
+ return func(options *consumerOptions) {
+ options.ConsumeTPS.Store(tps)
}
}
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 8af88f1..bd05cbf 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -700,7 +700,7 @@ func (pc *defaultPullConsumer) pullMessage(request
*PullRequest) {
time.Sleep(sleepTime)
}
// reset time
- sleepTime = pc.option.PullInterval
+ sleepTime = pc.option.PullInterval.Load()
pq.lastPullTime.Store(time.Now())
err := pc.makeSureStateOK()
if err != nil {
@@ -736,7 +736,7 @@ func (pc *defaultPullConsumer) pullMessage(request
*PullRequest) {
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
- MaxMsgNums: pc.option.PullBatchSize,
+ MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: 0,
SubExpression: sd.SubString,
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 2799841..ab105b3 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -596,7 +596,7 @@ func (pc *pushConsumer) validate() error {
}
}
- if pc.option.PullInterval < 0 || pc.option.PullInterval >
65535*time.Millisecond {
+ if interval := pc.option.PullInterval.Load(); interval < 0 || interval
> 65535*time.Millisecond {
return errors.New("option.PullInterval out of range [0, 65535]")
}
@@ -608,9 +608,9 @@ func (pc *pushConsumer) validate() error {
}
}
- if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
- if pc.option.PullBatchSize == 0 {
- pc.option.PullBatchSize = 32
+ if pullBatchSize := pc.option.PullBatchSize.Load(); pullBatchSize < 1
|| pullBatchSize > 1024 {
+ if pullBatchSize == 0 {
+ pc.option.PullBatchSize.Store(32)
} else {
return errors.New("option.PullBatchSize out of range
[1, 1024]")
}
@@ -674,7 +674,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
time.Sleep(sleepTime)
}
// reset time
- sleepTime = pc.option.PullInterval
+ sleepTime = pc.option.PullInterval.Load()
pq.lastPullTime.Store(time.Now())
err := pc.makeSureStateOK()
if err != nil {
@@ -813,7 +813,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
Topic: request.mq.Topic,
QueueId: int32(request.mq.QueueId),
QueueOffset: request.nextOffset,
- MaxMsgNums: pc.option.PullBatchSize,
+ MaxMsgNums: pc.option.PullBatchSize.Load(),
SysFlag: sysFlag,
CommitOffset: commitOffsetValue,
SubExpression: subExpression,