This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5aa04a2  clean expired message in CLUSTERING model for push_consumer (#879)
5aa04a2 is described below

commit 5aa04a2b2aeb8a4c99dffcb91e6ded9011cc7744
Author: cserwen <[email protected]>
AuthorDate: Wed Aug 3 15:11:55 2022 +0800

    clean expired message in CLUSTERING model for push_consumer (#879)
    
    Co-authored-by: dengzhiwen1 <[email protected]>
---
 consumer/option.go        |  9 ++++++++-
 consumer/process_queue.go | 17 ++++++++---------
 consumer/push_consumer.go | 30 ++++++++++++++++++++++++++++++
 3 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/consumer/option.go b/consumer/option.go
index 1ef184b..86f48f1 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -118,7 +118,8 @@ func defaultPushConsumerOptions() consumerOptions {
                ConsumerModel:              Clustering,
                AutoCommit:                 true,
                Resolver:                   
primitive.NewHttpResolver("DEFAULT"),
-               ConsumeTimestamp:                       
time.Now().Add(time.Minute * (-30)).Format("20060102150405"),
+               ConsumeTimestamp:           time.Now().Add(time.Minute * 
(-30)).Format("20060102150405"),
+               ConsumeTimeout:             15 * time.Minute,
        }
        opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
        return opts
@@ -293,3 +294,9 @@ func WithNameServerDomain(nameServerUrl string) Option {
                opts.Resolver = primitive.NewHttpResolver("DEFAULT", 
nameServerUrl)
        }
 }
+
+func WithConsumeTimeout(timeout time.Duration) Option {
+       return func(opts *consumerOptions) {
+               opts.ConsumeTimeout = timeout
+       }
+}
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index c07cf7e..92a82a3 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -234,8 +234,8 @@ func (pq *processQueue) isPullExpired() bool {
        return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
 }
 
-func (pq *processQueue) cleanExpiredMsg(consumer *defaultConsumer) {
-       if consumer.option.ConsumeOrderly {
+func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) {
+       if pc.option.ConsumeOrderly {
                return
        }
        var loop = 16
@@ -246,7 +246,7 @@ func (pq *processQueue) cleanExpiredMsg(consumer *defaultConsumer) {
        for i := 0; i < loop; i++ {
                pq.mutex.RLock()
                if pq.msgCache.Empty() {
-                       pq.mutex.RLock()
+                       pq.mutex.RUnlock()
                        return
                }
                _, firstValue := pq.msgCache.Min()
@@ -261,17 +261,16 @@ func (pq *processQueue) cleanExpiredMsg(consumer *defaultConsumer) {
                                })
                                continue
                        }
-                       if time.Now().Unix()-st <= 
int64(consumer.option.ConsumeTimeout) {
-                               pq.mutex.RLock()
+                       if time.Now().UnixNano()/1e6-st <= 
int64(pc.option.ConsumeTimeout/time.Millisecond) {
+                               pq.mutex.RUnlock()
                                return
                        }
                }
-               pq.mutex.RLock()
+               pq.mutex.RUnlock()
 
-               err := consumer.sendBack(msg, 3)
-               if err != nil {
+               if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
                        rlog.Error("send message back to broker error when 
clean expired messages", map[string]interface{}{
-                               rlog.LogKeyUnderlayError: err,
+                               rlog.LogKeyConsumerGroup: pc.consumerGroup,
                        })
                        continue
                }
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index cb4768b..d723519 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -170,6 +170,28 @@ func (pc *pushConsumer) Start() error {
                        }
                }()
 
+               go primitive.WithRecover(func() {
+                       if pc.consumeOrderly {
+                               return
+                       }
+                       time.Sleep(pc.option.ConsumeTimeout)
+                       pc.cleanExpiredMsg()
+
+                       ticker := time.NewTicker(pc.option.ConsumeTimeout)
+                       defer ticker.Stop()
+                       for {
+                               select {
+                               case <-ticker.C:
+                                       pc.cleanExpiredMsg()
+                               case <-pc.done:
+                                       rlog.Info("push consumer close 
cleanExpiredMsg listener.", map[string]interface{}{
+                                               rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
+                                       })
+                                       return
+                               }
+                       }
+               })
+
                go primitive.WithRecover(func() {
                        // initial lock.
                        if !pc.consumeOrderly {
@@ -1292,3 +1314,11 @@ func (pc *pushConsumer) submitConsumeRequestLater(suspendTimeMillis int64) {
        }
        time.Sleep(time.Duration(suspendTimeMillis) * time.Millisecond)
 }
+
+func (pc *pushConsumer) cleanExpiredMsg() {
+       pc.processQueueTable.Range(func(key, value interface{}) bool {
+               pq := value.(*processQueue)
+               pq.cleanExpiredMsg(pc)
+               return true
+       })
+}

Reply via email to