This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 11353eb  [INLONG-1525]Fix bug which will cause Go SDK fail to parse SubscribeInfo (#1526)
11353eb is described below

commit 11353eb2856c26c92e49c5d9837bf6bd0020444c
Author: Zijie Lu <[email protected]>
AuthorDate: Mon Sep 6 14:03:28 2021 +0800

    [INLONG-1525]Fix bug which will cause Go SDK fail to parse SubscribeInfo (#1526)
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/client/consumer_impl.go                | 13 +++++++++----
 tubemq-client-twins/tubemq-client-go/client/heartbeat.go    |  4 ++++
 .../tubemq-client-go/metadata/subscribe_info.go             |  2 +-
 tubemq-client-twins/tubemq-client-go/remote/remote.go       |  2 +-
 tubemq-client-twins/tubemq-client-go/util/util.go           |  5 +++--
 5 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go 
b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index e42319b..a7fb27c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -162,6 +162,10 @@ func (c *consumer) sendRegRequest2Master() 
(*protocol.RegisterResponseM2C, error
        node := &metadata.Node{}
        node.SetHost(util.GetLocalHost())
        node.SetAddress(c.master.Address)
+       auth := &protocol.AuthenticateInfo{}
+       if c.needGenMasterCertificateInfo(true) {
+               util.GenMasterAuthenticateToken(auth, 
c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
+       }
        m.SetNode(node)
        sub := &metadata.SubscribeInfo{}
        sub.SetGroup(c.config.Consumer.Group)
@@ -512,7 +516,7 @@ func (c *consumer) genBrokerAuthenticInfo(force bool) 
*protocol.AuthorizedInfo {
        return auth
 }
 
-func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, 
force bool) {
+func (c *consumer) needGenMasterCertificateInfo(force bool) bool {
        needAdd := false
        if c.config.Net.Auth.Enable {
                if force {
@@ -526,6 +530,7 @@ func (c *consumer) genMasterAuthenticateToken(auth 
*protocol.AuthenticateInfo, f
                if needAdd {
                }
        }
+       return needAdd
 }
 
 func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
@@ -700,10 +705,10 @@ func (c *consumer) close2Master() error {
        sub := &metadata.SubscribeInfo{}
        sub.SetGroup(c.config.Consumer.Group)
        m.SetSubscribeInfo(sub)
+       mci := &protocol.MasterCertificateInfo{}
        auth := &protocol.AuthenticateInfo{}
-       c.genMasterAuthenticateToken(auth, true)
-       mci := &protocol.MasterCertificateInfo{
-               AuthInfo: auth,
+       if c.needGenMasterCertificateInfo(true) {
+               util.GenMasterAuthenticateToken(auth, 
c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
        }
        c.subInfo.SetMasterCertificateInfo(mci)
        rsp, err := c.client.CloseRequestC2M(ctx, m, c.subInfo)
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go 
b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 49d4f7c..d74b320 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -88,6 +88,10 @@ func (h *heartbeatManager) consumerHB2Master() {
        sub := &metadata.SubscribeInfo{}
        sub.SetGroup(h.consumer.config.Consumer.Group)
        m.SetSubscribeInfo(sub)
+       auth := &protocol.AuthenticateInfo{}
+       if h.consumer.needGenMasterCertificateInfo(true) {
+               util.GenMasterAuthenticateToken(auth, 
h.consumer.config.Net.Auth.UserName, h.consumer.config.Net.Auth.Password)
+       }
        h.consumer.unreportedTimes++
        if h.consumer.unreportedTimes > 
h.consumer.config.Consumer.MaxSubInfoReportInterval {
                m.SetReportTimes(true)
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go 
b/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
index 3670204..6544a4d 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subscribe_info.go
@@ -55,7 +55,7 @@ func (s *SubscribeInfo) String() string {
 // If the given is invalid, it will return error.
 // The format of subscribeInfo string: 
consumerId@group#broker_info#topic:partitionId
 func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
-       s := strings.Split(subscribeInfo, "#")
+       s := strings.SplitN(subscribeInfo, "#", 2)
        if len(s) == 1 {
                return nil, errs.ErrInvalidSubscribeInfoString
        }
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go 
b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 6e66af9..5b26b0e 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -463,7 +463,7 @@ func (r *RmtDataCache) 
removeFromIndexPartitions(partitionKey string) {
                r.indexPartitions = nil
                return
        }
-       if pos >= len(r.indexPartitions) {
+       if pos == -1 || pos >= len(r.indexPartitions) {
                return
        }
        r.indexPartitions = append(r.indexPartitions[:pos], 
r.indexPartitions[pos+1:]...)
diff --git a/tubemq-client-twins/tubemq-client-go/util/util.go 
b/tubemq-client-twins/tubemq-client-go/util/util.go
index 156540c..4fa1def 100644
--- a/tubemq-client-twins/tubemq-client-go/util/util.go
+++ b/tubemq-client-twins/tubemq-client-go/util/util.go
@@ -23,6 +23,8 @@ import (
        "net"
        "strconv"
        "strings"
+
+       
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
 )
 
 // InvalidValue defines the invalid value of TubeMQ config.
@@ -72,8 +74,7 @@ func GenBrokerAuthenticateToken(username string, password 
string) string {
 }
 
 // GenMasterAuthenticateToken generates the master authenticate token.
-func GenMasterAuthenticateToken(username string, password string) string {
-       return ""
+func GenMasterAuthenticateToken(authInfo *protocol.AuthenticateInfo, username 
string, password string) {
 }
 
 // ParseConfirmContext parses the confirm context to partition key and 
bookedTime.

Reply via email to