This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new c8064e2  [INLONG-630]Go SDK GetCurrConsumedInfo and GetClientID API (#493)
c8064e2 is described below

commit c8064e2dfff28eb9e6eeb73d49781376759db4b1
Author: Zijie Lu <[email protected]>
AuthorDate: Sat Jul 3 15:52:59 2021 +0800

    [INLONG-630]Go SDK GetCurrConsumedInfo and GetClientID API (#493)
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/client/consumer.go                |  8 ++++++--
 .../tubemq-client-go/client/consumer_impl.go           | 18 ++++++++++++++++--
 tubemq-client-twins/tubemq-client-go/remote/remote.go  |  7 +++++++
 3 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index bb17f03..d10e4a6 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -27,8 +27,10 @@ type ConsumerResult struct {
        messages       []*Message
 }
 
-// ConsumerOffset of a consumption,
+// ConsumerOffset of a consumption.
 type ConsumerOffset struct {
+       partitionKey string
+       currOffset   int64
 }
 
 var clientID uint64
@@ -40,7 +42,9 @@ type Consumer interface {
        // Confirm the consumption of a message.
        Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
        // GetCurrConsumedInfo returns the consumptions of the consumer.
-       GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
+       GetCurrConsumedInfo() map[string]*ConsumerOffset
        // Close closes the consumer client and release the resources.
        Close() error
+       // GetClientID returns the clientID of the consumer.
+       GetClientID() string
 }
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 616be17..8163de4 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -316,8 +316,22 @@ func parsePartitionKeyToTopic(partitionKey string) (string, error) {
 }
 
 // GetCurrConsumedInfo implementation of TubeMQ consumer.
-func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
-       panic("implement me")
+func (c *consumer) GetCurrConsumedInfo() map[string]*ConsumerOffset {
+       partitionOffset := c.rmtDataCache.GetCurPartitionOffset()
+       consumedInfo := make(map[string]*ConsumerOffset, len(partitionOffset))
+       for partition, offset := range partitionOffset {
+               co := &ConsumerOffset{
+                       partitionKey: partition,
+                       currOffset:   offset,
+               }
+               consumedInfo[partition] = co
+       }
+       return consumedInfo
+}
+
+// GetClientID implementation of TubeMQ consumer.
+func (c *consumer) GetClientID() string {
+       return c.clientID
 }
 
 // Close implementation of TubeMQ consumer.
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index c645d3e..8246a56 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -514,3 +514,10 @@ func (r *RmtDataCache) GetAllClosedBrokerParts() map[*metadata.Node][]*metadata.
        }
        return brokerPartitions
 }
+
+// GetCurPartitionOffset returns the partition to offset map.
+func (r *RmtDataCache) GetCurPartitionOffset() map[string]int64 {
+       r.dataBookMu.Lock()
+       defer r.dataBookMu.Unlock()
+       return r.partitionOffset
+}

Reply via email to