This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 7ca091d [INLONG-1529]Go SDK should reset heartbeat if register to
master successfully (#1530)
7ca091d is described below
commit 7ca091df453d3d22bfbe13fc3be553d20ee296aa
Author: Zijie Lu <[email protected]>
AuthorDate: Tue Sep 7 15:37:07 2021 +0800
[INLONG-1529]Go SDK should reset heartbeat if register to master
successfully (#1530)
Signed-off-by: Zijie Lu <[email protected]>
---
.../tubemq-client-go/client/consumer_impl.go | 1 +
tubemq-client-twins/tubemq-client-go/client/heartbeat.go | 16 +++++++++++++---
tubemq-client-twins/tubemq-client-go/client/version.go | 2 +-
3 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 537aa3a..3dd9111 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -494,6 +494,7 @@ func newClient(group string) string {
strconv.Itoa(os.Getpid()) + "-" +
strconv.Itoa(int(time.Now().Unix()*1000)) + "-" +
strconv.Itoa(int(atomic.AddUint64(&clientID, 1))) + "-" +
+ "go-" +
tubeMQClientVersion
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index d74b320..281cd4c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -108,14 +108,20 @@ func (h *heartbeatManager) consumerHB2Master() {
if rsp.GetErrCode() == errs.RetErrHBNoNode ||
strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
log.Warnf("[CONSUMER] hb2master found no-node
or standby, re-register, client=%s", h.consumer.clientID)
address := h.consumer.master.Address
- go h.consumer.register2Master(rsp.GetErrCode()
!= errs.RetErrHBNoNode)
+ go func() {
+ err :=
h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
+ if err != nil {
+ return
+ }
+ h.resetMasterHeartbeat()
+ }()
if rsp.GetErrCode() != errs.RetErrHBNoNode {
+ h.mu.Lock()
+ defer h.mu.Unlock()
hm := h.heartbeats[address]
hm.numConnections--
if hm.numConnections == 0 {
- h.mu.Lock()
delete(h.heartbeats, address)
- h.mu.Unlock()
}
return
}
@@ -126,6 +132,10 @@ func (h *heartbeatManager) consumerHB2Master() {
}
h.consumer.masterHBRetry = 0
h.processHBResponseM2C(rsp)
+ h.resetMasterHeartbeat()
+}
+
+func (h *heartbeatManager) resetMasterHeartbeat() {
h.mu.Lock()
defer h.mu.Unlock()
hm := h.heartbeats[h.consumer.master.Address]
diff --git a/tubemq-client-twins/tubemq-client-go/client/version.go
b/tubemq-client-twins/tubemq-client-go/client/version.go
index 44de222..fad542e 100644
--- a/tubemq-client-twins/tubemq-client-go/client/version.go
+++ b/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -18,5 +18,5 @@
package client
const (
- tubeMQClientVersion = "go_0.1.0"
+ tubeMQClientVersion = "0.1.0"
)