This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 9753c44  [INLONG-627]Go SDK GetMessage API (#481)
9753c44 is described below

commit 9753c447da90307a4ae62c3c3186baa710063a44
Author: Zijie Lu <[email protected]>
AuthorDate: Wed Jun 23 17:28:53 2021 +0800

    [INLONG-627]Go SDK GetMessage API (#481)
    
    * [INLONG-624]Go SDK consumer interface
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/client/consumer.go            |   4 +
 .../tubemq-client-go/client/consumer_impl.go       | 181 +++++++-
 .../client/{consumer.go => message.go}             |  27 +-
 .../client/{consumer.go => peer.go}                |  25 +-
 tubemq-client-twins/tubemq-client-go/errs/errs.go  |  13 +
 .../tubemq-client-go/flowctrl/handler.go           | 460 +++++++++++++++++++++
 .../tubemq-client-go/flowctrl/item.go              | 126 ++++++
 .../{client/consumer.go => flowctrl/result.go}     |  44 +-
 .../tubemq-client-go/metadata/partition.go         | 124 ++++++
 .../tubemq-client-go/remote/remote.go              | 143 ++++++-
 tubemq-client-twins/tubemq-client-go/util/util.go  |  55 +++
 11 files changed, 1137 insertions(+), 65 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index 27a63c6..a4eec0b 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -21,6 +21,10 @@ package client
 
 // ConsumerResult of a consumption.
 type ConsumerResult struct {
+       topicName      string
+       confirmContext string
+       peerInfo       *PeerInfo
+       messages       []*Message
 }
 
 // ConsumerOffset of a consumption,
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index f4211c5..6db4f40 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -19,6 +19,8 @@ package client
 
 import (
        "context"
+       "encoding/binary"
+       "hash/crc32"
        "os"
        "strconv"
        "sync/atomic"
@@ -188,7 +190,53 @@ func (c *consumer) processAuthorizedToken(info 
*protocol.MasterAuthorizedInfo) {
 
 // GetMessage implementation of TubeMQ consumer.
 func (c *consumer) GetMessage() (*ConsumerResult, error) {
-       panic("implement me")
+       err := c.checkPartitionErr()
+       if err != nil {
+               return nil, err
+       }
+       partition, err := c.rmtDataCache.SelectPartition()
+       if err != nil {
+               return nil, err
+       }
+       confirmContext := partition.GetPartitionKey() + "@" + 
strconv.Itoa(int(time.Now().UnixNano()/int64(time.Millisecond)))
+       isFiltered := c.subInfo.IsFiltered(partition.GetTopic())
+       pi := &PeerInfo{
+               brokerHost:   partition.GetBroker().GetHost(),
+               partitionID:  uint32(partition.GetPartitionID()),
+               partitionKey: partition.GetPartitionKey(),
+               currOffset:   util.InvalidValue,
+       }
+       m := &metadata.Metadata{}
+       node := &metadata.Node{}
+       node.SetHost(util.GetLocalHost())
+       node.SetAddress(partition.GetBroker().GetAddress())
+       m.SetNode(node)
+       sub := &metadata.SubscribeInfo{}
+       sub.SetGroup(c.config.Consumer.Group)
+       sub.SetPartition(partition)
+       m.SetSubscribeInfo(sub)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 
c.config.Net.ReadTimeout)
+       defer cancel()
+       rsp, err := c.client.GetMessageRequestC2B(ctx, m, c.subInfo, 
c.rmtDataCache)
+       if err != nil {
+               return nil, err
+       }
+       cs := &ConsumerResult{
+               topicName:      partition.GetTopic(),
+               confirmContext: confirmContext,
+               peerInfo:       pi,
+       }
+       if !rsp.GetSuccess() {
+               err := c.rmtDataCache.ReleasePartition(true, isFiltered, 
confirmContext, false)
+               return cs, err
+       }
+       msgs, err := c.processGetMessageRspB2C(pi, isFiltered, partition, 
confirmContext, rsp)
+       if err != nil {
+               return cs, err
+       }
+       cs.messages = msgs
+       return cs, err
 }
 
 // Confirm implementation of TubeMQ consumer.
@@ -280,7 +328,6 @@ func (c *consumer) connect2Broker(event 
*metadata.ConsumerEvent) {
                                rsp, err := c.sendRegisterReq2Broker(partition, 
node)
                                if err != nil {
                                        //todo add log
-                                       return
                                }
                                if !rsp.GetSuccess() {
                                        //todo add log
@@ -372,3 +419,133 @@ func (c *consumer) getConsumeReadStatus(isFirstReg bool) 
int32 {
        }
        return int32(readStatus)
 }
+
+func (c *consumer) checkPartitionErr() error {
+       startTime := time.Now().UnixNano() / int64(time.Millisecond)
+       for {
+               ret := c.rmtDataCache.GetCurConsumeStatus()
+               if ret == 0 {
+                       return nil
+               }
+               if c.config.Consumer.MaxPartCheckPeriod >= 0 &&
+                       time.Now().UnixNano()/int64(time.Millisecond)-startTime 
>= c.config.Consumer.MaxPartCheckPeriod.Milliseconds() {
+                       switch ret {
+                       case errs.RetErrNoPartAssigned:
+                               return errs.ErrNoPartAssigned
+                       case errs.RetErrAllPartInUse:
+                               return errs.ErrAllPartInUse
+                       case errs.RetErrAllPartWaiting:
+                               return errs.ErrAllPartWaiting
+                       }
+               }
+               time.Sleep(c.config.Consumer.PartCheckSlice)
+       }
+}
+
+func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, 
partition *metadata.Partition, confirmContext string, rsp 
*protocol.GetMessageResponseB2C) ([]*Message, error) {
+       limitDlt := int64(300)
+       escLimit := rsp.GetEscFlowCtrl()
+       now := time.Now().UnixNano() / int64(time.Millisecond)
+       switch rsp.GetErrCode() {
+       case errs.RetSuccess:
+               dataDleVal := util.InvalidValue
+               if rsp.GetCurrDataDlt() >= 0 {
+                       dataDleVal = rsp.GetCurrDataDlt()
+               }
+               currOffset := util.InvalidValue
+               if rsp.GetCurrOffset() >= 0 {
+                       currOffset = rsp.GetCurrOffset()
+               }
+               msgSize, msgs := c.convertMessages(filtered, 
partition.GetTopic(), rsp)
+               c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), 
currOffset)
+               cd := metadata.NewConsumeData(now, 200, escLimit, 
int32(msgSize), 0, dataDleVal, rsp.GetRequireSlow())
+               c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
+               pi.currOffset = currOffset
+               return msgs, nil
+       case errs.RetErrHBNoNode, errs.RetCertificateFailure, 
errs.RetErrDuplicatePartition:
+               partitionKey, _, err := util.ParseConfirmContext(confirmContext)
+               if err != nil {
+                       return nil, err
+               }
+               c.rmtDataCache.RemovePartition([]string{partitionKey})
+               return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+       case errs.RetErrConsumeSpeedLimit:
+               defDltTime := int64(rsp.GetMinLimitTime())
+               if defDltTime == 0 {
+                       defDltTime = 
c.config.Consumer.MsgNotFoundWait.Milliseconds()
+               }
+               cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, 
limitDlt, defDltTime, rsp.GetRequireSlow())
+               c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), 
util.InvalidValue)
+               c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
+               return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+       case errs.RetErrNotFound:
+               limitDlt = c.config.Consumer.MsgNotFoundWait.Milliseconds()
+       case errs.RetErrForbidden:
+               limitDlt = 2000
+       case errs.RetErrMoved:
+               limitDlt = 200
+       case errs.RetErrServiceUnavailable:
+       }
+       if rsp.GetErrCode() != errs.RetSuccess {
+               cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, 
limitDlt, util.InvalidValue, rsp.GetRequireSlow())
+               c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), 
util.InvalidValue)
+               c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
+               c.rmtDataCache.ReleasePartition(true, filtered, confirmContext, 
false)
+               return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+       }
+       return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+}
+
+func (c *consumer) convertMessages(filtered bool, topic string, rsp 
*protocol.GetMessageResponseB2C) (int, []*Message) {
+       msgSize := 0
+       if len(rsp.GetMessages()) == 0 {
+               return msgSize, nil
+       }
+
+       msgs := make([]*Message, 0, len(rsp.GetMessages()))
+       for _, m := range rsp.GetMessages() {
+               checkSum := uint64(crc32.Update(0, crc32.IEEETable, 
m.GetPayLoadData())) & 0x7FFFFFFFF
+               if int32(checkSum) != m.GetCheckSum() {
+                       continue
+               }
+               readPos := 0
+               dataLen := len(m.GetPayLoadData())
+               var properties map[string]string
+               if m.GetFlag()&0x01 == 1 {
+                       if len(m.GetPayLoadData()) < 4 {
+                               continue
+                       }
+                       attrLen := 
int(binary.BigEndian.Uint64(m.GetPayLoadData()))
+                       readPos += 4
+                       dataLen -= 4
+
+                       attribute := m.GetPayLoadData()[readPos : 
readPos+attrLen]
+                       readPos -= attrLen
+                       dataLen -= attrLen
+                       properties := util.SplitToMap(string(attribute), ",", 
"=")
+                       if filtered {
+                               topicFilters := c.subInfo.GetTopicFilters()
+                               if msgKey, ok := properties["$msgType$"]; ok {
+                                       if filters, ok := topicFilters[topic]; 
ok {
+                                               for _, filter := range filters {
+                                                       if filter == msgKey {
+                                                               continue
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               }
+               msg := &Message{
+                       topic:      topic,
+                       flag:       m.GetFlag(),
+                       id:         m.GetMessageId(),
+                       properties: properties,
+                       dataLen:    int32(dataLen),
+                       data:       string(m.GetPayLoadData()[:readPos]),
+               }
+               msgs = append(msgs, msg)
+               msgSize += dataLen
+       }
+       return msgSize, msgs
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go 
b/tubemq-client-twins/tubemq-client-go/client/message.go
similarity index 55%
copy from tubemq-client-twins/tubemq-client-go/client/consumer.go
copy to tubemq-client-twins/tubemq-client-go/client/message.go
index 27a63c6..f511647 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/message.go
@@ -15,26 +15,13 @@
  * limitations under the License.
  */
 
-// Package client defines the api and information
-// which can be exposed to user.
 package client
 
-// ConsumerResult of a consumption.
-type ConsumerResult struct {
-}
-
-// ConsumerOffset of a consumption,
-type ConsumerOffset struct {
-}
-
-var clientID uint64
-
-// Consumer is an interface that abstracts behavior of TubeMQ's consumer
-type Consumer interface {
-       // GetMessage receive a single message.
-       GetMessage() (*ConsumerResult, error)
-       // Confirm the consumption of a message.
-       Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
-       // GetCurrConsumedInfo returns the consumptions of the consumer.
-       GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
+type Message struct {
+       topic      string
+       data       string
+       dataLen    int32
+       id         int64
+       flag       int32
+       properties map[string]string
 }
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go 
b/tubemq-client-twins/tubemq-client-go/client/peer.go
similarity index 55%
copy from tubemq-client-twins/tubemq-client-go/client/consumer.go
copy to tubemq-client-twins/tubemq-client-go/client/peer.go
index 27a63c6..2635d7d 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/peer.go
@@ -15,26 +15,11 @@
  * limitations under the License.
  */
 
-// Package client defines the api and information
-// which can be exposed to user.
 package client
 
-// ConsumerResult of a consumption.
-type ConsumerResult struct {
-}
-
-// ConsumerOffset of a consumption,
-type ConsumerOffset struct {
-}
-
-var clientID uint64
-
-// Consumer is an interface that abstracts behavior of TubeMQ's consumer
-type Consumer interface {
-       // GetMessage receive a single message.
-       GetMessage() (*ConsumerResult, error)
-       // Confirm the consumption of a message.
-       Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
-       // GetCurrConsumedInfo returns the consumptions of the consumer.
-       GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
+type PeerInfo struct {
+       partitionID  uint32
+       brokerHost   string
+       partitionKey string
+       currOffset   int64
 }
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go 
b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index eb45179..20427b5 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -35,15 +35,28 @@ const (
        RetRequestFailure = 5
        // RetSelectorNotExist represents the selector not exists.
        RetSelectorNotExist        = 6
+       RetSuccess                 = 200
+       RetErrMoved                = 301
+       RetErrForbidden            = 403
+       RetErrNotFound             = 404
+       RetErrNoPartAssigned       = 406
+       RetErrAllPartWaiting       = 407
+       RetErrAllPartInUse         = 408
        RetErrHBNoNode             = 411
+       RetErrDuplicatePartition   = 412
        RetCertificateFailure      = 415
        RetConsumeGroupForbidden   = 450
        RetConsumeContentForbidden = 455
+       RetErrServiceUnavailable   = 503
+       RetErrConsumeSpeedLimit    = 550
 )
 
 // ErrAssertionFailure represents RetAssertionFailure error.
 var (
        ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+       ErrNoPartAssigned   = New(RetErrNoPartAssigned, "No partition info in 
local cache, please retry later!")
+       ErrAllPartWaiting   = New(RetErrAllPartWaiting, "All partitions reach 
max position, please retry later!")
+       ErrAllPartInUse     = New(RetErrAllPartInUse, "No idle partition to 
consume, please retry later!")
 )
 
 // Error provides a TubeMQ-specific error container
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go 
b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
new file mode 100644
index 0000000..d9fd9bc
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package flowctrl defines the rules and handling logic of flow control.
+package flowctrl
+
+import (
+       "encoding/json"
+       "fmt"
+       "math"
+       "sort"
+       "strconv"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+type RuleHandler struct {
+       lastUpdate         int64
+       flowCtrlInfo       string
+       qrypriorityID      int64
+       dataSizeLimit      int64
+       flowCtrlID         int64
+       minZeroCount       int64
+       minDataDltLimit    int64
+       dataLimitStartTime int64
+       dataLimitEndTime   int64
+       configMu           sync.Mutex
+       flowCtrlRules      map[int32][]*Item
+       flowCtrlItem       *Item
+}
+
+// NewRuleHandler returns a default RuleHandler.
+func NewRuleHandler() *RuleHandler {
+       return &RuleHandler{
+               flowCtrlID:         util.InvalidValue,
+               minZeroCount:       math.MaxInt64,
+               qrypriorityID:      util.InvalidValue,
+               minDataDltLimit:    math.MaxInt64,
+               dataLimitStartTime: 2500,
+               dataLimitEndTime:   util.InvalidValue,
+               lastUpdate:         time.Now().UnixNano() / 
int64(time.Millisecond),
+       }
+}
+
+// GetQryPriorityID returns the qrypriorityID.
+func (h *RuleHandler) GetQryPriorityID() int64 {
+       return atomic.LoadInt64(&h.qrypriorityID)
+}
+
+// GetFlowCtrID returns the flowCtrlID.
+func (h *RuleHandler) GetFlowCtrID() int64 {
+       return atomic.LoadInt64(&h.flowCtrlID)
+}
+
+// SetQryPriorityID sets the qrypriorityID
+func (h *RuleHandler) SetQryPriorityID(qryPriorityID int64) {
+       atomic.StoreInt64(&h.qrypriorityID, qryPriorityID)
+}
+
+// UpdateDefFlowCtrlInfo updates the flow control information.
+func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, qrypriorityID 
int64, flowCtrlID int64, info string) error {
+       if atomic.LoadInt64(&h.flowCtrlID) == flowCtrlID {
+               return nil
+       }
+       //curFlowCtrlID := atomic.LoadInt64(&h.flowCtrlID)
+       var flowCtrlItems map[int32][]*Item
+       var err error
+       if len(info) > 0 {
+               flowCtrlItems, err = parseFlowCtrlInfo(info)
+               if err != nil {
+                       return err
+               }
+       }
+       h.configMu.Lock()
+       defer h.configMu.Unlock()
+       h.clearStatisticData()
+       atomic.StoreInt64(&h.qrypriorityID, qrypriorityID)
+       if len(flowCtrlItems) == 0 {
+               h.flowCtrlRules = make(map[int32][]*Item)
+               info = ""
+       } else {
+               h.flowCtrlRules = flowCtrlItems
+               h.flowCtrlInfo = info
+               h.initStatisticData()
+       }
+       h.lastUpdate = time.Now().UnixNano() / int64(time.Millisecond)
+       if isDefault {
+               // todo log
+               // LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from 
%ld to %ld\n", curr_flowctrl_id,
+               //             flowctrl_id)
+       } else {
+               // todo log
+               // LOG_INFO("[Flow Ctrl] Group FlowCtrl's flowctrl_id from %ld 
to %ld\n", curr_flowctrl_id,
+               //             flowctrl_id)
+       }
+       return nil
+}
+
+func (h *RuleHandler) initStatisticData() {
+       for id, rules := range h.flowCtrlRules {
+               if id == 0 {
+                       for _, rule := range rules {
+                               if rule.tp != 0 {
+                                       continue
+                               }
+
+                               if rule.datadlt < 
atomic.LoadInt64(&h.minDataDltLimit) {
+                                       atomic.StoreInt64(&h.minDataDltLimit, 
rule.datadlt)
+                               }
+                               if rule.startTime < 
atomic.LoadInt64(&h.dataLimitStartTime) {
+                                       
atomic.StoreInt64(&h.dataLimitStartTime, rule.startTime)
+                               }
+                               if rule.endTime < 
atomic.LoadInt64(&h.dataLimitEndTime) {
+                                       atomic.StoreInt64(&h.dataLimitEndTime, 
rule.endTime)
+                               }
+                       }
+               }
+               if id == 1 {
+                       for _, rule := range rules {
+                               if rule.tp != 1 {
+                                       continue
+                               }
+                               if rule.zeroCnt < 
atomic.LoadInt64(&h.minZeroCount) {
+                                       atomic.StoreInt64(&h.minZeroCount, 
rule.zeroCnt)
+                               }
+                       }
+               }
+               if id == 3 {
+                       for _, rule := range rules {
+                               if rule.tp != 3 {
+                                       continue
+                               }
+                               h.flowCtrlItem.SetTp(3)
+                               
h.flowCtrlItem.SetDataSizeLimit(rule.dataSizeLimit)
+                               h.flowCtrlItem.SetFreqLimit(rule.freqMsLimit)
+                               h.flowCtrlItem.SetZeroCnt(rule.zeroCnt)
+                       }
+               }
+       }
+}
+
+func (h *RuleHandler) clearStatisticData() {
+       atomic.StoreInt64(&h.minZeroCount, util.InvalidValue)
+       atomic.StoreInt64(&h.minDataDltLimit, math.MaxInt64)
+       atomic.StoreInt64(&h.qrypriorityID, util.InvalidValue)
+       atomic.StoreInt64(&h.dataSizeLimit, 2500)
+       atomic.StoreInt64(&h.dataLimitEndTime, util.InvalidValue)
+       h.flowCtrlItem.clear()
+}
+
+// GetCurDataLimit returns the flow control result based on lastDataDlt.
+func (h *RuleHandler) GetCurDataLimit(lastDataDlt int64) *Result {
+       l := time.FixedZone("GMT", 3600*8)
+       now := time.Now()
+       hour := now.In(l).Hour()
+       min := now.In(l).Minute()
+       curTime := int64(hour*100 + min)
+       if lastDataDlt < atomic.LoadInt64(&h.minDataDltLimit) ||
+               curTime < atomic.LoadInt64(&h.dataLimitStartTime) ||
+               curTime > atomic.LoadInt64(&h.dataLimitEndTime) {
+               return nil
+       }
+       h.configMu.Lock()
+       defer h.configMu.Lock()
+       if _, ok := h.flowCtrlRules[0]; !ok {
+               return nil
+       }
+       for _, rule := range h.flowCtrlRules[0] {
+               result := rule.GetDataLimit(lastDataDlt, curTime)
+               if result != nil {
+                       return result
+               }
+       }
+       return nil
+}
+
+// GetFilterCtrlItem returns the flow control item.
+func (h *RuleHandler) GetFilterCtrlItem() *Item {
+       h.configMu.Lock()
+       defer h.configMu.Unlock()
+       return h.flowCtrlItem
+}
+
+// GetCurFreqLimitTime returns the current frequency limit time.
+func (h *RuleHandler) GetCurFreqLimitTime(msgZeroCnt int32, receivedLimit 
int64) int64 {
+       limitData := receivedLimit
+       if int64(msgZeroCnt) < atomic.LoadInt64(&h.minZeroCount) {
+               return limitData
+       }
+       h.configMu.Lock()
+       defer h.configMu.Unlock()
+       if _, ok := h.flowCtrlRules[1]; !ok {
+               return limitData
+       }
+       for _, rule := range h.flowCtrlRules[1] {
+               limitData = rule.getFreLimit(msgZeroCnt)
+               if limitData >= 0 {
+                       break
+               }
+       }
+       return limitData
+}
+
+// GetMinZeroCnt returns the minZeroCount.
+func (h *RuleHandler) GetMinZeroCnt() int64 {
+       return atomic.LoadInt64(&h.minZeroCount)
+}
+
+func parseFlowCtrlInfo(info string) (map[int32][]*Item, error) {
+       if len(info) == 0 {
+               return nil, nil
+       }
+       var docs []map[string]interface{}
+       if err := json.Unmarshal([]byte(info), &docs); err != nil {
+               return nil, err
+       }
+       flowCtrlMap := make(map[int32][]*Item)
+       for _, doc := range docs {
+               if _, ok := doc["type"]; !ok {
+                       return nil, fmt.Errorf("field not existed")
+               }
+               tp, err := parseInt(doc, "type", false, util.InvalidValue)
+               if err != nil {
+                       return nil, err
+               }
+
+               if tp < 0 || tp > 3 {
+                       return nil, fmt.Errorf("illegal value, not required 
value content")
+               }
+
+               rule, err := parseRule(doc)
+               if err != nil {
+                       return nil, err
+               }
+               switch tp {
+               case 1:
+                       flowCtrlItems, err := parseFreqLimit(rule)
+                       if err != nil {
+                               return nil, err
+                       }
+                       flowCtrlMap[1] = flowCtrlItems
+               case 3:
+                       flowCtrlItems, err := parseLowFetchLimit(rule)
+                       if err != nil {
+                               return nil, err
+                       }
+                       flowCtrlMap[3] = flowCtrlItems
+               case 0:
+                       flowCtrlItems, err := parseDataLimit(rule)
+                       if err != nil {
+                               return nil, err
+                       }
+                       flowCtrlMap[0] = flowCtrlItems
+               }
+       }
+       return flowCtrlMap, nil
+}
+
+func parseRule(doc map[string]interface{}) ([]byte, error) {
+       if _, ok := doc["rule"]; !ok {
+               return nil, fmt.Errorf("rule field not existed")
+       }
+       if _, ok := doc["rule"].([]byte); !ok {
+               return nil, fmt.Errorf("rule should be a string")
+       }
+       v := doc["rule"].([]byte)
+       return v, nil
+}
+
+func parseDataLimit(b []byte) ([]*Item, error) {
+       var rules []map[string]interface{}
+       if err := json.Unmarshal(b, &rules); err != nil {
+               return nil, err
+       }
+       items := make([]*Item, 0, len(rules))
+       for i, rule := range rules {
+               start, err := parseTime(rule, "start")
+               if err != nil {
+                       return nil, err
+               }
+               end, err := parseTime(rule, "end")
+               if err != nil {
+                       return nil, err
+               }
+               if start > end {
+                       return nil, fmt.Errorf("start value must lower than the 
End value in index(%d) of data limit rule", i)
+               }
+               datadlt, err := parseInt(rule, "dltInM", false, -1)
+               if err != nil {
+                       return nil, fmt.Errorf("dltInM key is required in 
index(%d) of data limit rule", i)
+               }
+               datadlt = datadlt * 1024 * 204
+               dataSizeLimit, err := parseInt(rule, "limitInM", false, -1)
+               if err != nil {
+                       return nil, fmt.Errorf("limitInM key is required in 
index(%d) of data limit rule", i)
+               }
+               if dataSizeLimit < 0 {
+                       return nil, fmt.Errorf("limitInM value must over than 
equal or bigger than zero in index(%d) of data limit rule", i)
+               }
+               dataSizeLimit = dataSizeLimit * 1024 * 1024
+               freqMsLimit, err := parseInt(rule, "freqInMs", false, -1)
+               if err != nil {
+                       return nil, fmt.Errorf("freqInMs key is required in 
index((%d) of data limit rule", i)
+               }
+               if freqMsLimit < 200 {
+                       return nil, fmt.Errorf("freqInMs value must over than 
equal or bigger than 200 in index(%d) of data limit rule", i)
+               }
+               item := NewItem()
+               item.SetTp(0)
+               item.SetStartTime(start)
+               item.SetEndTime(end)
+               item.SetDatadlt(datadlt)
+               item.SetDataSizeLimit(dataSizeLimit)
+               item.SetFreqLimit(freqMsLimit)
+               items = append(items, item)
+       }
+       return items, nil
+}
+
+func parseLowFetchLimit(b []byte) ([]*Item, error) {
+       var rules []map[string]interface{}
+       if err := json.Unmarshal(b, &rules); err != nil {
+               return nil, err
+       }
+       items := make([]*Item, 0, len(rules))
+       for _, rule := range rules {
+               var filterFreqMs int64
+               var err error
+               if _, ok := rule["filterFreqInMs"]; ok {
+                       filterFreqMs, err = parseInt(rule, "filterFreqInMs", 
false, -1)
+                       if err != nil {
+                               return nil, fmt.Errorf("decode failure: %s of 
filterFreqInMs field in parse low fetch limit", err.Error())
+                       }
+               }
+               if filterFreqMs < 0 || filterFreqMs > 300000 {
+                       return nil, fmt.Errorf("decode failure: filterFreqInMs 
value must in [0, 300000] in index(%d) of low fetch limit rule", filterFreqMs)
+               }
+
+               var minFilteFreqMs int64
+               if _, ok := rule["minDataFilterFreqInMs"]; ok {
+                       minFilteFreqMs, err = parseInt(rule, 
"minDataFilterFreqInMs", false, -1)
+                       if err != nil {
+                               return nil, fmt.Errorf("decode failure: %s of 
minDataFilterFreqInMs field in parse low fetch limit", err.Error())
+                       }
+               }
+               if minFilteFreqMs < 0 || minFilteFreqMs > 300000 {
+                       return nil, fmt.Errorf("decode failure: 
minDataFilterFreqInMs value must in [0, 300000] in index(%d) of low fetch limit 
rule", filterFreqMs)
+               }
+               if minFilteFreqMs < filterFreqMs {
+                       return nil, fmt.Errorf("decode failure: 
minDataFilterFreqInMs must lower than filterFreqInMs in index(%d) of low fetch 
limit rule", filterFreqMs)
+               }
+               var normFreqMs int64
+               if _, ok := rule["normFreqInMs"]; ok {
+                       normFreqMs, err = parseInt(rule, "normFreqInMs", false, 
-1)
+                       if err != nil {
+                               return nil, fmt.Errorf("decode failure: %s of 
normFreqInMs field in parse low fetch limit", err.Error())
+                       }
+                       if normFreqMs < 0 || normFreqMs > 30000 {
+                               return nil, fmt.Errorf("decode failure: 
normFreqInMs value must in [0, 300000] in index(%d) of low fetch limit rule", 
filterFreqMs)
+                       }
+               }
+               item := NewItem()
+               item.SetTp(3)
+               item.SetDataSizeLimit(normFreqMs)
+               item.SetFreqLimit(filterFreqMs)
+               item.SetZeroCnt(minFilteFreqMs)
+               items = append(items, item)
+       }
+       return items, nil
+}
+
+func parseFreqLimit(b []byte) ([]*Item, error) {
+       var rules []map[string]interface{}
+       if err := json.Unmarshal(b, &rules); err != nil {
+               return nil, err
+       }
+       items := make([]*Item, 0, len(rules))
+       for _, rule := range rules {
+               zeroCnt, err := parseInt(rule, "zeroCnt", false, 
util.InvalidValue)
+               if err != nil {
+                       return nil, err
+               }
+               freqLimit, err := parseInt(rule, "freqInMs", false, 
util.InvalidValue)
+               if err != nil {
+                       return nil, err
+               }
+               item := NewItem()
+               item.SetTp(1)
+               item.SetZeroCnt(zeroCnt)
+               item.SetFreqLimit(freqLimit)
+               items = append(items, item)
+       }
+       if len(items) > 0 {
+               sort.Slice(items, func(i, j int) bool {
+                       return items[i].zeroCnt < items[j].zeroCnt
+               })
+       }
+       return items, nil
+}
+
+func parseInt(doc map[string]interface{}, key string, compare bool, required 
int64) (int64, error) {
+       if _, ok := doc[key]; !ok {
+               return util.InvalidValue, fmt.Errorf("field not existed")
+       }
+       if _, ok := doc[key].(int64); !ok {
+               return util.InvalidValue, fmt.Errorf("illegal value, must be 
int type")
+       }
+       v := doc[key].(int64)
+       if compare && v != required {
+               return util.InvalidValue, fmt.Errorf("illegal value, not 
required value content")
+       }
+       return v, nil
+}
+
+func parseTime(doc map[string]interface{}, key string) (int64, error) {
+       if _, ok := doc[key]; !ok {
+               return util.InvalidValue, fmt.Errorf("field %s not existed", 
key)
+       }
+       if _, ok := doc[key].(string); !ok {
+               return util.InvalidValue, fmt.Errorf("field %s must be int 
type", key)
+       }
+       s := doc[key].(string)
+       pos1 := strings.Index(s, ":")
+       if pos1 == -1 {
+               return util.InvalidValue, fmt.Errorf("field %s must be 'aa:bb' 
and 'aa','bb' must be int value format", key)
+       }
+       h, err := strconv.Atoi(s[:pos1])
+       if err != nil {
+               return util.InvalidValue, err
+       }
+       if h < 0 || h > 24 {
+               return util.InvalidValue, fmt.Errorf("field %s -hour value must 
in [0,23]", key)
+       }
+       m, err := strconv.Atoi(s[pos1+1:])
+       if err != nil {
+               return util.InvalidValue, err
+       }
+       if m < 0 || m > 59 {
+               return util.InvalidValue, fmt.Errorf("field %s -minute value 
must in [0,59]", key)
+       }
+       return int64(h*100 + m), nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/item.go 
b/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
new file mode 100644
index 0000000..2a2ec6c
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package flowctrl
+
+import (
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
// Item defines a single flow control rule item.
type Item struct {
	tp            int32 // rule type: 1 = freq limit, 3 = low fetch limit (set by the parse funcs); GetDataLimit only applies to tp == 0, presumably the data-limit type
	startTime     int64 // start of the active window, presumably hour*100+minute as produced by parseTime — TODO confirm
	endTime       int64 // end of the active window, same encoding as startTime
	datadlt       int64 // data-delta threshold; GetDataLimit applies only above it
	dataSizeLimit int64 // maximum data size permitted by the rule
	freqMsLimit   int64 // fetch frequency limit in milliseconds
	zeroCnt       int64 // consecutive zero-data fetch count threshold (see getFreLimit)
}
+
+// NewItem returns a default flow control item.
+func NewItem() *Item {
+       return &Item{
+               startTime:     2500,
+               endTime:       util.InvalidValue,
+               datadlt:       util.InvalidValue,
+               dataSizeLimit: util.InvalidValue,
+               freqMsLimit:   util.InvalidValue,
+               zeroCnt:       util.InvalidValue,
+       }
+}
+
// SetTp sets the type.
func (i *Item) SetTp(tp int32) {
	i.tp = tp
}

// SetStartTime sets the startTime.
func (i *Item) SetStartTime(startTime int64) {
	i.startTime = startTime
}

// SetEndTime sets the endTime.
func (i *Item) SetEndTime(endTime int64) {
	i.endTime = endTime
}

// SetDatadlt sets the datadlt.
// NOTE(review): the comment previously read "SetDataDlt"; a Go doc comment
// must begin with the function's exact name.
func (i *Item) SetDatadlt(datadlt int64) {
	i.datadlt = datadlt
}

// SetDataSizeLimit sets the dataSizeLimit.
func (i *Item) SetDataSizeLimit(dataSizeLimit int64) {
	i.dataSizeLimit = dataSizeLimit
}

// SetFreqLimit sets the freqMsLimit.
func (i *Item) SetFreqLimit(freqLimit int64) {
	i.freqMsLimit = freqLimit
}

// SetZeroCnt sets the zeroCnt.
func (i *Item) SetZeroCnt(zeroCnt int64) {
	i.zeroCnt = zeroCnt
}
+
+func (i *Item) clear() {
+       i.tp = 0
+       i.startTime = 2500
+       i.endTime = util.InvalidValue
+       i.datadlt = util.InvalidValue
+       i.dataSizeLimit = util.InvalidValue
+       i.freqMsLimit = util.InvalidValue
+       i.zeroCnt = util.InvalidValue
+}
+
+// GetDataLimit returns the flow control result.
+func (i *Item) GetDataLimit(dlt int64, time int64) *Result {
+       if i.tp != 0 || dlt <= i.datadlt {
+               return nil
+       }
+       if time < i.startTime || time > i.endTime {
+               return nil
+       }
+       return NewResult(i.dataSizeLimit, int32(i.freqMsLimit))
+}
+
// GetFreqMsLimit returns the freqMsLimit.
func (i *Item) GetFreqMsLimit() int64 {
	return i.freqMsLimit
}

// getFreLimit returns the frequency limit of a freq-limit rule (tp == 1)
// once the consecutive zero-data fetch count has reached the rule's
// zeroCnt threshold; in every other case it returns -1.
func (i *Item) getFreLimit(cnt int32) int64 {
	if i.tp != 1 {
		return -1
	}
	if int64(cnt) >= i.zeroCnt {
		return i.freqMsLimit
	}
	return -1
}

// GetDataSizeLimit returns the dataSizeLimit.
func (i *Item) GetDataSizeLimit() int64 {
	return i.dataSizeLimit
}

// GetZeroCnt returns the zeroCnt.
func (i *Item) GetZeroCnt() int64 {
	return i.zeroCnt
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go 
b/tubemq-client-twins/tubemq-client-go/flowctrl/result.go
similarity index 50%
copy from tubemq-client-twins/tubemq-client-go/client/consumer.go
copy to tubemq-client-twins/tubemq-client-go/flowctrl/result.go
index 27a63c6..c0b3ffd 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/result.go
@@ -15,26 +15,38 @@
  * limitations under the License.
  */
 
-// Package client defines the api and information
-// which can be exposed to user.
-package client
+package flowctrl
 
-// ConsumerResult of a consumption.
-type ConsumerResult struct {
// Result carries the outcome of evaluating a flow control rule: the
// permitted data size and the fetch frequency limit in milliseconds.
type Result struct {
	dataSizeLimit int64
	freqMsLimit   int32
}

// NewResult builds a Result from the given data size and frequency limits.
func NewResult(dataSizeLimit int64, freqMsLimit int32) *Result {
	res := new(Result)
	res.dataSizeLimit = dataSizeLimit
	res.freqMsLimit = freqMsLimit
	return res
}

// SetDataSizeLimit sets the dataSizeLimit.
func (res *Result) SetDataSizeLimit(dataSizeLimit int64) {
	res.dataSizeLimit = dataSizeLimit
}

// SetFreqMsLimit sets the freqMsLimit.
func (res *Result) SetFreqMsLimit(freqMsLimit int32) {
	res.freqMsLimit = freqMsLimit
}

// GetDataSizeLimit returns the dataSizeLimit.
func (res *Result) GetDataSizeLimit() int64 {
	return res.dataSizeLimit
}

// GetFreqMsLimit returns the freqMsLimit, widened to int64 for callers
// that compare it against millisecond timestamps.
func (res *Result) GetFreqMsLimit() int64 {
	return int64(res.freqMsLimit)
}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go 
b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index d01b570..cbf8bae 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -18,8 +18,14 @@
 package metadata
 
 import (
+       "math"
        "strconv"
        "strings"
+       "time"
+
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/flowctrl"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
 )
 
 // Partition represents the metadata of a partition.
@@ -30,6 +36,41 @@ type Partition struct {
        partitionKey string
        offset       int64
        lastConsumed bool
+       consumeData  *ConsumeData
+       flowCtrl     *flowctrl.Result
+       freqCtrl     *flowctrl.Item
+       strategyData *strategyData
+       totalZeroCnt int32
+}
+
// ConsumeData records the statistics of one consume attempt on a partition;
// ProcConsumeResult reads it to compute how long the partition must idle.
type ConsumeData struct {
	time        int64 // when the attempt was booked, in milliseconds
	errCode     int32 // response code of the consume attempt
	escLimit    bool  // when true, ProcConsumeResult skips the flow control delay
	msgSize     int32 // total size of the fetched messages
	dltLimit    int64 // fallback delay limit in milliseconds
	curDataDlt  int64 // current data delta reported for the partition
	requireSlow bool  // whether the server asked the client to slow down
}

// strategyData tracks the message sizes consumed within the current flow
// control stage and slice, and when each window should next be reset.
type strategyData struct {
	nextStageUpdate   int64
	nextSliceUpdate   int64
	limitSliceMsgSize int64
	curStageMsgSize   int64
	curSliceMsgSize   int64
}

// NewConsumeData returns a ConsumeData populated with the given statistics.
func NewConsumeData(time int64, errCode int32, escLimit bool, msgSize int32, dltLimit int64, curDataDlt int64, requireSlow bool) *ConsumeData {
	data := new(ConsumeData)
	data.time = time
	data.errCode = errCode
	data.escLimit = escLimit
	data.msgSize = msgSize
	data.dltLimit = dltLimit
	data.curDataDlt = curDataDlt
	data.requireSlow = requireSlow
	return data
}
 
 // NewPartition parses a partition from the given string.
@@ -96,3 +137,86 @@ func (p *Partition) String() string {
 func (p *Partition) SetLastConsumed(lastConsumed bool) {
        p.lastConsumed = lastConsumed
 }
+
// BookConsumeData stores the statistics of the latest consume attempt;
// they are read later by ProcConsumeResult and updateStrategyData.
func (p *Partition) BookConsumeData(data *ConsumeData) {
	p.consumeData = data
}
+
+// ProcConsumeResult processes the consume result.
+func (p *Partition) ProcConsumeResult(defHandler *flowctrl.RuleHandler, 
groupHandler *flowctrl.RuleHandler, filterConsume bool, lastConsumed bool) 
int64 {
+       dltTime := time.Now().UnixNano()/int64(time.Millisecond) - 
p.consumeData.time
+       p.updateStrategyData(defHandler, groupHandler)
+       p.lastConsumed = lastConsumed
+       switch p.consumeData.errCode {
+       case errs.RetSuccess, errs.RetErrNotFound:
+               if p.consumeData.msgSize == 0 && p.consumeData.errCode == 200 {
+                       p.totalZeroCnt++
+               } else {
+                       p.totalZeroCnt = 0
+               }
+               if p.totalZeroCnt > 0 {
+                       if groupHandler.GetMinZeroCnt() != math.MaxInt64 {
+                               return 
groupHandler.GetCurFreqLimitTime(p.totalZeroCnt, p.consumeData.dltLimit) - 
dltTime
+                       } else {
+                               return 
defHandler.GetCurFreqLimitTime(p.totalZeroCnt, p.consumeData.dltLimit) - dltTime
+                       }
+               }
+               if p.consumeData.escLimit {
+                       return 0
+               } else {
+                       if p.strategyData.curStageMsgSize >= 
p.flowCtrl.GetDataSizeLimit() ||
+                               p.strategyData.curStageMsgSize >= 
p.strategyData.limitSliceMsgSize {
+                               if p.flowCtrl.GetFreqMsLimit() > 
p.consumeData.dltLimit {
+                                       return p.flowCtrl.GetFreqMsLimit() - 
dltTime
+                               }
+                               return p.consumeData.dltLimit - dltTime
+                       }
+                       if p.consumeData.errCode == errs.RetSuccess {
+                               if filterConsume && p.freqCtrl.GetFreqMsLimit() 
>= 0 {
+                                       if p.consumeData.requireSlow {
+                                               return p.freqCtrl.GetZeroCnt() 
- dltTime
+                                       } else {
+                                               return 
p.freqCtrl.GetFreqMsLimit() - dltTime
+                                       }
+                               } else if !filterConsume && 
p.freqCtrl.GetDataSizeLimit() >= 0 {
+                                       return p.freqCtrl.GetDataSizeLimit() - 
dltTime
+                               }
+                       }
+                       return p.consumeData.dltLimit - dltTime
+               }
+       default:
+               return p.consumeData.curDataDlt - dltTime
+       }
+}
+
+func (p *Partition) updateStrategyData(defHandler *flowctrl.RuleHandler, 
groupHandler *flowctrl.RuleHandler) {
+       p.strategyData.curStageMsgSize += int64(p.consumeData.msgSize)
+       p.strategyData.curSliceMsgSize += int64(p.consumeData.msgSize)
+       curTime := time.Now().UnixNano() / int64(time.Millisecond)
+       if curTime > p.strategyData.nextStageUpdate {
+               p.strategyData.curStageMsgSize = 0
+               p.strategyData.curSliceMsgSize = 0
+               if p.consumeData.curDataDlt >= 0 {
+                       p.flowCtrl = 
groupHandler.GetCurDataLimit(p.consumeData.curDataDlt)
+                       if p.flowCtrl == nil {
+                               p.flowCtrl = 
defHandler.GetCurDataLimit(p.consumeData.curDataDlt)
+                       }
+                       if p.flowCtrl == nil {
+                               p.flowCtrl.SetDataSizeLimit(util.InvalidValue)
+                               p.flowCtrl.SetFreqMsLimit(0)
+                       }
+                       p.freqCtrl = groupHandler.GetFilterCtrlItem()
+                       if p.freqCtrl.GetFreqMsLimit() < 0 {
+                               p.freqCtrl = defHandler.GetFilterCtrlItem()
+                       }
+                       curTime = time.Now().UnixNano() / 
int64(time.Millisecond)
+               }
+               p.strategyData.limitSliceMsgSize = 
p.flowCtrl.GetDataSizeLimit() / 12
+               p.strategyData.nextStageUpdate = curTime + 60000
+               p.strategyData.nextSliceUpdate = curTime + 5000
+       } else if curTime > p.strategyData.nextSliceUpdate {
+               p.strategyData.curSliceMsgSize = 0
+               p.strategyData.nextSliceUpdate = curTime + 5000
+       }
+}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go 
b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index bfc63b7..873acad 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -19,9 +19,14 @@
 package remote
 
 import (
+       "fmt"
+       "math"
        "sync"
+       "sync/atomic"
        "time"
 
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/flowctrl"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
        
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
 )
@@ -30,7 +35,8 @@ import (
 type RmtDataCache struct {
        consumerID         string
        groupName          string
-       underGroupCtrl     bool
+       underGroupCtrl     int32
+       lastCheck          int64
        defFlowCtrlID      int64
        groupFlowCtrlID    int64
        partitionSubInfo   map[string]*metadata.SubscribeInfo
@@ -47,6 +53,9 @@ type RmtDataCache struct {
        partitionTimeouts  map[string]*time.Timer
        topicPartitions    map[string]map[string]bool
        partitionRegBooked map[string]bool
+       partitionOffset    map[string]int64
+       groupHandler       *flowctrl.RuleHandler
+       defHandler         *flowctrl.RuleHandler
        // EventCh is the channel for consumer to consume
        EventCh chan *metadata.ConsumerEvent
 }
@@ -66,6 +75,8 @@ func NewRmtDataCache() *RmtDataCache {
                partitionTimeouts:  make(map[string]*time.Timer),
                topicPartitions:    make(map[string]map[string]bool),
                partitionRegBooked: make(map[string]bool),
+               groupHandler:       flowctrl.NewRuleHandler(),
+               defHandler:         flowctrl.NewRuleHandler(),
                EventCh:            make(chan *metadata.ConsumerEvent, 1),
        }
        return r
@@ -73,17 +84,17 @@ func NewRmtDataCache() *RmtDataCache {
 
 // GetUnderGroupCtrl returns the underGroupCtrl.
 func (r *RmtDataCache) GetUnderGroupCtrl() bool {
-       return r.underGroupCtrl
+       return atomic.LoadInt32(&r.underGroupCtrl) == 0
 }
 
// GetDefFlowCtrlID returns the flow control ID held by the default rule handler.
func (r *RmtDataCache) GetDefFlowCtrlID() int64 {
	return r.defHandler.GetFlowCtrID()
}
 
// GetGroupFlowCtrlID returns the flow control ID held by the group rule handler.
func (r *RmtDataCache) GetGroupFlowCtrlID() int64 {
	return r.groupHandler.GetFlowCtrID()
}
 
 // GetGroupName returns the group name.
@@ -104,7 +115,7 @@ func (r *RmtDataCache) GetSubscribeInfo() 
[]*metadata.SubscribeInfo {
 
// GetQryPriorityID returns the query priority ID held by the group rule
// handler, narrowed to int32.
func (r *RmtDataCache) GetQryPriorityID() int32 {
	return int32(r.groupHandler.GetQryPriorityID())
}
 
 // PollEventResult polls the first event result from the rebalanceResults.
@@ -142,12 +153,29 @@ func (r *RmtDataCache) SetConsumerInfo(consumerID string, 
group string) {
 
 // UpdateDefFlowCtrlInfo updates the defFlowCtrlInfo.
 func (r *RmtDataCache) UpdateDefFlowCtrlInfo(flowCtrlID int64, flowCtrlInfo 
string) {
-
+       if flowCtrlID != r.defHandler.GetFlowCtrID() {
+               r.defHandler.UpdateDefFlowCtrlInfo(true, util.InvalidValue, 
flowCtrlID, flowCtrlInfo)
+       }
 }
 
 // UpdateGroupFlowCtrlInfo updates the groupFlowCtrlInfo.
 func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID 
int64, flowCtrlInfo string) {
-
+       if flowCtrlID != r.defHandler.GetFlowCtrID() {
+               r.groupHandler.UpdateDefFlowCtrlInfo(false, 
int64(qryPriorityID), flowCtrlID, flowCtrlInfo)
+       }
+       if int64(qryPriorityID) != r.groupHandler.GetQryPriorityID() {
+               r.groupHandler.SetQryPriorityID(int64(qryPriorityID))
+       }
+       cur := time.Now().UnixNano() / int64(time.Millisecond)
+       if cur-atomic.LoadInt64(&r.lastCheck) > 10000 {
+               result := r.groupHandler.GetCurDataLimit(math.MaxInt64)
+               if result != nil {
+                       atomic.StoreInt32(&r.underGroupCtrl, 1)
+               } else {
+                       atomic.StoreInt32(&r.underGroupCtrl, 0)
+               }
+               atomic.StoreInt64(&r.lastCheck, cur)
+       }
 }
 
 // OfferEventAndNotify offers an consumer event and notifies the consumer 
method and notify the consumer to consume.
@@ -334,6 +362,91 @@ func (r *RmtDataCache) IsFirstRegister(partitionKey 
string) bool {
        return r.partitionRegBooked[partitionKey]
 }
 
+// GetCurConsumeStatus returns the current consumption status.
+func (r *RmtDataCache) GetCurConsumeStatus() int32 {
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+
+       if len(r.partitions) == 0 {
+               return errs.RetErrNoPartAssigned
+       }
+       if len(r.indexPartitions) == 0 {
+               if len(r.usedPartitions) == 0 {
+                       return errs.RetErrAllPartInUse
+               } else {
+                       return errs.RetErrAllPartWaiting
+               }
+       }
+       return 0
+}
+
+// SelectPartition returns a partition which is available to be consumed.
+// If no partition can be use, an error will be returned.
+func (r *RmtDataCache) SelectPartition() (*metadata.Partition, error) {
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+
+       if len(r.partitions) == 0 {
+               return nil, errs.ErrNoPartAssigned
+       } else {
+               if len(r.indexPartitions) == 0 {
+                       if len(r.usedPartitions) == 0 {
+                               return nil, errs.ErrAllPartInUse
+                       } else {
+                               return nil, errs.ErrAllPartWaiting
+                       }
+               }
+       }
+
+       partitionKey := r.indexPartitions[0]
+       r.indexPartitions = r.indexPartitions[1:]
+       if partition, ok := r.partitions[partitionKey]; !ok {
+               return nil, errs.ErrAllPartInUse
+       } else {
+               r.usedPartitions[partitionKey] = time.Now().UnixNano() / 
int64(time.Millisecond)
+               return partition, nil
+       }
+}
+
+func (r *RmtDataCache) ReleasePartition(checkDelay bool, filterConsume bool, 
confirmContext string, isConsumed bool) error {
+       partitionKey, bookedTime, err := 
util.ParseConfirmContext(confirmContext)
+       if err != nil {
+               return err
+       }
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+
+       if partition, ok := r.partitions[partitionKey]; !ok {
+               delete(r.usedPartitions, partitionKey)
+               r.removeFromIndexPartitions(partitionKey)
+               return fmt.Errorf("not found the partition in Consume Partition 
set")
+       } else {
+               if t, ok := r.usedPartitions[partitionKey]; !ok {
+                       r.removeFromIndexPartitions(partitionKey)
+                       r.indexPartitions = append(r.indexPartitions, 
partitionKey)
+               } else {
+                       if t == bookedTime {
+                               delete(r.usedPartitions, partitionKey)
+                               r.removeFromIndexPartitions(partitionKey)
+                               delay := int64(0)
+                               if checkDelay {
+                                       delay = 
partition.ProcConsumeResult(r.defHandler, r.groupHandler, filterConsume, 
isConsumed)
+                               }
+                               if delay > 10 {
+                                       r.partitionTimeouts[partitionKey] = 
time.AfterFunc(time.Duration(delay)*time.Millisecond, func() {
+                                               
r.resetIdlePartition(partitionKey, true)
+                                       })
+                               } else {
+                                       r.indexPartitions = 
append(r.indexPartitions, partitionKey)
+                               }
+                       } else {
+                               return fmt.Errorf("illegal confirmContext 
content: context not equal")
+                       }
+               }
+       }
+       return nil
+}
+
 func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) {
        pos := 0
        for i, p := range r.indexPartitions {
@@ -344,3 +457,19 @@ func (r *RmtDataCache) 
removeFromIndexPartitions(partitionKey string) {
        }
        r.indexPartitions = append(r.indexPartitions[:pos], 
r.indexPartitions[pos+1:]...)
 }
+
+func (r *RmtDataCache) BookPartitionInfo(partitionKey string, currOffset 
int64) {
+       if currOffset >= 0 {
+               r.dataBookMu.Lock()
+               defer r.dataBookMu.Unlock()
+               r.partitionOffset[partitionKey] = currOffset
+       }
+}
+
+func (r *RmtDataCache) BookConsumeData(partitionKey string, data 
*metadata.ConsumeData) {
+       r.metaMu.Lock()
+       defer r.metaMu.Unlock()
+       if partition, ok := r.partitions[partitionKey]; ok {
+               partition.BookConsumeData(data)
+       }
+}
diff --git a/tubemq-client-twins/tubemq-client-go/util/util.go 
b/tubemq-client-twins/tubemq-client-go/util/util.go
index 1157d39..e3b74f8 100644
--- a/tubemq-client-twins/tubemq-client-go/util/util.go
+++ b/tubemq-client-twins/tubemq-client-go/util/util.go
@@ -19,7 +19,10 @@
 package util
 
 import (
+       "fmt"
        "net"
+       "strconv"
+       "strings"
 )
 
 // InValidValue of TubeMQ config.
@@ -72,3 +75,55 @@ func GenBrokerAuthenticateToken(username string, password 
string) string {
 func GenMasterAuthenticateToken(username string, password string) string {
        return ""
 }
+
// ParseConfirmContext parses a confirm context of the form
// "<partitionKey>@<bookedTime>" into its partition key and booked time.
// An error is returned when the content does not match that format.
func ParseConfirmContext(confirmContext string) (string, int64, error) {
	res := strings.Split(confirmContext, "@")
	// BUG FIX: strings.Split never returns an empty slice for a non-empty
	// separator, so the original len(res) == 0 check could not fire and
	// res[1] panicked whenever the "@" separator was missing. Require
	// exactly two parts instead.
	if len(res) != 2 {
		return "", 0, fmt.Errorf("illegal confirmContext content: unregular value format")
	}
	partitionKey := res[0]
	bookedTime, err := strconv.ParseInt(res[1], 10, 64)
	if err != nil {
		return "", 0, err
	}
	return partitionKey, bookedTime, nil
}
+
// SplitToMap splits source into key/value pairs: entries are separated by
// step1 and each entry is split into key and value at the first step2.
// Keys and values are whitespace-trimmed; entries without step2 and entries
// with an empty key are skipped.
func SplitToMap(source string, step1 string, step2 string) map[string]string {
	m := make(map[string]string)
	// BUG FIX: the original hand-rolled scan sliced with
	// source[pos1:pos2-pos1] (wrong upper bound), treated the relative
	// result of strings.Index(source[pos1:], step1) as an absolute
	// position, and could loop forever because `continue` on an empty
	// entry skipped the position update. Splitting first avoids all three.
	for _, entry := range strings.Split(source, step1) {
		entry = strings.TrimSpace(entry)
		if len(entry) == 0 {
			continue
		}
		pos := strings.Index(entry, step2)
		if pos == -1 {
			continue
		}
		key := strings.TrimSpace(entry[:pos])
		if len(key) == 0 {
			continue
		}
		m[key] = strings.TrimSpace(entry[pos+len(step2):])
	}
	return m
}

Reply via email to