This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new f9cc5ad [INLONG-628]Go SDK Confirm API (#483)
f9cc5ad is described below
commit f9cc5addbea6ca1419bc433d81156685e73e0171
Author: Zijie Lu <[email protected]>
AuthorDate: Thu Jul 1 15:37:29 2021 +0800
[INLONG-628]Go SDK Confirm API (#483)
---
.../tubemq-client-go/client/consumer_impl.go | 73 +++++++++++++++++++++-
tubemq-client-twins/tubemq-client-go/errs/errs.go | 2 +
.../tubemq-client-go/metadata/partition.go | 7 ++-
.../tubemq-client-go/remote/remote.go | 24 +++++++
4 files changed, 102 insertions(+), 4 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 6db4f40..031d761 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -20,9 +20,11 @@ package client
import (
"context"
"encoding/binary"
+ "fmt"
"hash/crc32"
"os"
"strconv"
+ "strings"
"sync/atomic"
"time"
@@ -241,7 +243,76 @@ func (c *consumer) GetMessage() (*ConsumerResult, error) {
// Confirm implementation of TubeMQ consumer.
+// It parses confirmContext into a partition key and booked time, checks via
+// the remote data cache that the partition is in use under that booked time,
+// and issues the request through sendConfirmReq2Broker. When the broker
+// response reports success, the returned offset is booked and the partition
+// is released. The result is nil when validation or the request itself fails;
+// when the broker rejects the request, the result is returned with the error.
func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
-	panic("implement me")
+	partitionKey, bookedTime, parseErr := util.ParseConfirmContext(confirmContext)
+	if parseErr != nil {
+		return nil, errs.New(errs.RetBadRequest, "illegel confirm_context content: unregular confirm_context value format")
+	}
+	topic, topicErr := parsePartitionKeyToTopic(partitionKey)
+	if topicErr != nil {
+		return nil, errs.New(errs.RetBadRequest, topicErr.Error())
+	}
+	if inUse := c.rmtDataCache.IsPartitionInUse(partitionKey, bookedTime); !inUse {
+		return nil, errs.New(errs.RetErrConfirmTimeout, "The confirm_context's value invalid!")
+	}
+	partition := c.rmtDataCache.GetPartition(partitionKey)
+	if partition == nil {
+		return nil, errs.New(errs.RetErrConfirmTimeout, "Not found the partition by confirm_context!")
+	}
+
+	rsp, reqErr := c.sendConfirmReq2Broker(partition)
+	if reqErr != nil {
+		return nil, reqErr
+	}
+
+	// The result is built before the response is inspected so that it can be
+	// handed back on both the success and the rejection path.
+	peer := &PeerInfo{
+		brokerHost:   partition.GetBroker().GetHost(),
+		partitionID:  uint32(partition.GetPartitionID()),
+		partitionKey: partition.GetPartitionKey(),
+		currOffset:   util.InvalidValue,
+	}
+	result := &ConsumerResult{
+		topicName: partition.GetTopic(),
+		peerInfo:  peer,
+	}
+	if rsp.GetSuccess() {
+		c.rmtDataCache.BookPartitionInfo(partitionKey, rsp.GetCurrOffset())
+		releaseErr := c.rmtDataCache.ReleasePartition(true, c.subInfo.IsFiltered(topic), confirmContext, consumed)
+		return result, releaseErr
+	}
+	return result, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+}
+
+// sendConfirmReq2Broker fills a metadata.Metadata with the local host, the
+// address of the partition's broker, the configured consumer group and the
+// partition itself, then calls CommitOffsetRequestC2B on c.client and returns
+// its response and error unchanged.
+func (c *consumer) sendConfirmReq2Broker(partition *metadata.Partition) (*protocol.CommitOffsetResponseB2C, error) {
+	m := &metadata.Metadata{}
+	node := &metadata.Node{}
+	node.SetHost(util.GetLocalHost())
+	node.SetAddress(partition.GetBroker().GetAddress())
+	m.SetNode(node)
+	sub := &metadata.SubscribeInfo{}
+	sub.SetGroup(c.config.Consumer.Group)
+	sub.SetPartition(partition)
+	m.SetSubscribeInfo(sub)
+
+	// Keep 500ms of headroom below the configured read timeout. The unit has
+	// to be explicit: a bare 500 subtracted from a time.Duration is 500
+	// nanoseconds, which leaves no headroom at all.
+	timeout := c.config.Net.ReadTimeout - 500*time.Millisecond
+	if timeout <= 0 {
+		// The configured timeout is too short to carve headroom out of;
+		// use it as is rather than create an already-expired context.
+		timeout = c.config.Net.ReadTimeout
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	return c.client.CommitOffsetRequestC2B(ctx, m, c.subInfo)
+}
+
+// parsePartitionKeyToTopic returns the part of partitionKey that lies between
+// its first ':' and its last ':'. An error is returned when partitionKey has
+// no ':' at all, or when nothing but the first ':' is present.
+func parsePartitionKeyToTopic(partitionKey string) (string, error) {
+	first := strings.Index(partitionKey, ":")
+	if first < 0 {
+		return "", fmt.Errorf("illegel confirm_context content: unregular index key value format")
+	}
+	// Search for the closing separator only in what follows the first one.
+	rest := partitionKey[first+1:]
+	last := strings.LastIndex(rest, ":")
+	if last < 0 {
+		return "", fmt.Errorf("illegel confirm_context content: unregular index's topic key value format")
+	}
+	return rest[:last], nil
}
// GetCurrConsumedInfo implementation of TubeMQ consumer.
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go
b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index 20427b5..ec94229 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -37,6 +37,7 @@ const (
RetSelectorNotExist = 6
RetSuccess = 200
RetErrMoved = 301
+ RetBadRequest = 400
RetErrForbidden = 403
RetErrNotFound = 404
RetErrNoPartAssigned = 406
@@ -49,6 +50,7 @@ const (
RetConsumeContentForbidden = 455
RetErrServiceUnavailable = 503
RetErrConsumeSpeedLimit = 550
+ RetErrConfirmTimeout = 2004
)
// ErrAssertionFailure represents RetAssertionFailure error.
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index cbf8bae..f9a951d 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -97,9 +97,10 @@ func NewPartition(partition string) (*Partition, error) {
}
}
return &Partition{
- topic: topic,
- broker: b,
- partitionID: int32(partitionID),
+ topic: topic,
+ broker: b,
+ partitionID: int32(partitionID),
+ strategyData: &strategyData{},
}, nil
}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go
b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 873acad..a5310e7 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -473,3 +473,27 @@ func (r *RmtDataCache) BookConsumeData(partitionKey
string, data *metadata.Consu
partition.BookConsumeData(data)
}
}
+
+// IsPartitionInUse reports whether usedPartitions holds an entry for
+// partitionKey whose stored value equals bookedTime. The lookup runs while
+// metaMu is held.
+func (r *RmtDataCache) IsPartitionInUse(partitionKey string, bookedTime int64) bool {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+
+	booked, found := r.usedPartitions[partitionKey]
+	return found && booked == bookedTime
+}
+
+// GetPartition returns the entry stored in partitions under key, or nil when
+// there is no such entry. The lookup runs while metaMu is held.
+func (r *RmtDataCache) GetPartition(key string) *metadata.Partition {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+
+	partition, found := r.partitions[key]
+	if !found {
+		return nil
+	}
+	return partition
+}