This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0742aac  push consumer enable getting offset diff map (#964)
0742aac is described below

commit 0742aac1437d09fac948ea6572ad0107fb8fce66
Author: Kay Du <[email protected]>
AuthorDate: Thu Dec 1 18:33:00 2022 +0800

    push consumer enable getting offset diff map (#964)
    
    Co-authored-by: 筱瑜 <[email protected]>
---
 consumer/process_queue.go |  3 +++
 consumer/push_consumer.go | 24 ++++++++++++++++++++++++
 2 files changed, 27 insertions(+)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 49dae7f..aeb66d8 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -60,6 +60,7 @@ type processQueue struct {
        order                      bool
        closeChanOnce              *sync.Once
        closeChan                  chan struct{}
+       maxOffsetInQueue           int64
 }
 
 func newProcessQueue(order bool) *processQueue {
@@ -88,6 +89,7 @@ func newProcessQueue(order bool) *processQueue {
                closeChan:                  make(chan struct{}),
                locked:                     atomic.NewBool(false),
                dropped:                    atomic.NewBool(false),
+               maxOffsetInQueue:           -1,
        }
        return pq
 }
@@ -372,6 +374,7 @@ func (pq *processQueue) clear() {
        pq.cachedMsgCount.Store(0)
        pq.cachedMsgSize.Store(0)
        pq.queueOffsetMax = 0
+       pq.maxOffsetInQueue = -1
 }
 
 func (pq *processQueue) commit() int64 {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index e91bad0..a3366e6 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -249,6 +249,27 @@ func (pc *pushConsumer) Start() error {
        return err
 }
 
+func (pc *pushConsumer) GetOffsetDiffMap() map[string]int64 {
+       offsetDiffMap := make(map[string]int64)
+       pc.processQueueTable.Range(func(key, value interface{}) bool {
+               mq := key.(primitive.MessageQueue)
+               pq := value.(*processQueue)
+               topic := mq.Topic
+               consumerOffset, _ := pc.storage.readWithException(&mq, _ReadFromMemory)
+               maxOffset := pq.maxOffsetInQueue
+               if consumerOffset < 0 || maxOffset < 0 || consumerOffset > maxOffset {
+                       return true
+               }
+               if _, ok := offsetDiffMap[topic]; !ok {
+                       offsetDiffMap[topic] = 0
+               }
+               offsetDiff := offsetDiffMap[topic]
+               offsetDiffMap[topic] = offsetDiff + (maxOffset - consumerOffset)
+               return true
+       })
+       return offsetDiffMap
+}
+
 func (pc *pushConsumer) Shutdown() error {
        var err error
        pc.closeOnce.Do(func() {
@@ -823,6 +844,9 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                }
 
                pc.processPullResult(request.mq, result, sd)
+               if result.MaxOffset > pq.maxOffsetInQueue {
+                       pq.maxOffsetInQueue = result.MaxOffset
+               }
 
                switch result.Status {
                case primitive.PullFound:

Reply via email to