This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new f9cc5ad  [INLONG-628]Go SDK Confirm API (#483)
f9cc5ad is described below

commit f9cc5addbea6ca1419bc433d81156685e73e0171
Author: Zijie Lu <[email protected]>
AuthorDate: Thu Jul 1 15:37:29 2021 +0800

    [INLONG-628]Go SDK Confirm API (#483)
---
 .../tubemq-client-go/client/consumer_impl.go       | 73 +++++++++++++++++++++-
 tubemq-client-twins/tubemq-client-go/errs/errs.go  |  2 +
 .../tubemq-client-go/metadata/partition.go         |  7 ++-
 .../tubemq-client-go/remote/remote.go              | 24 +++++++
 4 files changed, 102 insertions(+), 4 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 6db4f40..031d761 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -20,9 +20,11 @@ package client
 import (
        "context"
        "encoding/binary"
+       "fmt"
        "hash/crc32"
        "os"
        "strconv"
+       "strings"
        "sync/atomic"
        "time"
 
@@ -241,7 +243,76 @@ func (c *consumer) GetMessage() (*ConsumerResult, error) {
 
 // Confirm implementation of TubeMQ consumer.
 func (c *consumer) Confirm(confirmContext string, consumed bool) 
(*ConsumerResult, error) {
-       panic("implement me")
+       partitionKey, bookedTime, err := 
util.ParseConfirmContext(confirmContext)
+       if err != nil {
+               return nil, errs.New(errs.RetBadRequest, "illegel 
confirm_context content: unregular confirm_context value format")
+       }
+       topic, err := parsePartitionKeyToTopic(partitionKey)
+       if err != nil {
+               return nil, errs.New(errs.RetBadRequest, err.Error())
+       }
+       if !c.rmtDataCache.IsPartitionInUse(partitionKey, bookedTime) {
+               return nil, errs.New(errs.RetErrConfirmTimeout, "The 
confirm_context's value invalid!")
+       }
+       partition := c.rmtDataCache.GetPartition(partitionKey)
+       if partition == nil {
+               return nil, errs.New(errs.RetErrConfirmTimeout, "Not found the 
partition by confirm_context!")
+       }
+
+       rsp, err := c.sendConfirmReq2Broker(partition)
+       if err != nil {
+               return nil, err
+       }
+
+       pi := &PeerInfo{
+               brokerHost:   partition.GetBroker().GetHost(),
+               partitionID:  uint32(partition.GetPartitionID()),
+               partitionKey: partition.GetPartitionKey(),
+               currOffset:   util.InvalidValue,
+       }
+       cs := &ConsumerResult{
+               topicName: partition.GetTopic(),
+               peerInfo:  pi,
+       }
+       if !rsp.GetSuccess() {
+               return cs, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+       }
+       currOffset := rsp.GetCurrOffset()
+       c.rmtDataCache.BookPartitionInfo(partitionKey, currOffset)
+       err = c.rmtDataCache.ReleasePartition(true, 
c.subInfo.IsFiltered(topic), confirmContext, consumed)
+       return cs, err
+}
+
+func (c *consumer) sendConfirmReq2Broker(partition *metadata.Partition) 
(*protocol.CommitOffsetResponseB2C, error) {
+       m := &metadata.Metadata{}
+       node := &metadata.Node{}
+       node.SetHost(util.GetLocalHost())
+       node.SetAddress(partition.GetBroker().GetAddress())
+       m.SetNode(node)
+       sub := &metadata.SubscribeInfo{}
+       sub.SetGroup(c.config.Consumer.Group)
+       sub.SetPartition(partition)
+       m.SetSubscribeInfo(sub)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout-500)
+       defer cancel()
+
+       rsp, err := c.client.CommitOffsetRequestC2B(ctx, m, c.subInfo)
+       return rsp, err
+}
+
+func parsePartitionKeyToTopic(partitionKey string) (string, error) {
+       pos1 := strings.Index(partitionKey, ":")
+       if pos1 == -1 {
+               return "", fmt.Errorf("illegel confirm_context content: 
unregular index key value format")
+       }
+       topic := partitionKey[pos1+1:]
+       pos2 := strings.LastIndex(topic, ":")
+       if pos2 == -1 {
+               return "", fmt.Errorf("illegel confirm_context content: 
unregular index's topic key value format")
+       }
+       topic = topic[:pos2]
+       return topic, nil
 }
 
 // GetCurrConsumedInfo implementation of TubeMQ consumer.
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go 
b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index 20427b5..ec94229 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -37,6 +37,7 @@ const (
        RetSelectorNotExist        = 6
        RetSuccess                 = 200
        RetErrMoved                = 301
+       RetBadRequest              = 400
        RetErrForbidden            = 403
        RetErrNotFound             = 404
        RetErrNoPartAssigned       = 406
@@ -49,6 +50,7 @@ const (
        RetConsumeContentForbidden = 455
        RetErrServiceUnavailable   = 503
        RetErrConsumeSpeedLimit    = 550
+       RetErrConfirmTimeout       = 2004
 )
 
 // ErrAssertionFailure represents RetAssertionFailure error.
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go 
b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index cbf8bae..f9a951d 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -97,9 +97,10 @@ func NewPartition(partition string) (*Partition, error) {
                }
        }
        return &Partition{
-               topic:       topic,
-               broker:      b,
-               partitionID: int32(partitionID),
+               topic:        topic,
+               broker:       b,
+               partitionID:  int32(partitionID),
+               strategyData: &strategyData{},
        }, nil
 }
 
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go 
b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 873acad..a5310e7 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -473,3 +473,27 @@ func (r *RmtDataCache) BookConsumeData(partitionKey 
string, data *metadata.Consu
                partition.BookConsumeData(data)
        }
 }
+
+func (r *RmtDataCache) IsPartitionInUse(partitionKey string, bookedTime int64) 
bool {
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+
+       bt, ok := r.usedPartitions[partitionKey]
+       if !ok {
+               return false
+       }
+       if bt != bookedTime {
+               return false
+       }
+       return true
+}
+
+func (r *RmtDataCache) GetPartition(key string) *metadata.Partition {
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+
+       if partition, ok := r.partitions[key]; ok {
+               return partition
+       }
+       return nil
+}

Reply via email to