This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new eb9bd8d  [INLONG-1531]Go SDK should init the flow control item of the partition (#1532)
eb9bd8d is described below

commit eb9bd8da85b88d61d92f356ec972b9fa8bf9544d
Author: Zijie Lu <[email protected]>
AuthorDate: Tue Sep 7 16:52:21 2021 +0800

    [INLONG-1531]Go SDK should init the flow control item of the partition (#1532)
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 tubemq-client-twins/tubemq-client-go/flowctrl/handler.go   | 8 ++++----
 tubemq-client-twins/tubemq-client-go/flowctrl/item.go      | 9 ++++++++-
 tubemq-client-twins/tubemq-client-go/metadata/partition.go | 3 +++
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index eb05482..8f820b4 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -148,7 +148,7 @@ func (h *RuleHandler) initStatisticData() {
                                if rule.tp != 3 {
                                        continue
                                }
-                               h.flowCtrlItem.SetTp(3)
+                               h.flowCtrlItem.SetTp(RequestFrequencyControl)
                                h.flowCtrlItem.SetDataSizeLimit(rule.dataSizeLimit)
                                h.flowCtrlItem.SetFreqLimit(rule.freqMsLimit)
                                h.flowCtrlItem.SetZeroCnt(rule.zeroCnt)
@@ -324,7 +324,7 @@ func parseDataLimit(rules []interface{}) ([]*Item, error) {
                        return nil, fmt.Errorf("freqInMs value must over than equal or bigger than 200 in index(%d) of data limit rule", i)
                }
                item := NewItem()
-               item.SetTp(0)
+               item.SetTp(CurrentLimit)
                item.SetStartTime(start)
                item.SetEndTime(end)
                item.SetDatadlt(datadlt)
@@ -378,7 +378,7 @@ func parseLowFetchLimit(rules []interface{}) ([]*Item, error) {
                        }
                }
                item := NewItem()
-               item.SetTp(3)
+               item.SetTp(RequestFrequencyControl)
                item.SetDataSizeLimit(normFreqMs)
                item.SetFreqLimit(filterFreqMs)
                item.SetZeroCnt(minFilteFreqMs)
@@ -403,7 +403,7 @@ func parseFreqLimit(rules []interface{}) ([]*Item, error) {
                        return nil, err
                }
                item := NewItem()
-               item.SetTp(1)
+               item.SetTp(FrequencyLimit)
                item.SetZeroCnt(zeroCnt)
                item.SetFreqLimit(freqLimit)
                items = append(items, item)
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/item.go b/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
index 9983321..63c9162 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/item.go
@@ -21,6 +21,13 @@ import (
        "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
 )
 
+const (
+       CurrentLimit = iota
+       FrequencyLimit
+       SSDTransfer
+       RequestFrequencyControl
+)
+
 // Item defines a flow control item.
 type Item struct {
        tp            int32
@@ -59,7 +66,7 @@ func (i *Item) SetEndTime(endTime int64) {
        i.endTime = endTime
 }
 
-// SetDataDlt sets the datadlt.
+// SetDatadlt sets the datadlt.
 func (i *Item) SetDatadlt(datadlt int64) {
        i.datadlt = datadlt
 }
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index 8314502..c256f70 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -94,6 +94,8 @@ func NewPartition(partition string) (*Partition, error) {
                return nil, err
        }
        partitionKey := strconv.Itoa(int(b.id)) + ":" + topic + ":" + strconv.Itoa(partitionID)
+       item := flowctrl.NewItem()
+       item.SetTp(flowctrl.RequestFrequencyControl)
        return &Partition{
                topic:        topic,
                broker:       b,
@@ -103,6 +105,7 @@ func NewPartition(partition string) (*Partition, error) {
                consumeData: &ConsumeData{
                        curDataDlt: util.InvalidValue,
                },
+               freqCtrl: item,
        }, nil
 }
 

Reply via email to