This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit d5ea12531f85625bf7412ab417e5e2e536b055e1
Author: Zijie Lu <[email protected]>
AuthorDate: Fri Jun 4 18:42:04 2021 +0800

    Add trim and use slice
    
    Signed-off-by: Zijie Lu <[email protected]>
---
 .../tubemq-client-go/metadata/partition.go            |  8 ++++----
 .../tubemq-client-go/metadata/subcribe_info.go        |  8 ++++----
 tubemq-client-twins/tubemq-client-go/remote/remote.go | 19 +++++++++++++++----
 3 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go 
b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index fbe1086..3ce1ca5 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -39,16 +39,16 @@ func NewPartition(partition string) (*Partition, error) {
        var err error
        pos := strings.Index(partition, "#")
        if pos != -1 {
-               broker := partition[:pos]
+               broker := strings.TrimSpace(partition[:pos])
                b, err = NewNode(true, broker)
                if err != nil {
                        return nil, err
                }
-               p := partition[pos+1:]
+               p := strings.TrimSpace(partition[pos+1:])
                pos = strings.Index(p, ":")
                if pos != -1 {
-                       topic = p[0:pos]
-                       partitionID, err = strconv.Atoi(p[pos+1:])
+                       topic = strings.TrimSpace(p[0:pos])
+                       partitionID, err = 
strconv.Atoi(strings.TrimSpace(p[pos+1:]))
                        if err != nil {
                                return nil, err
                        }
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go 
b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
index 90ae3ef..932659a 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
@@ -58,15 +58,15 @@ func NewSubscribeInfo(subscribeInfo string) 
(*SubscribeInfo, error) {
        var err error
        pos := strings.Index(subscribeInfo, "#")
        if pos != -1 {
-               consumerInfo := subscribeInfo[:pos]
-               partitionInfo := subscribeInfo[pos+1:]
+               consumerInfo := strings.TrimSpace(subscribeInfo[:pos])
+               partitionInfo := strings.TrimSpace(subscribeInfo[pos+1:])
                partition, err = NewPartition(partitionInfo)
                if err != nil {
                        return nil, err
                }
                pos = strings.Index(consumerInfo, "@")
-               consumerID = consumerInfo[:pos]
-               group = consumerInfo[pos+1:]
+               consumerID = strings.TrimSpace(consumerInfo[:pos])
+               group = strings.TrimSpace(consumerInfo[pos+1:])
        }
        return &SubscribeInfo{
                group:      group,
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go 
b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 1d82c89..355e09b 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -44,7 +44,7 @@ type RmtDataCache struct {
        qryPriorityID      int32
        partitions         map[string]*metadata.Partition
        usedPartitions     map[string]int64
-       indexPartitions    map[string]bool
+       indexPartitions    []string
        partitionTimeouts  map[string]*time.Timer
        topicPartitions    map[string]map[string]bool
        partitionRegBooked map[string]bool
@@ -61,7 +61,7 @@ func NewRmtDataCache() *RmtDataCache {
                brokerPartitions:   make(map[*metadata.Node]map[string]bool),
                partitions:         make(map[string]*metadata.Partition),
                usedPartitions:     make(map[string]int64),
-               indexPartitions:    make(map[string]bool),
+               indexPartitions:    make([]string, 0, 0),
                partitionTimeouts:  make(map[string]*time.Timer),
                topicPartitions:    make(map[string]map[string]bool),
                partitionRegBooked: make(map[string]bool),
@@ -240,10 +240,10 @@ func (r *RmtDataCache) resetIdlePartition(partitionKey 
string, reuse bool) {
                timer.Stop()
                delete(r.partitionTimeouts, partitionKey)
        }
-       delete(r.indexPartitions, partitionKey)
+       r.removeFromIndexPartitions(partitionKey)
        if reuse {
                if _, ok := r.partitions[partitionKey]; ok {
-                       r.indexPartitions[partitionKey] = true
+                       r.indexPartitions = append(r.indexPartitions, 
partitionKey)
                }
        }
 }
@@ -342,3 +342,14 @@ func (r *RmtDataCache) IsFirstRegister(partitionKey 
string) bool {
        }
        return r.partitionRegBooked[partitionKey]
 }
+
+func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) {
+       pos := 0
+       for i, p := range r.indexPartitions {
+               if p == partitionKey {
+                       pos = i
+                       break
+               }
+       }
+       r.indexPartitions = append(r.indexPartitions[:pos], 
r.indexPartitions[pos+1:]...)
+}

Reply via email to