This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 968eae0  [INLONG-1427]Go SDK return maxOffset and updateTime in 
ConsumerOffset (#1428)
968eae0 is described below

commit 968eae037dbde0252d75db4296fe0c2c89615b00
Author: Zijie Lu <[email protected]>
AuthorDate: Mon Aug 9 14:59:55 2021 +0800

    [INLONG-1427]Go SDK return maxOffset and updateTime in ConsumerOffset 
(#1428)
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/client/consumer.go            | 12 +++----
 .../tubemq-client-go/client/consumer_impl.go       | 24 +++++++------
 .../tubemq-client-go/remote/remote.go              | 39 ++++++++++++++++++----
 3 files changed, 50 insertions(+), 25 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index bea0223..b805c30 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -19,6 +19,10 @@
 // which can be exposed to user.
 package client
 
+import (
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+)
+
 // ConsumerResult of a consumption.
 type ConsumerResult struct {
        TopicName      string
@@ -27,12 +31,6 @@ type ConsumerResult struct {
        Messages       []*Message
 }
 
-// ConsumerOffset of a consumption.
-type ConsumerOffset struct {
-       PartitionKey string
-       CurrOffset   int64
-}
-
 var clientID uint64
 
 // Consumer is an interface that abstracts behavior of TubeMQ's consumer
@@ -42,7 +40,7 @@ type Consumer interface {
        // Confirm the consumption of a message.
        Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
        // GetCurrConsumedInfo returns the consumptions of the consumer.
-       GetCurrConsumedInfo() map[string]*ConsumerOffset
+       GetCurrConsumedInfo() map[string]*remote.ConsumerOffset
        // Close closes the consumer client and release the resources.
        Close() error
        // GetClientID returns the clientID of the consumer.
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index a4bc2f2..e42319b 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -286,7 +286,7 @@ func (c *consumer) Confirm(confirmContext string, consumed 
bool) (*ConsumerResul
                return cs, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
        }
        currOffset := rsp.GetCurrOffset()
-       c.rmtDataCache.BookPartitionInfo(partitionKey, currOffset)
+       c.rmtDataCache.BookPartitionInfo(partitionKey, currOffset, 
util.InvalidValue)
        err = c.rmtDataCache.ReleasePartition(true, 
c.subInfo.IsFiltered(topic), confirmContext, consumed)
        return cs, err
 }
@@ -324,15 +324,17 @@ func parsePartitionKeyToTopic(partitionKey string) 
(string, error) {
 }
 
 // GetCurrConsumedInfo implementation of TubeMQ consumer.
-func (c *consumer) GetCurrConsumedInfo() map[string]*ConsumerOffset {
+func (c *consumer) GetCurrConsumedInfo() map[string]*remote.ConsumerOffset {
        partitionOffset := c.rmtDataCache.GetCurPartitionOffset()
-       consumedInfo := make(map[string]*ConsumerOffset, len(partitionOffset))
-       for partition, offset := range partitionOffset {
-               co := &ConsumerOffset{
-                       PartitionKey: partition,
-                       CurrOffset:   offset,
+       consumedInfo := make(map[string]*remote.ConsumerOffset, 
len(partitionOffset))
+       for partitionKey, offset := range partitionOffset {
+               co := &remote.ConsumerOffset{
+                       PartitionKey: partitionKey,
+                       CurrOffset:   offset.CurrOffset,
+                       MaxOffset:    offset.MaxOffset,
+                       UpdateTime:   offset.UpdateTime,
                }
-               consumedInfo[partition] = co
+               consumedInfo[partitionKey] = co
        }
        return consumedInfo
 }
@@ -581,7 +583,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, 
filtered bool, partitio
                        maxOffset = rsp.GetMaxOffset()
                }
                msgSize, msgs := c.convertMessages(filtered, 
partition.GetTopic(), rsp)
-               c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), 
currOffset)
+               c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), 
currOffset, maxOffset)
                cd := metadata.NewConsumeData(now, 200, escLimit, 
int32(msgSize), 0, dataDleVal, rsp.GetRequireSlow())
                c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
                pi.CurrOffset = currOffset
@@ -601,7 +603,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, 
filtered bool, partitio
                        defDltTime = 
c.config.Consumer.MsgNotFoundWait.Milliseconds()
                }
                cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, 
limitDlt, defDltTime, rsp.GetRequireSlow())
-               c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), 
util.InvalidValue)
+               c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), 
util.InvalidValue, util.InvalidValue)
                c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
                return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
        case errs.RetErrNotFound:
@@ -614,7 +616,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, 
filtered bool, partitio
        }
        if rsp.GetErrCode() != errs.RetSuccess {
                cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, 
limitDlt, util.InvalidValue, rsp.GetRequireSlow())
-               c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), 
util.InvalidValue)
+               c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), 
util.InvalidValue, util.InvalidValue)
                c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
                c.rmtDataCache.ReleasePartition(true, filtered, confirmContext, 
false)
                return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go 
b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 5545e0d..6e66af9 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -53,13 +53,21 @@ type RmtDataCache struct {
        partitionTimeouts  map[string]*time.Timer
        topicPartitions    map[string]map[string]bool
        partitionRegBooked map[string]bool
-       partitionOffset    map[string]int64
+       partitionOffset    map[string]*ConsumerOffset
        groupHandler       *flowctrl.RuleHandler
        defHandler         *flowctrl.RuleHandler
        // EventCh is the channel for consumer to consume
        EventCh chan *metadata.ConsumerEvent
 }
 
+// ConsumerOffset of a consumption.
+type ConsumerOffset struct {
+       PartitionKey string
+       CurrOffset   int64
+       MaxOffset    int64
+       UpdateTime   int64
+}
+
 // NewRmtDataCache returns a default rmtDataCache.
 func NewRmtDataCache() *RmtDataCache {
        r := &RmtDataCache{
@@ -73,7 +81,7 @@ func NewRmtDataCache() *RmtDataCache {
                partitionTimeouts:  make(map[string]*time.Timer),
                topicPartitions:    make(map[string]map[string]bool),
                partitionRegBooked: make(map[string]bool),
-               partitionOffset:    make(map[string]int64),
+               partitionOffset:    make(map[string]*ConsumerOffset),
                groupHandler:       flowctrl.NewRuleHandler(),
                defHandler:         flowctrl.NewRuleHandler(),
                EventCh:            make(chan *metadata.ConsumerEvent, 1),
@@ -461,11 +469,28 @@ func (r *RmtDataCache) 
removeFromIndexPartitions(partitionKey string) {
        r.indexPartitions = append(r.indexPartitions[:pos], 
r.indexPartitions[pos+1:]...)
 }
 
-func (r *RmtDataCache) BookPartitionInfo(partitionKey string, currOffset 
int64) {
+func (r *RmtDataCache) BookPartitionInfo(partitionKey string, currOffset 
int64, maxOffset int64) {
+       r.dataBookMu.Lock()
+       defer r.dataBookMu.Unlock()
+       if _, ok := r.partitionOffset[partitionKey]; !ok {
+               co := &ConsumerOffset{
+                       CurrOffset:   util.InvalidValue,
+                       PartitionKey: partitionKey,
+                       MaxOffset:    util.InvalidValue,
+                       UpdateTime:   util.InvalidValue,
+               }
+               r.partitionOffset[partitionKey] = co
+       }
+       updated := false
+       co := r.partitionOffset[partitionKey]
        if currOffset >= 0 {
-               r.dataBookMu.Lock()
-               defer r.dataBookMu.Unlock()
-               r.partitionOffset[partitionKey] = currOffset
+               co.CurrOffset = currOffset
+       }
+       if maxOffset >= 0 {
+               co.MaxOffset = maxOffset
+       }
+       if updated {
+               co.UpdateTime = time.Now().UnixNano() / int64(time.Millisecond)
        }
 }
 
@@ -519,7 +544,7 @@ func (r *RmtDataCache) GetAllClosedBrokerParts() 
map[*metadata.Node][]*metadata.
 }
 
 // GetCurPartitionOffset returns the partition to offset map.
-func (r *RmtDataCache) GetCurPartitionOffset() map[string]int64 {
+func (r *RmtDataCache) GetCurPartitionOffset() map[string]*ConsumerOffset {
        r.dataBookMu.Lock()
        defer r.dataBookMu.Unlock()
        return r.partitionOffset

Reply via email to