This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ee5a175 fix PullThresholdForQueue and PullThresholdSizeForQueue
update (#1140)
ee5a175 is described below
commit ee5a175188990c0a2fda45adb2e03264a368bf03
Author: wenxuwan <[email protected]>
AuthorDate: Tue Apr 23 17:14:29 2024 +0800
fix PullThresholdForQueue and PullThresholdSizeForQueue update (#1140)
* separate interface and implementation
* fix panic when close tracedispatcher
---
consumer/option.go | 12 ++++++------
consumer/push_consumer.go | 32 ++++++++++++++++----------------
internal/utils/compression.go | 1 +
3 files changed, 23 insertions(+), 22 deletions(-)
diff --git a/consumer/option.go b/consumer/option.go
index 2e08163..608917d 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -45,15 +45,15 @@ type consumerOptions struct {
// Concurrently max span offset.it has no effect on sequential
consumption
ConsumeConcurrentlyMaxSpan int
- // Flow control threshold on queue level, each message queue will cache
at most 1000 messages by default,
+ // Flow control threshold on queue level, each message queue will cache
at most 1024 messages by default,
// Consider the {PullBatchSize}, the instantaneous value may exceed the
limit
- PullThresholdForQueue int64
+ PullThresholdForQueue atomic.Int64
- // Limit the cached message size on queue level, each message queue
will cache at most 100 MiB messages by default,
+ // Limit the cached message size on queue level, each message queue
will cache at most 512 MiB messages by default,
// Consider the {@code pullBatchSize}, the instantaneous value may
exceed the limit
//
// The size of a message only measured by message body, so it's not
accurate
- PullThresholdSizeForQueue int
+ PullThresholdSizeForQueue atomic.Int32
// Flow control threshold on topic level, default value is -1(Unlimited)
//
@@ -198,13 +198,13 @@ func
WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option {
func WithPullThresholdForQueue(pullThresholdForQueue int64) Option {
return func(options *consumerOptions) {
- options.PullThresholdForQueue = pullThresholdForQueue
+ options.PullThresholdForQueue.Store(pullThresholdForQueue)
}
}
func WithPullThresholdSizeForQueue(pullThresholdSizeForQueue int) Option {
return func(options *consumerOptions) {
- options.PullThresholdSizeForQueue = pullThresholdSizeForQueue
+
options.PullThresholdSizeForQueue.Store(int32(pullThresholdSizeForQueue))
}
}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c600f13..259980a 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -518,11 +518,11 @@ func (pc *pushConsumer) messageQueueChanged(topic string,
mqAll, mqDivided []*pr
if newVal == 0 {
newVal = 1
}
- rlog.Info("The PullThresholdForTopic is changed",
map[string]interface{}{
- rlog.LogKeyValueChangedFrom:
pc.option.PullThresholdForTopic,
+ rlog.Info("The PullThresholdForQueue is changed",
map[string]interface{}{
+ rlog.LogKeyValueChangedFrom:
pc.option.PullThresholdForQueue.Load(),
rlog.LogKeyValueChangedTo: newVal,
})
- pc.option.PullThresholdForTopic = newVal
+ pc.option.PullThresholdForQueue.Store(int64(newVal))
}
if pc.option.PullThresholdSizeForTopic != -1 {
@@ -530,11 +530,11 @@ func (pc *pushConsumer) messageQueueChanged(topic string,
mqAll, mqDivided []*pr
if newVal == 0 {
newVal = 1
}
- rlog.Info("The PullThresholdSizeForTopic is changed",
map[string]interface{}{
- rlog.LogKeyValueChangedFrom:
pc.option.PullThresholdSizeForTopic,
+ rlog.Info("The PullThresholdSizeForQueue is changed",
map[string]interface{}{
+ rlog.LogKeyValueChangedFrom:
pc.option.PullThresholdSizeForQueue.Load(),
rlog.LogKeyValueChangedTo: newVal,
})
- pc.option.PullThresholdSizeForTopic = newVal
+ pc.option.PullThresholdSizeForQueue.Store(int32(newVal))
}
}
pc.client.SendHeartbeatToAllBrokerWithLock()
@@ -564,9 +564,9 @@ func (pc *pushConsumer) validate() error {
}
}
- if pc.option.PullThresholdForQueue < 1 ||
pc.option.PullThresholdForQueue > 65535 {
- if pc.option.PullThresholdForQueue == 0 {
- pc.option.PullThresholdForQueue = 1024
+ if pc.option.PullThresholdForQueue.Load() < 1 ||
pc.option.PullThresholdForQueue.Load() > 65535 {
+ if pc.option.PullThresholdForQueue.Load() == 0 {
+ pc.option.PullThresholdForQueue.Store(1024)
} else {
return errors.New("option.PullThresholdForQueue out of
range [1, 65535]")
}
@@ -580,9 +580,9 @@ func (pc *pushConsumer) validate() error {
}
}
- if pc.option.PullThresholdSizeForQueue < 1 ||
pc.option.PullThresholdSizeForQueue > 1024 {
- if pc.option.PullThresholdSizeForQueue == 0 {
- pc.option.PullThresholdSizeForQueue = 512
+ if pc.option.PullThresholdSizeForQueue.Load() < 1 ||
pc.option.PullThresholdSizeForQueue.Load() > 1024 {
+ if pc.option.PullThresholdSizeForQueue.Load() == 0 {
+ pc.option.PullThresholdSizeForQueue.Store(512)
} else {
return errors.New("option.PullThresholdSizeForQueue out
of range [1, 1024]")
}
@@ -693,10 +693,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
}
cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb)
- if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue {
+ if pq.cachedMsgCount.Load() >
pc.option.PullThresholdForQueue.Load() {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message count exceeds
the threshold, so do flow control", map[string]interface{}{
- "PullThresholdForQueue":
pc.option.PullThresholdForQueue,
+ "PullThresholdForQueue":
pc.option.PullThresholdForQueue.Load(),
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count":
pq.cachedMsgCount,
@@ -710,10 +710,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
goto NEXT
}
- if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue
{
+ if cachedMessageSizeInMiB >
int(pc.option.PullThresholdSizeForQueue.Load()) {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message size exceeds
the threshold, so do flow control", map[string]interface{}{
- "PullThresholdSizeForQueue":
pc.option.PullThresholdSizeForQueue,
+ "PullThresholdSizeForQueue":
pc.option.PullThresholdSizeForQueue.Load(),
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count":
pq.cachedMsgCount,
diff --git a/internal/utils/compression.go b/internal/utils/compression.go
index 162864f..11f1791 100644
--- a/internal/utils/compression.go
+++ b/internal/utils/compression.go
@@ -78,6 +78,7 @@ func UnCompress(data []byte) []byte {
if err != nil {
return data
}
+ defer r.Close()
retData, err := ioutil.ReadAll(r)
if err != nil {
return data