This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 7ca091d  [INLONG-1529]Go SDK should reset heartbeat if register to master successfully (#1530)
7ca091d is described below

commit 7ca091df453d3d22bfbe13fc3be553d20ee296aa
Author: Zijie Lu <[email protected]>
AuthorDate: Tue Sep 7 15:37:07 2021 +0800

    [INLONG-1529]Go SDK should reset heartbeat if register to master successfully (#1530)
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/client/consumer_impl.go             |  1 +
 tubemq-client-twins/tubemq-client-go/client/heartbeat.go | 16 +++++++++++++---
 tubemq-client-twins/tubemq-client-go/client/version.go   |  2 +-
 3 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 537aa3a..3dd9111 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -494,6 +494,7 @@ func newClient(group string) string {
                strconv.Itoa(os.Getpid()) + "-" +
                strconv.Itoa(int(time.Now().Unix()*1000)) + "-" +
                strconv.Itoa(int(atomic.AddUint64(&clientID, 1))) + "-" +
+               "go-" +
                tubeMQClientVersion
 }
 
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index d74b320..281cd4c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -108,14 +108,20 @@ func (h *heartbeatManager) consumerHB2Master() {
                        if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
                                log.Warnf("[CONSUMER] hb2master found no-node or standby, re-register, client=%s", h.consumer.clientID)
                                address := h.consumer.master.Address
-                               go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
+                               go func() {
+                                       err := h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
+                                       if err != nil {
+                                               return
+                                       }
+                                       h.resetMasterHeartbeat()
+                               }()
                                if rsp.GetErrCode() != errs.RetErrHBNoNode {
+                                       h.mu.Lock()
+                                       defer h.mu.Unlock()
                                        hm := h.heartbeats[address]
                                        hm.numConnections--
                                        if hm.numConnections == 0 {
-                                               h.mu.Lock()
                                                delete(h.heartbeats, address)
-                                               h.mu.Unlock()
                                        }
                                        return
                                }
@@ -126,6 +132,10 @@ func (h *heartbeatManager) consumerHB2Master() {
        }
        h.consumer.masterHBRetry = 0
        h.processHBResponseM2C(rsp)
+       h.resetMasterHeartbeat()
+}
+
+func (h *heartbeatManager) resetMasterHeartbeat() {
        h.mu.Lock()
        defer h.mu.Unlock()
        hm := h.heartbeats[h.consumer.master.Address]
diff --git a/tubemq-client-twins/tubemq-client-go/client/version.go b/tubemq-client-twins/tubemq-client-go/client/version.go
index 44de222..fad542e 100644
--- a/tubemq-client-twins/tubemq-client-go/client/version.go
+++ b/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -18,5 +18,5 @@
 package client
 
 const (
-       tubeMQClientVersion = "go_0.1.0"
+       tubeMQClientVersion = "0.1.0"
 )

Reply via email to