This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new f78fc87  [INLONG-1535]Go SDK should be closed before stopping the 
event processing goroutine (#1536)
f78fc87 is described below

commit f78fc87e1bf9561069bc6e66a570578806e616db
Author: Zijie Lu <[email protected]>
AuthorDate: Sat Sep 11 18:28:28 2021 +0800

    [INLONG-1535]Go SDK should be closed before stopping the event processing 
goroutine (#1536)
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 tubemq-client-twins/tubemq-client-go/client/consumer_impl.go | 10 +++++-----
 tubemq-client-twins/tubemq-client-go/flowctrl/handler.go     | 11 +++++------
 tubemq-client-twins/tubemq-client-go/remote/remote.go        |  2 +-
 3 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 3dd9111..8f48513 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -182,7 +182,7 @@ func (c *consumer) processRegisterResponseM2C(rsp 
*protocol.RegisterResponseM2C)
        if rsp.GetNotAllocated() {
                c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
        }
-       if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+       if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
                if rsp.GetDefFlowCheckId() != 0 {
                        
c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), 
rsp.GetDefFlowControlInfo())
                }
@@ -384,10 +384,10 @@ func (c *consumer) processRebalanceEvent() {
                        }
                case <-c.done:
                        log.Infof("[CONSUMER]Rebalance done, client=%s", 
c.clientID)
-                       break
+                       log.Info("[CONSUMER] Rebalance event Handler stopped!")
+                       return
                }
        }
-       log.Info("[CONSUMER] Rebalance event Handler stopped!")
 }
 
 func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) {
@@ -542,10 +542,10 @@ func (c *consumer) getConsumeReadStatus(isFirstReg bool) 
int32 {
        if isFirstReg {
                if c.config.Consumer.ConsumePosition == 0 {
                        readStatus = consumeStatusFromMax
-                       log.Infof("[Consumer From Max Offset], client=", 
c.clientID)
+                       log.Infof("[Consumer From Max Offset], client=%s", 
c.clientID)
                } else if c.config.Consumer.ConsumePosition > 0 {
                        readStatus = consumeStatusFromMaxAlways
-                       log.Infof("[Consumer From Max Offset Always], client=", 
c.clientID)
+                       log.Infof("[Consumer From Max Offset Always], 
client=%s", c.clientID)
                }
        }
        return int32(readStatus)
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go 
b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index 8f820b4..a454e60 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -79,10 +79,10 @@ func (h *RuleHandler) SetQryPriorityID(qryPriorityID int64) 
{
 
 // UpdateDefFlowCtrlInfo updates the flow control information.
 func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, qrypriorityID 
int64, flowCtrlID int64, info string) error {
-       if atomic.LoadInt64(&h.flowCtrlID) == flowCtrlID {
+       curFlowCtrlID := atomic.LoadInt64(&h.flowCtrlID)
+       if curFlowCtrlID == flowCtrlID {
                return nil
        }
-       //curFlowCtrlID := atomic.LoadInt64(&h.flowCtrlID)
        var flowCtrlItems map[int32][]*Item
        var err error
        if len(info) > 0 {
@@ -94,6 +94,7 @@ func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, 
qrypriorityID int64,
        h.configMu.Lock()
        defer h.configMu.Unlock()
        h.clearStatisticData()
+       atomic.StoreInt64(&h.flowCtrlID, flowCtrlID)
        atomic.StoreInt64(&h.qrypriorityID, qrypriorityID)
        if len(flowCtrlItems) == 0 {
                h.flowCtrlRules = make(map[int32][]*Item)
@@ -105,11 +106,9 @@ func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault 
bool, qrypriorityID int64,
        }
        h.lastUpdate = time.Now().UnixNano() / int64(time.Millisecond)
        if isDefault {
-               log.Infof("[Flow Ctrl] Default FlowCtrl's flow ctrl id from %d 
to %d", atomic.LoadInt64(&h.flowCtrlID),
-                       flowCtrlID)
+               log.Infof("[Flow Ctrl] Default FlowCtrl's flow ctrl id from %d 
to %d", curFlowCtrlID, flowCtrlID)
        } else {
-               log.Infof("[Flow Ctrl] Group FlowCtrl's flow ctrl id from %d to 
%d", atomic.LoadInt64(&h.flowCtrlID),
-                       flowCtrlID)
+               log.Infof("[Flow Ctrl] Group FlowCtrl's flow ctrl id from %d to 
%d", curFlowCtrlID, flowCtrlID)
        }
        return nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go 
b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 5b26b0e..01d96ca 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -167,7 +167,7 @@ func (r *RmtDataCache) UpdateDefFlowCtrlInfo(flowCtrlID 
int64, flowCtrlInfo stri
 
 // UpdateGroupFlowCtrlInfo updates the groupFlowCtrlInfo.
 func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID 
int64, flowCtrlInfo string) {
-       if flowCtrlID != r.defHandler.GetFlowCtrID() {
+       if flowCtrlID != r.groupHandler.GetFlowCtrID() {
                r.groupHandler.UpdateDefFlowCtrlInfo(false, 
int64(qryPriorityID), flowCtrlID, flowCtrlInfo)
        }
        if int64(qryPriorityID) != r.groupHandler.GetQryPriorityID() {

Reply via email to