This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 9753c44 [INLONG-627]Go SDK GetMessage API (#481)
9753c44 is described below
commit 9753c447da90307a4ae62c3c3186baa710063a44
Author: Zijie Lu <[email protected]>
AuthorDate: Wed Jun 23 17:28:53 2021 +0800
[INLONG-627]Go SDK GetMessage API (#481)
* [INLONG-624]Go SDK consumer interface
Signed-off-by: Zijie Lu <[email protected]>
---
.../tubemq-client-go/client/consumer.go | 4 +
.../tubemq-client-go/client/consumer_impl.go | 181 +++++++-
.../client/{consumer.go => message.go} | 27 +-
.../client/{consumer.go => peer.go} | 25 +-
tubemq-client-twins/tubemq-client-go/errs/errs.go | 13 +
.../tubemq-client-go/flowctrl/handler.go | 460 +++++++++++++++++++++
.../tubemq-client-go/flowctrl/item.go | 126 ++++++
.../{client/consumer.go => flowctrl/result.go} | 44 +-
.../tubemq-client-go/metadata/partition.go | 124 ++++++
.../tubemq-client-go/remote/remote.go | 143 ++++++-
tubemq-client-twins/tubemq-client-go/util/util.go | 55 +++
11 files changed, 1137 insertions(+), 65 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go
b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index 27a63c6..a4eec0b 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -21,6 +21,10 @@ package client
// ConsumerResult of a consumption.
type ConsumerResult struct {
+ topicName string
+ confirmContext string
+ peerInfo *PeerInfo
+ messages []*Message
}
// ConsumerOffset of a consumption,
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index f4211c5..6db4f40 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -19,6 +19,8 @@ package client
import (
"context"
+ "encoding/binary"
+ "hash/crc32"
"os"
"strconv"
"sync/atomic"
@@ -188,7 +190,53 @@ func (c *consumer) processAuthorizedToken(info
*protocol.MasterAuthorizedInfo) {
// GetMessage implementation of TubeMQ consumer.
func (c *consumer) GetMessage() (*ConsumerResult, error) {
- panic("implement me")
+ err := c.checkPartitionErr()
+ if err != nil {
+ return nil, err
+ }
+ partition, err := c.rmtDataCache.SelectPartition()
+ if err != nil {
+ return nil, err
+ }
+ confirmContext := partition.GetPartitionKey() + "@" +
strconv.Itoa(int(time.Now().UnixNano()/int64(time.Millisecond)))
+ isFiltered := c.subInfo.IsFiltered(partition.GetTopic())
+ pi := &PeerInfo{
+ brokerHost: partition.GetBroker().GetHost(),
+ partitionID: uint32(partition.GetPartitionID()),
+ partitionKey: partition.GetPartitionKey(),
+ currOffset: util.InvalidValue,
+ }
+ m := &metadata.Metadata{}
+ node := &metadata.Node{}
+ node.SetHost(util.GetLocalHost())
+ node.SetAddress(partition.GetBroker().GetAddress())
+ m.SetNode(node)
+ sub := &metadata.SubscribeInfo{}
+ sub.SetGroup(c.config.Consumer.Group)
+ sub.SetPartition(partition)
+ m.SetSubscribeInfo(sub)
+
+ ctx, cancel := context.WithTimeout(context.Background(),
c.config.Net.ReadTimeout)
+ defer cancel()
+ rsp, err := c.client.GetMessageRequestC2B(ctx, m, c.subInfo,
c.rmtDataCache)
+ if err != nil {
+ return nil, err
+ }
+ cs := &ConsumerResult{
+ topicName: partition.GetTopic(),
+ confirmContext: confirmContext,
+ peerInfo: pi,
+ }
+ if !rsp.GetSuccess() {
+ err := c.rmtDataCache.ReleasePartition(true, isFiltered,
confirmContext, false)
+ return cs, err
+ }
+ msgs, err := c.processGetMessageRspB2C(pi, isFiltered, partition,
confirmContext, rsp)
+ if err != nil {
+ return cs, err
+ }
+ cs.messages = msgs
+ return cs, err
}
// Confirm implementation of TubeMQ consumer.
@@ -280,7 +328,6 @@ func (c *consumer) connect2Broker(event
*metadata.ConsumerEvent) {
rsp, err := c.sendRegisterReq2Broker(partition,
node)
if err != nil {
//todo add log
- return
}
if !rsp.GetSuccess() {
//todo add log
@@ -372,3 +419,133 @@ func (c *consumer) getConsumeReadStatus(isFirstReg bool)
int32 {
}
return int32(readStatus)
}
+
+func (c *consumer) checkPartitionErr() error {
+ startTime := time.Now().UnixNano() / int64(time.Millisecond)
+ for {
+ ret := c.rmtDataCache.GetCurConsumeStatus()
+ if ret == 0 {
+ return nil
+ }
+ if c.config.Consumer.MaxPartCheckPeriod >= 0 &&
+ time.Now().UnixNano()/int64(time.Millisecond)-startTime
>= c.config.Consumer.MaxPartCheckPeriod.Milliseconds() {
+ switch ret {
+ case errs.RetErrNoPartAssigned:
+ return errs.ErrNoPartAssigned
+ case errs.RetErrAllPartInUse:
+ return errs.ErrAllPartInUse
+ case errs.RetErrAllPartWaiting:
+ return errs.ErrAllPartWaiting
+ }
+ }
+ time.Sleep(c.config.Consumer.PartCheckSlice)
+ }
+}
+
+func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool,
partition *metadata.Partition, confirmContext string, rsp
*protocol.GetMessageResponseB2C) ([]*Message, error) {
+ limitDlt := int64(300)
+ escLimit := rsp.GetEscFlowCtrl()
+ now := time.Now().UnixNano() / int64(time.Millisecond)
+ switch rsp.GetErrCode() {
+ case errs.RetSuccess:
+ dataDleVal := util.InvalidValue
+ if rsp.GetCurrDataDlt() >= 0 {
+ dataDleVal = rsp.GetCurrDataDlt()
+ }
+ currOffset := util.InvalidValue
+ if rsp.GetCurrOffset() >= 0 {
+ currOffset = rsp.GetCurrOffset()
+ }
+ msgSize, msgs := c.convertMessages(filtered,
partition.GetTopic(), rsp)
+ c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(),
currOffset)
+ cd := metadata.NewConsumeData(now, 200, escLimit,
int32(msgSize), 0, dataDleVal, rsp.GetRequireSlow())
+ c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
+ pi.currOffset = currOffset
+ return msgs, nil
+ case errs.RetErrHBNoNode, errs.RetCertificateFailure,
errs.RetErrDuplicatePartition:
+ partitionKey, _, err := util.ParseConfirmContext(confirmContext)
+ if err != nil {
+ return nil, err
+ }
+ c.rmtDataCache.RemovePartition([]string{partitionKey})
+ return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+ case errs.RetErrConsumeSpeedLimit:
+ defDltTime := int64(rsp.GetMinLimitTime())
+ if defDltTime == 0 {
+ defDltTime =
c.config.Consumer.MsgNotFoundWait.Milliseconds()
+ }
+ cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0,
limitDlt, defDltTime, rsp.GetRequireSlow())
+ c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(),
util.InvalidValue)
+ c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
+ return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+ case errs.RetErrNotFound:
+ limitDlt = c.config.Consumer.MsgNotFoundWait.Milliseconds()
+ case errs.RetErrForbidden:
+ limitDlt = 2000
+ case errs.RetErrMoved:
+ limitDlt = 200
+ case errs.RetErrServiceUnavailable:
+ }
+ if rsp.GetErrCode() != errs.RetSuccess {
+ cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0,
limitDlt, util.InvalidValue, rsp.GetRequireSlow())
+ c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(),
util.InvalidValue)
+ c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
+ c.rmtDataCache.ReleasePartition(true, filtered, confirmContext,
false)
+ return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+ }
+ return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+}
+
+func (c *consumer) convertMessages(filtered bool, topic string, rsp
*protocol.GetMessageResponseB2C) (int, []*Message) {
+ msgSize := 0
+ if len(rsp.GetMessages()) == 0 {
+ return msgSize, nil
+ }
+
+ msgs := make([]*Message, 0, len(rsp.GetMessages()))
+ for _, m := range rsp.GetMessages() {
+ checkSum := uint64(crc32.Update(0, crc32.IEEETable,
m.GetPayLoadData())) & 0x7FFFFFFFF
+ if int32(checkSum) != m.GetCheckSum() {
+ continue
+ }
+ readPos := 0
+ dataLen := len(m.GetPayLoadData())
+ var properties map[string]string
+ if m.GetFlag()&0x01 == 1 {
+ if len(m.GetPayLoadData()) < 4 {
+ continue
+ }
+ attrLen :=
int(binary.BigEndian.Uint64(m.GetPayLoadData()))
+ readPos += 4
+ dataLen -= 4
+
+ attribute := m.GetPayLoadData()[readPos :
readPos+attrLen]
+ readPos -= attrLen
+ dataLen -= attrLen
+ properties := util.SplitToMap(string(attribute), ",",
"=")
+ if filtered {
+ topicFilters := c.subInfo.GetTopicFilters()
+ if msgKey, ok := properties["$msgType$"]; ok {
+ if filters, ok := topicFilters[topic];
ok {
+ for _, filter := range filters {
+ if filter == msgKey {
+ continue
+ }
+ }
+ }
+ }
+ }
+ }
+ msg := &Message{
+ topic: topic,
+ flag: m.GetFlag(),
+ id: m.GetMessageId(),
+ properties: properties,
+ dataLen: int32(dataLen),
+ data: string(m.GetPayLoadData()[:readPos]),
+ }
+ msgs = append(msgs, msg)
+ msgSize += dataLen
+ }
+ return msgSize, msgs
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go
b/tubemq-client-twins/tubemq-client-go/client/message.go
similarity index 55%
copy from tubemq-client-twins/tubemq-client-go/client/consumer.go
copy to tubemq-client-twins/tubemq-client-go/client/message.go
index 27a63c6..f511647 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/message.go
@@ -15,26 +15,13 @@
* limitations under the License.
*/
-// Package client defines the api and information
-// which can be exposed to user.
package client
-// ConsumerResult of a consumption.
-type ConsumerResult struct {
-}
-
-// ConsumerOffset of a consumption,
-type ConsumerOffset struct {
-}
-
-var clientID uint64
-
-// Consumer is an interface that abstracts behavior of TubeMQ's consumer
-type Consumer interface {
- // GetMessage receive a single message.
- GetMessage() (*ConsumerResult, error)
- // Confirm the consumption of a message.
- Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
- // GetCurrConsumedInfo returns the consumptions of the consumer.
- GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
+type Message struct {
+ topic string
+ data string
+ dataLen int32
+ id int64
+ flag int32
+ properties map[string]string
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go
b/tubemq-client-twins/tubemq-client-go/client/peer.go
similarity index 55%
copy from tubemq-client-twins/tubemq-client-go/client/consumer.go
copy to tubemq-client-twins/tubemq-client-go/client/peer.go
index 27a63c6..2635d7d 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/peer.go
@@ -15,26 +15,11 @@
* limitations under the License.
*/
-// Package client defines the api and information
-// which can be exposed to user.
package client
-// ConsumerResult of a consumption.
-type ConsumerResult struct {
-}
-
-// ConsumerOffset of a consumption,
-type ConsumerOffset struct {
-}
-
-var clientID uint64
-
-// Consumer is an interface that abstracts behavior of TubeMQ's consumer
-type Consumer interface {
- // GetMessage receive a single message.
- GetMessage() (*ConsumerResult, error)
- // Confirm the consumption of a message.
- Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
- // GetCurrConsumedInfo returns the consumptions of the consumer.
- GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
+type PeerInfo struct {
+ partitionID uint32
+ brokerHost string
+ partitionKey string
+ currOffset int64
}
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go
b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index eb45179..20427b5 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -35,15 +35,28 @@ const (
RetRequestFailure = 5
// RetSelectorNotExist represents the selector not exists.
RetSelectorNotExist = 6
+ RetSuccess = 200
+ RetErrMoved = 301
+ RetErrForbidden = 403
+ RetErrNotFound = 404
+ RetErrNoPartAssigned = 406
+ RetErrAllPartWaiting = 407
+ RetErrAllPartInUse = 408
RetErrHBNoNode = 411
+ RetErrDuplicatePartition = 412
RetCertificateFailure = 415
RetConsumeGroupForbidden = 450
RetConsumeContentForbidden = 455
+ RetErrServiceUnavailable = 503
+ RetErrConsumeSpeedLimit = 550
)
// ErrAssertionFailure represents RetAssertionFailure error.
var (
ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+ ErrNoPartAssigned = New(RetErrNoPartAssigned, "No partition info in
local cache, please retry later!")
+ ErrAllPartWaiting = New(RetErrAllPartWaiting, "All partitions reach
max position, please retry later!")
+ ErrAllPartInUse = New(RetErrAllPartInUse, "No idle partition to
consume, please retry later!")
)
// Error provides a TubeMQ-specific error container
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
new file mode 100644
index 0000000..d9fd9bc
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// package flowctrl defines the rule and handle logic of flow control.
+package flowctrl
+
+import (
+ "encoding/json"
+ "fmt"
+ "math"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+type RuleHandler struct {
+ lastUpdate int64
+ flowCtrlInfo string
+ qrypriorityID int64
+ dataSizeLimit int64
+ flowCtrlID int64
+ minZeroCount int64
+ minDataDltLimit int64
+ dataLimitStartTime int64
+ dataLimitEndTime int64
+ configMu sync.Mutex
+ flowCtrlRules map[int32][]*Item
+ flowCtrlItem *Item
+}
+
+// NewRuleHandler returns the a default RuleHandler.
+func NewRuleHandler() *RuleHandler {
+ return &RuleHandler{
+ flowCtrlID: util.InvalidValue,
+ minZeroCount: math.MaxInt64,
+ qrypriorityID: util.InvalidValue,
+ minDataDltLimit: math.MaxInt64,
+ dataLimitStartTime: 2500,
+ dataLimitEndTime: util.InvalidValue,
+ lastUpdate: time.Now().UnixNano() /
int64(time.Millisecond),
+ }
+}
+
+// GetQryPriorityID returns the qrypriorityID.
+func (h *RuleHandler) GetQryPriorityID() int64 {
+ return atomic.LoadInt64(&h.qrypriorityID)
+}
+
+// GetFlowCtrID returns the flowCtrlID.
+func (h *RuleHandler) GetFlowCtrID() int64 {
+ return atomic.LoadInt64(&h.flowCtrlID)
+}
+
+// SetQryPriorityID sets the qrypriorityID
+func (h *RuleHandler) SetQryPriorityID(qryPriorityID int64) {
+ atomic.StoreInt64(&h.qrypriorityID, qryPriorityID)
+}
+
+// UpdateDefFlowCtrlInfo updates the flow control information.
+func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, qrypriorityID
int64, flowCtrlID int64, info string) error {
+ if atomic.LoadInt64(&h.flowCtrlID) == flowCtrlID {
+ return nil
+ }
+ //curFlowCtrlID := atomic.LoadInt64(&h.flowCtrlID)
+ var flowCtrlItems map[int32][]*Item
+ var err error
+ if len(info) > 0 {
+ flowCtrlItems, err = parseFlowCtrlInfo(info)
+ if err != nil {
+ return err
+ }
+ }
+ h.configMu.Lock()
+ defer h.configMu.Unlock()
+ h.clearStatisticData()
+ atomic.StoreInt64(&h.qrypriorityID, qrypriorityID)
+ if len(flowCtrlItems) == 0 {
+ h.flowCtrlRules = make(map[int32][]*Item)
+ info = ""
+ } else {
+ h.flowCtrlRules = flowCtrlItems
+ h.flowCtrlInfo = info
+ h.initStatisticData()
+ }
+ h.lastUpdate = time.Now().UnixNano() / int64(time.Millisecond)
+ if isDefault {
+ // todo log
+ // LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from
%ld to %ld\n", curr_flowctrl_id,
+ // flowctrl_id)
+ } else {
+ // todo log
+ // LOG_INFO("[Flow Ctrl] Group FlowCtrl's flowctrl_id from %ld
to %ld\n", curr_flowctrl_id,
+ // flowctrl_id)
+ }
+ return nil
+}
+
+func (h *RuleHandler) initStatisticData() {
+ for id, rules := range h.flowCtrlRules {
+ if id == 0 {
+ for _, rule := range rules {
+ if rule.tp != 0 {
+ continue
+ }
+
+ if rule.datadlt <
atomic.LoadInt64(&h.minDataDltLimit) {
+ atomic.StoreInt64(&h.minDataDltLimit,
rule.datadlt)
+ }
+ if rule.startTime <
atomic.LoadInt64(&h.dataLimitStartTime) {
+
atomic.StoreInt64(&h.dataLimitStartTime, rule.startTime)
+ }
+ if rule.endTime <
atomic.LoadInt64(&h.dataLimitEndTime) {
+ atomic.StoreInt64(&h.dataLimitEndTime,
rule.endTime)
+ }
+ }
+ }
+ if id == 1 {
+ for _, rule := range rules {
+ if rule.tp != 1 {
+ continue
+ }
+ if rule.zeroCnt <
atomic.LoadInt64(&h.minZeroCount) {
+ atomic.StoreInt64(&h.minZeroCount,
rule.zeroCnt)
+ }
+ }
+ }
+ if id == 3 {
+ for _, rule := range rules {
+ if rule.tp != 3 {
+ continue
+ }
+ h.flowCtrlItem.SetTp(3)
+
h.flowCtrlItem.SetDataSizeLimit(rule.dataSizeLimit)
+ h.flowCtrlItem.SetFreqLimit(rule.freqMsLimit)
+ h.flowCtrlItem.SetZeroCnt(rule.zeroCnt)
+ }
+ }
+ }
+}
+
+func (h *RuleHandler) clearStatisticData() {
+ atomic.StoreInt64(&h.minZeroCount, util.InvalidValue)
+ atomic.StoreInt64(&h.minDataDltLimit, math.MaxInt64)
+ atomic.StoreInt64(&h.qrypriorityID, util.InvalidValue)
+ atomic.StoreInt64(&h.dataSizeLimit, 2500)
+ atomic.StoreInt64(&h.dataLimitEndTime, util.InvalidValue)
+ h.flowCtrlItem.clear()
+}
+
+// GetCurDataLimit returns the flow control result based on lastDataDlt.
+func (h *RuleHandler) GetCurDataLimit(lastDataDlt int64) *Result {
+ l := time.FixedZone("GMT", 3600*8)
+ now := time.Now()
+ hour := now.In(l).Hour()
+ min := now.In(l).Minute()
+ curTime := int64(hour*100 + min)
+ if lastDataDlt < atomic.LoadInt64(&h.minDataDltLimit) ||
+ curTime < atomic.LoadInt64(&h.dataLimitStartTime) ||
+ curTime > atomic.LoadInt64(&h.dataLimitEndTime) {
+ return nil
+ }
+ h.configMu.Lock()
+ defer h.configMu.Lock()
+ if _, ok := h.flowCtrlRules[0]; !ok {
+ return nil
+ }
+ for _, rule := range h.flowCtrlRules[0] {
+ result := rule.GetDataLimit(lastDataDlt, curTime)
+ if result != nil {
+ return result
+ }
+ }
+ return nil
+}
+
+// GetFilterCtrlItem returns the flow control item.
+func (h *RuleHandler) GetFilterCtrlItem() *Item {
+ h.configMu.Lock()
+ defer h.configMu.Unlock()
+ return h.flowCtrlItem
+}
+
+// GetCurFreqLimitTim returns curFreqLimitTime.
+func (h *RuleHandler) GetCurFreqLimitTime(msgZeroCnt int32, receivedLimit
int64) int64 {
+ limitData := receivedLimit
+ if int64(msgZeroCnt) < atomic.LoadInt64(&h.minZeroCount) {
+ return limitData
+ }
+ h.configMu.Lock()
+ defer h.configMu.Unlock()
+ if _, ok := h.flowCtrlRules[1]; !ok {
+ return limitData
+ }
+ for _, rule := range h.flowCtrlRules[1] {
+ limitData = rule.getFreLimit(msgZeroCnt)
+ if limitData >= 0 {
+ break
+ }
+ }
+ return limitData
+}
+
+// getMinZeroCnt returns the minZeroCount,
+func (h *RuleHandler) GetMinZeroCnt() int64 {
+ return atomic.LoadInt64(&h.minZeroCount)
+}
+
+func parseFlowCtrlInfo(info string) (map[int32][]*Item, error) {
+ if len(info) == 0 {
+ return nil, nil
+ }
+ var docs []map[string]interface{}
+ if err := json.Unmarshal([]byte(info), &docs); err != nil {
+ return nil, err
+ }
+ flowCtrlMap := make(map[int32][]*Item)
+ for _, doc := range docs {
+ if _, ok := doc["type"]; !ok {
+ return nil, fmt.Errorf("field not existed")
+ }
+ tp, err := parseInt(doc, "type", false, util.InvalidValue)
+ if err != nil {
+ return nil, err
+ }
+
+ if tp < 0 || tp > 3 {
+ return nil, fmt.Errorf("illegal value, not required
value content")
+ }
+
+ rule, err := parseRule(doc)
+ if err != nil {
+ return nil, err
+ }
+ switch tp {
+ case 1:
+ flowCtrlItems, err := parseFreqLimit(rule)
+ if err != nil {
+ return nil, err
+ }
+ flowCtrlMap[1] = flowCtrlItems
+ case 3:
+ flowCtrlItems, err := parseLowFetchLimit(rule)
+ if err != nil {
+ return nil, err
+ }
+ flowCtrlMap[3] = flowCtrlItems
+ case 0:
+ flowCtrlItems, err := parseDataLimit(rule)
+ if err != nil {
+ return nil, err
+ }
+ flowCtrlMap[0] = flowCtrlItems
+ }
+ }
+ return flowCtrlMap, nil
+}
+
+func parseRule(doc map[string]interface{}) ([]byte, error) {
+ if _, ok := doc["rule"]; !ok {
+ return nil, fmt.Errorf("rule field not existed")
+ }
+ if _, ok := doc["rule"].([]byte); !ok {
+ return nil, fmt.Errorf("rule should be a string")
+ }
+ v := doc["rule"].([]byte)
+ return v, nil
+}
+
+func parseDataLimit(b []byte) ([]*Item, error) {
+ var rules []map[string]interface{}
+ if err := json.Unmarshal(b, &rules); err != nil {
+ return nil, err
+ }
+ items := make([]*Item, 0, len(rules))
+ for i, rule := range rules {
+ start, err := parseTime(rule, "start")
+ if err != nil {
+ return nil, err
+ }
+ end, err := parseTime(rule, "end")
+ if err != nil {
+ return nil, err
+ }
+ if start > end {
+ return nil, fmt.Errorf("start value must lower than the
End value in index(%d) of data limit rule", i)
+ }
+ datadlt, err := parseInt(rule, "dltInM", false, -1)
+ if err != nil {
+ return nil, fmt.Errorf("dltInM key is required in
index(%d) of data limit rule", i)
+ }
+ datadlt = datadlt * 1024 * 204
+ dataSizeLimit, err := parseInt(rule, "limitInM", false, -1)
+ if err != nil {
+ return nil, fmt.Errorf("limitInM key is required in
index(%d) of data limit rule", i)
+ }
+ if dataSizeLimit < 0 {
+ return nil, fmt.Errorf("limitInM value must over than
equal or bigger than zero in index(%d) of data limit rule", i)
+ }
+ dataSizeLimit = dataSizeLimit * 1024 * 1024
+ freqMsLimit, err := parseInt(rule, "freqInMs", false, -1)
+ if err != nil {
+ return nil, fmt.Errorf("freqInMs key is required in
index((%d) of data limit rule", i)
+ }
+ if freqMsLimit < 200 {
+ return nil, fmt.Errorf("freqInMs value must over than
equal or bigger than 200 in index(%d) of data limit rule", i)
+ }
+ item := NewItem()
+ item.SetTp(0)
+ item.SetStartTime(start)
+ item.SetEndTime(end)
+ item.SetDatadlt(datadlt)
+ item.SetDataSizeLimit(dataSizeLimit)
+ item.SetFreqLimit(freqMsLimit)
+ items = append(items, item)
+ }
+ return items, nil
+}
+
+func parseLowFetchLimit(b []byte) ([]*Item, error) {
+ var rules []map[string]interface{}
+ if err := json.Unmarshal(b, &rules); err != nil {
+ return nil, err
+ }
+ items := make([]*Item, 0, len(rules))
+ for _, rule := range rules {
+ var filterFreqMs int64
+ var err error
+ if _, ok := rule["filterFreqInMs"]; ok {
+ filterFreqMs, err = parseInt(rule, "filterFreqInMs",
false, -1)
+ if err != nil {
+ return nil, fmt.Errorf("decode failure: %s of
filterFreqInMs field in parse low fetch limit", err.Error())
+ }
+ }
+ if filterFreqMs < 0 || filterFreqMs > 300000 {
+ return nil, fmt.Errorf("decode failure: filterFreqInMs
value must in [0, 300000] in index(%d) of low fetch limit rule", filterFreqMs)
+ }
+
+ var minFilteFreqMs int64
+ if _, ok := rule["minDataFilterFreqInMs"]; ok {
+ minFilteFreqMs, err = parseInt(rule,
"minDataFilterFreqInMs", false, -1)
+ if err != nil {
+ return nil, fmt.Errorf("decode failure: %s of
minDataFilterFreqInMs field in parse low fetch limit", err.Error())
+ }
+ }
+ if minFilteFreqMs < 0 || minFilteFreqMs > 300000 {
+ return nil, fmt.Errorf("decode failure:
minDataFilterFreqInMs value must in [0, 300000] in index(%d) of low fetch limit
rule", filterFreqMs)
+ }
+ if minFilteFreqMs < filterFreqMs {
+ return nil, fmt.Errorf("decode failure:
minDataFilterFreqInMs must lower than filterFreqInMs in index(%d) of low fetch
limit rule", filterFreqMs)
+ }
+ var normFreqMs int64
+ if _, ok := rule["normFreqInMs"]; ok {
+ normFreqMs, err = parseInt(rule, "normFreqInMs", false,
-1)
+ if err != nil {
+ return nil, fmt.Errorf("decode failure: %s of
normFreqInMs field in parse low fetch limit", err.Error())
+ }
+ if normFreqMs < 0 || normFreqMs > 30000 {
+ return nil, fmt.Errorf("decode failure:
normFreqInMs value must in [0, 300000] in index(%d) of low fetch limit rule",
filterFreqMs)
+ }
+ }
+ item := NewItem()
+ item.SetTp(3)
+ item.SetDataSizeLimit(normFreqMs)
+ item.SetFreqLimit(filterFreqMs)
+ item.SetZeroCnt(minFilteFreqMs)
+ items = append(items, item)
+ }
+ return items, nil
+}
+
+func parseFreqLimit(b []byte) ([]*Item, error) {
+ var rules []map[string]interface{}
+ if err := json.Unmarshal(b, &rules); err != nil {
+ return nil, err
+ }
+ items := make([]*Item, 0, len(rules))
+ for _, rule := range rules {
+ zeroCnt, err := parseInt(rule, "zeroCnt", false,
util.InvalidValue)
+ if err != nil {
+ return nil, err
+ }
+ freqLimit, err := parseInt(rule, "freqInMs", false,
util.InvalidValue)
+ if err != nil {
+ return nil, err
+ }
+ item := NewItem()
+ item.SetTp(1)
+ item.SetZeroCnt(zeroCnt)
+ item.SetFreqLimit(freqLimit)
+ items = append(items, item)
+ }
+ if len(items) > 0 {
+ sort.Slice(items, func(i, j int) bool {
+ return items[i].zeroCnt < items[j].zeroCnt
+ })
+ }
+ return items, nil
+}
+
+func parseInt(doc map[string]interface{}, key string, compare bool, required
int64) (int64, error) {
+ if _, ok := doc[key]; !ok {
+ return util.InvalidValue, fmt.Errorf("field not existed")
+ }
+ if _, ok := doc[key].(int64); !ok {
+ return util.InvalidValue, fmt.Errorf("illegal value, must be
int type")
+ }
+ v := doc[key].(int64)
+ if compare && v != required {
+ return util.InvalidValue, fmt.Errorf("illegal value, not
required value content")
+ }
+ return v, nil
+}
+
+func parseTime(doc map[string]interface{}, key string) (int64, error) {
+ if _, ok := doc[key]; !ok {
+ return util.InvalidValue, fmt.Errorf("field %s not existed",
key)
+ }
+ if _, ok := doc[key].(string); !ok {
+ return util.InvalidValue, fmt.Errorf("field %s must be int
type", key)
+ }
+ s := doc[key].(string)
+ pos1 := strings.Index(s, ":")
+ if pos1 == -1 {
+ return util.InvalidValue, fmt.Errorf("field %s must be 'aa:bb'
and 'aa','bb' must be int value format", key)
+ }
+ h, err := strconv.Atoi(s[:pos1])
+ if err != nil {
+ return util.InvalidValue, err
+ }
+ if h < 0 || h > 24 {
+ return util.InvalidValue, fmt.Errorf("field %s -hour value must
in [0,23]", key)
+ }
+ m, err := strconv.Atoi(s[pos1+1:])
+ if err != nil {
+ return util.InvalidValue, err
+ }
+ if m < 0 || m > 59 {
+ return util.InvalidValue, fmt.Errorf("field %s -minute value
must in [0,59]", key)
+ }
+ return int64(h*100 + m), nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
b/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
new file mode 100644
index 0000000..2a2ec6c
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flowctrl
+
+import (
+
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+// Item defines a flow control item.
+type Item struct {
+ tp int32
+ startTime int64
+ endTime int64
+ datadlt int64
+ dataSizeLimit int64
+ freqMsLimit int64
+ zeroCnt int64
+}
+
+// NewItem returns a default flow control item.
+func NewItem() *Item {
+ return &Item{
+ startTime: 2500,
+ endTime: util.InvalidValue,
+ datadlt: util.InvalidValue,
+ dataSizeLimit: util.InvalidValue,
+ freqMsLimit: util.InvalidValue,
+ zeroCnt: util.InvalidValue,
+ }
+}
+
+// SetTp sets the type.
+func (i *Item) SetTp(tp int32) {
+ i.tp = tp
+}
+
+// SetStartTime sets the startTime.
+func (i *Item) SetStartTime(startTime int64) {
+ i.startTime = startTime
+}
+
+// SetEndTime sets the endTime.
+func (i *Item) SetEndTime(endTime int64) {
+ i.endTime = endTime
+}
+
+// SetDataDlt sets the datadlt.
+func (i *Item) SetDatadlt(datadlt int64) {
+ i.datadlt = datadlt
+}
+
+// SetDataSizeLimit sets the dataSizeLimit.
+func (i *Item) SetDataSizeLimit(dataSizeLimit int64) {
+ i.dataSizeLimit = dataSizeLimit
+}
+
+// SetFreqLimit sets the freqLimit.
+func (i *Item) SetFreqLimit(freqLimit int64) {
+ i.freqMsLimit = freqLimit
+}
+
+// SetZeroCnt sets the zeroCnt.
+func (i *Item) SetZeroCnt(zeroCnt int64) {
+ i.zeroCnt = zeroCnt
+}
+
+func (i *Item) clear() {
+ i.tp = 0
+ i.startTime = 2500
+ i.endTime = util.InvalidValue
+ i.datadlt = util.InvalidValue
+ i.dataSizeLimit = util.InvalidValue
+ i.freqMsLimit = util.InvalidValue
+ i.zeroCnt = util.InvalidValue
+}
+
+// GetDataLimit returns the flow control result.
+func (i *Item) GetDataLimit(dlt int64, time int64) *Result {
+ if i.tp != 0 || dlt <= i.datadlt {
+ return nil
+ }
+ if time < i.startTime || time > i.endTime {
+ return nil
+ }
+ return NewResult(i.dataSizeLimit, int32(i.freqMsLimit))
+}
+
+// GetFreqMsLimit returns the freqMsLimit.
+func (i *Item) GetFreqMsLimit() int64 {
+ return i.freqMsLimit
+}
+
+func (i *Item) getFreLimit(cnt int32) int64 {
+ if i.tp != 1 {
+ return -1
+ }
+ if int64(cnt) >= i.zeroCnt {
+ return i.freqMsLimit
+ }
+ return -1
+}
+
+// GetDataSizeLimit returns the dataSizeLimit.
+func (i *Item) GetDataSizeLimit() int64 {
+ return i.dataSizeLimit
+}
+
+// GetZeroCnt returns the zeroCnt.
+func (i *Item) GetZeroCnt() int64 {
+ return i.zeroCnt
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go
b/tubemq-client-twins/tubemq-client-go/flowctrl/result.go
similarity index 50%
copy from tubemq-client-twins/tubemq-client-go/client/consumer.go
copy to tubemq-client-twins/tubemq-client-go/flowctrl/result.go
index 27a63c6..c0b3ffd 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/result.go
@@ -15,26 +15,38 @@
* limitations under the License.
*/
-// Package client defines the api and information
-// which can be exposed to user.
-package client
+package flowctrl
-// ConsumerResult of a consumption.
-type ConsumerResult struct {
+// Result defines a flow control result.
+type Result struct {
+ dataSizeLimit int64
+ freqMsLimit int32
}
-// ConsumerOffset of a consumption,
-type ConsumerOffset struct {
+// NewResult returns a new flow control result.
+func NewResult(dataSizeLimit int64, freqMsLimit int32) *Result {
+ return &Result{
+ dataSizeLimit: dataSizeLimit,
+ freqMsLimit: freqMsLimit,
+ }
}
-var clientID uint64
+// SetDataSizeLimit sets the dataSizeLimit.
+func (r *Result) SetDataSizeLimit(dataSizeLimit int64) {
+ r.dataSizeLimit = dataSizeLimit
+}
+
+// SetFreqMsLimit sets the freqMsLimit.
+func (r *Result) SetFreqMsLimit(freqMsLimit int32) {
+ r.freqMsLimit = freqMsLimit
+}
+
+// GetDataSizeLimit returns the dataSizeLimit.
+func (r *Result) GetDataSizeLimit() int64 {
+ return r.dataSizeLimit
+}
-// Consumer is an interface that abstracts behavior of TubeMQ's consumer
-type Consumer interface {
- // GetMessage receive a single message.
- GetMessage() (*ConsumerResult, error)
- // Confirm the consumption of a message.
- Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
- // GetCurrConsumedInfo returns the consumptions of the consumer.
- GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
+// GetFreqMsLimit returns the freqMsLimit.
+func (r *Result) GetFreqMsLimit() int64 {
+ return int64(r.freqMsLimit)
}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index d01b570..cbf8bae 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -18,8 +18,14 @@
package metadata
import (
+ "math"
"strconv"
"strings"
+ "time"
+
+
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/flowctrl"
+
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
)
// Partition represents the metadata of a partition.
@@ -30,6 +36,41 @@ type Partition struct {
partitionKey string
offset int64
lastConsumed bool
+ consumeData *ConsumeData
+ flowCtrl *flowctrl.Result
+ freqCtrl *flowctrl.Item
+ strategyData *strategyData
+ totalZeroCnt int32
+}
+
+type ConsumeData struct {
+ time int64
+ errCode int32
+ escLimit bool
+ msgSize int32
+ dltLimit int64
+ curDataDlt int64
+ requireSlow bool
+}
+
+type strategyData struct {
+ nextStageUpdate int64
+ nextSliceUpdate int64
+ limitSliceMsgSize int64
+ curStageMsgSize int64
+ curSliceMsgSize int64
+}
+
+func NewConsumeData(time int64, errCode int32, escLimit bool, msgSize int32,
dltLimit int64, curDataDlt int64, requireSlow bool) *ConsumeData {
+ return &ConsumeData{
+ time: time,
+ errCode: errCode,
+ escLimit: escLimit,
+ msgSize: msgSize,
+ dltLimit: dltLimit,
+ curDataDlt: curDataDlt,
+ requireSlow: requireSlow,
+ }
}
// NewPartition parses a partition from the given string.
@@ -96,3 +137,86 @@ func (p *Partition) String() string {
func (p *Partition) SetLastConsumed(lastConsumed bool) {
p.lastConsumed = lastConsumed
}
+
+// BookConsumeData sets the consumeData.
+func (p *Partition) BookConsumeData(data *ConsumeData) {
+ p.consumeData = data
+}
+
+// ProcConsumeResult processes the consume result.
+func (p *Partition) ProcConsumeResult(defHandler *flowctrl.RuleHandler,
groupHandler *flowctrl.RuleHandler, filterConsume bool, lastConsumed bool)
int64 {
+ dltTime := time.Now().UnixNano()/int64(time.Millisecond) -
p.consumeData.time
+ p.updateStrategyData(defHandler, groupHandler)
+ p.lastConsumed = lastConsumed
+ switch p.consumeData.errCode {
+ case errs.RetSuccess, errs.RetErrNotFound:
+ if p.consumeData.msgSize == 0 && p.consumeData.errCode == 200 {
+ p.totalZeroCnt++
+ } else {
+ p.totalZeroCnt = 0
+ }
+ if p.totalZeroCnt > 0 {
+ if groupHandler.GetMinZeroCnt() != math.MaxInt64 {
+ return
groupHandler.GetCurFreqLimitTime(p.totalZeroCnt, p.consumeData.dltLimit) -
dltTime
+ } else {
+ return
defHandler.GetCurFreqLimitTime(p.totalZeroCnt, p.consumeData.dltLimit) - dltTime
+ }
+ }
+ if p.consumeData.escLimit {
+ return 0
+ } else {
+ if p.strategyData.curStageMsgSize >=
p.flowCtrl.GetDataSizeLimit() ||
+ p.strategyData.curStageMsgSize >=
p.strategyData.limitSliceMsgSize {
+ if p.flowCtrl.GetFreqMsLimit() >
p.consumeData.dltLimit {
+ return p.flowCtrl.GetFreqMsLimit() -
dltTime
+ }
+ return p.consumeData.dltLimit - dltTime
+ }
+ if p.consumeData.errCode == errs.RetSuccess {
+ if filterConsume && p.freqCtrl.GetFreqMsLimit()
>= 0 {
+ if p.consumeData.requireSlow {
+ return p.freqCtrl.GetZeroCnt()
- dltTime
+ } else {
+ return
p.freqCtrl.GetFreqMsLimit() - dltTime
+ }
+ } else if !filterConsume &&
p.freqCtrl.GetDataSizeLimit() >= 0 {
+ return p.freqCtrl.GetDataSizeLimit() -
dltTime
+ }
+ }
+ return p.consumeData.dltLimit - dltTime
+ }
+ default:
+ return p.consumeData.curDataDlt - dltTime
+ }
+}
+
+func (p *Partition) updateStrategyData(defHandler *flowctrl.RuleHandler,
groupHandler *flowctrl.RuleHandler) {
+ p.strategyData.curStageMsgSize += int64(p.consumeData.msgSize)
+ p.strategyData.curSliceMsgSize += int64(p.consumeData.msgSize)
+ curTime := time.Now().UnixNano() / int64(time.Millisecond)
+ if curTime > p.strategyData.nextStageUpdate {
+ p.strategyData.curStageMsgSize = 0
+ p.strategyData.curSliceMsgSize = 0
+ if p.consumeData.curDataDlt >= 0 {
+ p.flowCtrl =
groupHandler.GetCurDataLimit(p.consumeData.curDataDlt)
+ if p.flowCtrl == nil {
+ p.flowCtrl =
defHandler.GetCurDataLimit(p.consumeData.curDataDlt)
+ }
+ if p.flowCtrl == nil {
+ p.flowCtrl.SetDataSizeLimit(util.InvalidValue)
+ p.flowCtrl.SetFreqMsLimit(0)
+ }
+ p.freqCtrl = groupHandler.GetFilterCtrlItem()
+ if p.freqCtrl.GetFreqMsLimit() < 0 {
+ p.freqCtrl = defHandler.GetFilterCtrlItem()
+ }
+ curTime = time.Now().UnixNano() /
int64(time.Millisecond)
+ }
+ p.strategyData.limitSliceMsgSize =
p.flowCtrl.GetDataSizeLimit() / 12
+ p.strategyData.nextStageUpdate = curTime + 60000
+ p.strategyData.nextSliceUpdate = curTime + 5000
+ } else if curTime > p.strategyData.nextSliceUpdate {
+ p.strategyData.curSliceMsgSize = 0
+ p.strategyData.nextSliceUpdate = curTime + 5000
+ }
+}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go
b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index bfc63b7..873acad 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -19,9 +19,14 @@
package remote
import (
+ "fmt"
+ "math"
"sync"
+ "sync/atomic"
"time"
+
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/flowctrl"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
)
@@ -30,7 +35,8 @@ import (
type RmtDataCache struct {
consumerID string
groupName string
- underGroupCtrl bool
+ underGroupCtrl int32
+ lastCheck int64
defFlowCtrlID int64
groupFlowCtrlID int64
partitionSubInfo map[string]*metadata.SubscribeInfo
@@ -47,6 +53,9 @@ type RmtDataCache struct {
partitionTimeouts map[string]*time.Timer
topicPartitions map[string]map[string]bool
partitionRegBooked map[string]bool
+ partitionOffset map[string]int64
+ groupHandler *flowctrl.RuleHandler
+ defHandler *flowctrl.RuleHandler
// EventCh is the channel for consumer to consume
EventCh chan *metadata.ConsumerEvent
}
@@ -66,6 +75,8 @@ func NewRmtDataCache() *RmtDataCache {
partitionTimeouts: make(map[string]*time.Timer),
topicPartitions: make(map[string]map[string]bool),
partitionRegBooked: make(map[string]bool),
+ groupHandler: flowctrl.NewRuleHandler(),
+ defHandler: flowctrl.NewRuleHandler(),
EventCh: make(chan *metadata.ConsumerEvent, 1),
}
return r
@@ -73,17 +84,17 @@ func NewRmtDataCache() *RmtDataCache {
// GetUnderGroupCtrl returns the underGroupCtrl.
func (r *RmtDataCache) GetUnderGroupCtrl() bool {
- return r.underGroupCtrl
+ return atomic.LoadInt32(&r.underGroupCtrl) == 0
}
// GetDefFlowCtrlID returns the defFlowCtrlID.
func (r *RmtDataCache) GetDefFlowCtrlID() int64 {
- return r.defFlowCtrlID
+ return r.defHandler.GetFlowCtrID()
}
// GetGroupFlowCtrlID returns the groupFlowCtrlID.
func (r *RmtDataCache) GetGroupFlowCtrlID() int64 {
- return r.groupFlowCtrlID
+ return r.groupHandler.GetFlowCtrID()
}
// GetGroupName returns the group name.
@@ -104,7 +115,7 @@ func (r *RmtDataCache) GetSubscribeInfo()
[]*metadata.SubscribeInfo {
// GetQryPriorityID returns the QryPriorityID.
func (r *RmtDataCache) GetQryPriorityID() int32 {
- return r.qryPriorityID
+ return int32(r.groupHandler.GetQryPriorityID())
}
// PollEventResult polls the first event result from the rebalanceResults.
@@ -142,12 +153,29 @@ func (r *RmtDataCache) SetConsumerInfo(consumerID string,
group string) {
// UpdateDefFlowCtrlInfo updates the defFlowCtrlInfo.
func (r *RmtDataCache) UpdateDefFlowCtrlInfo(flowCtrlID int64, flowCtrlInfo
string) {
-
+ if flowCtrlID != r.defHandler.GetFlowCtrID() {
+ r.defHandler.UpdateDefFlowCtrlInfo(true, util.InvalidValue,
flowCtrlID, flowCtrlInfo)
+ }
}
// UpdateGroupFlowCtrlInfo updates the groupFlowCtrlInfo.
func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID
int64, flowCtrlInfo string) {
-
+ if flowCtrlID != r.defHandler.GetFlowCtrID() {
+ r.groupHandler.UpdateDefFlowCtrlInfo(false,
int64(qryPriorityID), flowCtrlID, flowCtrlInfo)
+ }
+ if int64(qryPriorityID) != r.groupHandler.GetQryPriorityID() {
+ r.groupHandler.SetQryPriorityID(int64(qryPriorityID))
+ }
+ cur := time.Now().UnixNano() / int64(time.Millisecond)
+ if cur-atomic.LoadInt64(&r.lastCheck) > 10000 {
+ result := r.groupHandler.GetCurDataLimit(math.MaxInt64)
+ if result != nil {
+ atomic.StoreInt32(&r.underGroupCtrl, 1)
+ } else {
+ atomic.StoreInt32(&r.underGroupCtrl, 0)
+ }
+ atomic.StoreInt64(&r.lastCheck, cur)
+ }
}
// OfferEventAndNotify offers an consumer event and notifies the consumer
method and notify the consumer to consume.
@@ -334,6 +362,91 @@ func (r *RmtDataCache) IsFirstRegister(partitionKey
string) bool {
return r.partitionRegBooked[partitionKey]
}
+// GetCurConsumeStatus returns the current consumption status.
+func (r *RmtDataCache) GetCurConsumeStatus() int32 {
+ r.metaMu.Lock()
+ defer r.metaMu.Unlock()
+
+ if len(r.partitions) == 0 {
+ return errs.RetErrNoPartAssigned
+ }
+ if len(r.indexPartitions) == 0 {
+ if len(r.usedPartitions) == 0 {
+ return errs.RetErrAllPartInUse
+ } else {
+ return errs.RetErrAllPartWaiting
+ }
+ }
+ return 0
+}
+
+// SelectPartition returns a partition which is available to be consumed.
+// If no partition can be use, an error will be returned.
+func (r *RmtDataCache) SelectPartition() (*metadata.Partition, error) {
+ r.metaMu.Lock()
+ defer r.metaMu.Unlock()
+
+ if len(r.partitions) == 0 {
+ return nil, errs.ErrNoPartAssigned
+ } else {
+ if len(r.indexPartitions) == 0 {
+ if len(r.usedPartitions) == 0 {
+ return nil, errs.ErrAllPartInUse
+ } else {
+ return nil, errs.ErrAllPartWaiting
+ }
+ }
+ }
+
+ partitionKey := r.indexPartitions[0]
+ r.indexPartitions = r.indexPartitions[1:]
+ if partition, ok := r.partitions[partitionKey]; !ok {
+ return nil, errs.ErrAllPartInUse
+ } else {
+ r.usedPartitions[partitionKey] = time.Now().UnixNano() /
int64(time.Millisecond)
+ return partition, nil
+ }
+}
+
+func (r *RmtDataCache) ReleasePartition(checkDelay bool, filterConsume bool,
confirmContext string, isConsumed bool) error {
+ partitionKey, bookedTime, err :=
util.ParseConfirmContext(confirmContext)
+ if err != nil {
+ return err
+ }
+ r.metaMu.Lock()
+ defer r.metaMu.Unlock()
+
+ if partition, ok := r.partitions[partitionKey]; !ok {
+ delete(r.usedPartitions, partitionKey)
+ r.removeFromIndexPartitions(partitionKey)
+ return fmt.Errorf("not found the partition in Consume Partition
set")
+ } else {
+ if t, ok := r.usedPartitions[partitionKey]; !ok {
+ r.removeFromIndexPartitions(partitionKey)
+ r.indexPartitions = append(r.indexPartitions,
partitionKey)
+ } else {
+ if t == bookedTime {
+ delete(r.usedPartitions, partitionKey)
+ r.removeFromIndexPartitions(partitionKey)
+ delay := int64(0)
+ if checkDelay {
+ delay =
partition.ProcConsumeResult(r.defHandler, r.groupHandler, filterConsume,
isConsumed)
+ }
+ if delay > 10 {
+ r.partitionTimeouts[partitionKey] =
time.AfterFunc(time.Duration(delay)*time.Millisecond, func() {
+
r.resetIdlePartition(partitionKey, true)
+ })
+ } else {
+ r.indexPartitions =
append(r.indexPartitions, partitionKey)
+ }
+ } else {
+ return fmt.Errorf("illegal confirmContext
content: context not equal")
+ }
+ }
+ }
+ return nil
+}
+
func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) {
pos := 0
for i, p := range r.indexPartitions {
@@ -344,3 +457,19 @@ func (r *RmtDataCache)
removeFromIndexPartitions(partitionKey string) {
}
r.indexPartitions = append(r.indexPartitions[:pos],
r.indexPartitions[pos+1:]...)
}
+
+func (r *RmtDataCache) BookPartitionInfo(partitionKey string, currOffset
int64) {
+ if currOffset >= 0 {
+ r.dataBookMu.Lock()
+ defer r.dataBookMu.Unlock()
+ r.partitionOffset[partitionKey] = currOffset
+ }
+}
+
+func (r *RmtDataCache) BookConsumeData(partitionKey string, data
*metadata.ConsumeData) {
+ r.metaMu.Lock()
+ defer r.metaMu.Unlock()
+ if partition, ok := r.partitions[partitionKey]; ok {
+ partition.BookConsumeData(data)
+ }
+}
diff --git a/tubemq-client-twins/tubemq-client-go/util/util.go
b/tubemq-client-twins/tubemq-client-go/util/util.go
index 1157d39..e3b74f8 100644
--- a/tubemq-client-twins/tubemq-client-go/util/util.go
+++ b/tubemq-client-twins/tubemq-client-go/util/util.go
@@ -19,7 +19,10 @@
package util
import (
+ "fmt"
"net"
+ "strconv"
+ "strings"
)
// InValidValue of TubeMQ config.
@@ -72,3 +75,55 @@ func GenBrokerAuthenticateToken(username string, password
string) string {
func GenMasterAuthenticateToken(username string, password string) string {
return ""
}
+
+// ParseConfirmContext parses the confirmcontext to partition key and
bookedTime.
+func ParseConfirmContext(confirmContext string) (string, int64, error) {
+ res := strings.Split(confirmContext, "@")
+ if len(res) == 0 {
+ return "", 0, fmt.Errorf("illegal confirmContext content:
unregular value format")
+ }
+ partitionKey := res[0]
+ bookedTime, err := strconv.ParseInt(res[1], 10, 64)
+ if err != nil {
+ return "", 0, err
+ }
+ return partitionKey, bookedTime, nil
+}
+
+// SplitToMap split the given string by the two step strings to map.
+func SplitToMap(source string, step1 string, step2 string) map[string]string {
+ pos1 := 0
+ pos2 := strings.Index(source, step1)
+ pos3 := 0
+ m := make(map[string]string)
+ for pos2 != -1 {
+ itemStr := strings.TrimSpace(source[pos1 : pos2-pos1])
+ if len(itemStr) == 0 {
+ continue
+ }
+ pos1 = pos2 + len(step1)
+ pos2 = strings.Index(source[pos1:], step1)
+ pos3 = strings.Index(itemStr, step2)
+ if pos3 == -1 {
+ continue
+ }
+ key := strings.TrimSpace(itemStr[:pos3])
+ val := strings.TrimSpace(itemStr[pos3+len(step2):])
+ if len(key) == 0 {
+ continue
+ }
+ m[key] = val
+ }
+ if pos1 != len(source) {
+ itemStr := strings.TrimSpace(source[pos1:])
+ pos3 = strings.Index(itemStr, step2)
+ if pos3 != -1 {
+ key := strings.TrimSpace(itemStr[:pos3])
+ val := strings.TrimSpace(itemStr[pos3+len(step2):])
+ if len(key) > 0 {
+ m[key] = val
+ }
+ }
+ }
+ return m
+}