This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d6e66a2  [ISSUE #568] Update lastPullTime use atomic.Value as same with lastConsumeTime and lastLockTime (#613)
d6e66a2 is described below

commit d6e66a2d648d6529eca833f9bf912915d1749a5c
Author: Jerry Tao <[email protected]>
AuthorDate: Tue Mar 16 20:24:24 2021 +0800

    [ISSUE #568] Update lastPullTime use atomic.Value as same with lastConsumeTime and lastLockTime (#613)
---
 consumer/process_queue.go | 19 +++++++++++++++----
 consumer/push_consumer.go |  2 +-
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 19e831b..a306470 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -50,7 +50,7 @@ type processQueue struct {
        consumeLock                sync.Mutex
        consumingMsgOrderlyTreeMap *treemap.Map
        dropped                    *uatomic.Bool
-       lastPullTime               time.Time
+       lastPullTime               atomic.Value
        lastConsumeTime            atomic.Value
        locked                     *uatomic.Bool
        lastLockTime               atomic.Value
@@ -69,9 +69,12 @@ func newProcessQueue(order bool) *processQueue {
        lastLockTime := atomic.Value{}
        lastLockTime.Store(time.Now())
 
+       lastPullTime := atomic.Value{}
+       lastPullTime.Store(time.Now())
+
        pq := &processQueue{
                msgCache:                   treemap.NewWith(utils.Int64Comparator),
-               lastPullTime:               time.Now(),
+               lastPullTime:               lastPullTime,
                lastConsumeTime:            lastConsumeTime,
                lastLockTime:               lastLockTime,
                msgCh:                      make(chan []*primitive.MessageExt, 32),
@@ -157,6 +160,14 @@ func (pq *processQueue) LastLockTime() time.Time {
        return pq.lastLockTime.Load().(time.Time)
 }
 
+func (pq *processQueue) LastPullTime() time.Time {
+       return pq.lastPullTime.Load().(time.Time)
+}
+
+func (pq *processQueue) UpdateLastPullTime() {
+       pq.lastPullTime.Store(time.Now())
+}
+
 func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) {
        pq.mutex.Lock()
        for _, msg := range messages {
@@ -199,7 +210,7 @@ func (pq *processQueue) isLockExpired() bool {
 }
 
 func (pq *processQueue) isPullExpired() bool {
-       return time.Now().Sub(pq.lastPullTime) > _PullMaxIdleTime
+       return time.Now().Sub(pq.LastPullTime()) > _PullMaxIdleTime
 }
 
 func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
@@ -360,7 +371,7 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
                TryUnlockTimes:       pq.tryUnlockTimes,
                LastLockTimestamp:    pq.LastLockTime().UnixNano() / int64(time.Millisecond),
                Dropped:              pq.dropped.Load(),
-               LastPullTimestamp:    pq.lastPullTime.UnixNano() / int64(time.Millisecond),
+               LastPullTimestamp:    pq.LastPullTime().UnixNano() / int64(time.Millisecond),
                LastConsumeTimestamp: pq.LastConsumeTime().UnixNano() / int64(time.Millisecond),
        }
 
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 393a0e4..59bfd12 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -562,7 +562,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                }
                // reset time
                sleepTime = pc.option.PullInterval
-               pq.lastPullTime = time.Now()
+               pq.lastPullTime.Store(time.Now())
                err := pc.makeSureStateOK()
                if err != nil {
                        rlog.Warning("consumer state error", map[string]interface{}{

Reply via email to