This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 5aa04a2 clean expired message in CLUSTERING model for push_consumer
(#879)
5aa04a2 is described below
commit 5aa04a2b2aeb8a4c99dffcb91e6ded9011cc7744
Author: cserwen <[email protected]>
AuthorDate: Wed Aug 3 15:11:55 2022 +0800
clean expired message in CLUSTERING model for push_consumer (#879)
Co-authored-by: dengzhiwen1 <[email protected]>
---
consumer/option.go | 9 ++++++++-
consumer/process_queue.go | 17 ++++++++---------
consumer/push_consumer.go | 30 ++++++++++++++++++++++++++++++
3 files changed, 46 insertions(+), 10 deletions(-)
diff --git a/consumer/option.go b/consumer/option.go
index 1ef184b..86f48f1 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -118,7 +118,8 @@ func defaultPushConsumerOptions() consumerOptions {
ConsumerModel: Clustering,
AutoCommit: true,
Resolver:
primitive.NewHttpResolver("DEFAULT"),
- ConsumeTimestamp:
time.Now().Add(time.Minute * (-30)).Format("20060102150405"),
+ ConsumeTimestamp: time.Now().Add(time.Minute *
(-30)).Format("20060102150405"),
+ ConsumeTimeout: 15 * time.Minute,
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
@@ -293,3 +294,9 @@ func WithNameServerDomain(nameServerUrl string) Option {
opts.Resolver = primitive.NewHttpResolver("DEFAULT",
nameServerUrl)
}
}
+
+func WithConsumeTimeout(timeout time.Duration) Option {
+ return func(opts *consumerOptions) {
+ opts.ConsumeTimeout = timeout
+ }
+}
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index c07cf7e..92a82a3 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -234,8 +234,8 @@ func (pq *processQueue) isPullExpired() bool {
return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
}
-func (pq *processQueue) cleanExpiredMsg(consumer *defaultConsumer) {
- if consumer.option.ConsumeOrderly {
+func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) {
+ if pc.option.ConsumeOrderly {
return
}
var loop = 16
@@ -246,7 +246,7 @@ func (pq *processQueue) cleanExpiredMsg(consumer
*defaultConsumer) {
for i := 0; i < loop; i++ {
pq.mutex.RLock()
if pq.msgCache.Empty() {
- pq.mutex.RLock()
+ pq.mutex.RUnlock()
return
}
_, firstValue := pq.msgCache.Min()
@@ -261,17 +261,16 @@ func (pq *processQueue) cleanExpiredMsg(consumer
*defaultConsumer) {
})
continue
}
- if time.Now().Unix()-st <=
int64(consumer.option.ConsumeTimeout) {
- pq.mutex.RLock()
+ if time.Now().UnixNano()/1e6-st <=
int64(pc.option.ConsumeTimeout/time.Millisecond) {
+ pq.mutex.RUnlock()
return
}
}
- pq.mutex.RLock()
+ pq.mutex.RUnlock()
- err := consumer.sendBack(msg, 3)
- if err != nil {
+ if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
rlog.Error("send message back to broker error when
clean expired messages", map[string]interface{}{
- rlog.LogKeyUnderlayError: err,
+ rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
continue
}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index cb4768b..d723519 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -170,6 +170,28 @@ func (pc *pushConsumer) Start() error {
}
}()
+ go primitive.WithRecover(func() {
+ if pc.consumeOrderly {
+ return
+ }
+ time.Sleep(pc.option.ConsumeTimeout)
+ pc.cleanExpiredMsg()
+
+ ticker := time.NewTicker(pc.option.ConsumeTimeout)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ pc.cleanExpiredMsg()
+ case <-pc.done:
+ rlog.Info("push consumer close
cleanExpiredMsg listener.", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
pc.consumerGroup,
+ })
+ return
+ }
+ }
+ })
+
go primitive.WithRecover(func() {
// initial lock.
if !pc.consumeOrderly {
@@ -1292,3 +1314,11 @@ func (pc *pushConsumer)
submitConsumeRequestLater(suspendTimeMillis int64) {
}
time.Sleep(time.Duration(suspendTimeMillis) * time.Millisecond)
}
+
+func (pc *pushConsumer) cleanExpiredMsg() {
+ pc.processQueueTable.Range(func(key, value interface{}) bool {
+ pq := value.(*processQueue)
+ pq.cleanExpiredMsg(pc)
+ return true
+ })
+}