This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch INLONG-25 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit d5ea12531f85625bf7412ab417e5e2e536b055e1 Author: Zijie Lu <[email protected]> AuthorDate: Fri Jun 4 18:42:04 2021 +0800 Add trim and use slice Signed-off-by: Zijie Lu <[email protected]> --- .../tubemq-client-go/metadata/partition.go | 8 ++++---- .../tubemq-client-go/metadata/subcribe_info.go | 8 ++++---- tubemq-client-twins/tubemq-client-go/remote/remote.go | 19 +++++++++++++++---- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go index fbe1086..3ce1ca5 100644 --- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go +++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go @@ -39,16 +39,16 @@ func NewPartition(partition string) (*Partition, error) { var err error pos := strings.Index(partition, "#") if pos != -1 { - broker := partition[:pos] + broker := strings.TrimSpace(partition[:pos]) b, err = NewNode(true, broker) if err != nil { return nil, err } - p := partition[pos+1:] + p := strings.TrimSpace(partition[pos+1:]) pos = strings.Index(p, ":") if pos != -1 { - topic = p[0:pos] - partitionID, err = strconv.Atoi(p[pos+1:]) + topic = strings.TrimSpace(p[0:pos]) + partitionID, err = strconv.Atoi(strings.TrimSpace(p[pos+1:])) if err != nil { return nil, err } diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go index 90ae3ef..932659a 100644 --- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go +++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go @@ -58,15 +58,15 @@ func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) { var err error pos := strings.Index(subscribeInfo, "#") if pos != -1 { - consumerInfo := subscribeInfo[:pos] - partitionInfo := subscribeInfo[pos+1:] + consumerInfo := strings.TrimSpace(subscribeInfo[:pos]) + partitionInfo := strings.TrimSpace(subscribeInfo[pos+1:]) partition, err = NewPartition(partitionInfo) if err != nil { return nil, err } pos = strings.Index(consumerInfo, "@") - consumerID = consumerInfo[:pos] - group = consumerInfo[pos+1:] + consumerID = strings.TrimSpace(consumerInfo[:pos]) + group = strings.TrimSpace(consumerInfo[pos+1:]) } return &SubscribeInfo{ group: group, diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go index 1d82c89..355e09b 100644 --- a/tubemq-client-twins/tubemq-client-go/remote/remote.go +++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go @@ -44,7 +44,7 @@ type RmtDataCache struct { qryPriorityID int32 partitions map[string]*metadata.Partition usedPartitions map[string]int64 - indexPartitions map[string]bool + indexPartitions []string partitionTimeouts map[string]*time.Timer topicPartitions map[string]map[string]bool partitionRegBooked map[string]bool @@ -61,7 +61,7 @@ func NewRmtDataCache() *RmtDataCache { brokerPartitions: make(map[*metadata.Node]map[string]bool), partitions: make(map[string]*metadata.Partition), usedPartitions: make(map[string]int64), - indexPartitions: make(map[string]bool), + indexPartitions: make([]string, 0, 0), partitionTimeouts: make(map[string]*time.Timer), topicPartitions: make(map[string]map[string]bool), partitionRegBooked: make(map[string]bool), @@ -240,10 +240,10 @@ func (r *RmtDataCache) resetIdlePartition(partitionKey string, reuse bool) { timer.Stop() delete(r.partitionTimeouts, partitionKey) } - delete(r.indexPartitions, partitionKey) + r.removeFromIndexPartitions(partitionKey) if reuse { if _, ok := r.partitions[partitionKey]; ok { - r.indexPartitions[partitionKey] = true + r.indexPartitions = append(r.indexPartitions, partitionKey) } } } @@ -342,3 +342,14 @@ func (r *RmtDataCache) IsFirstRegister(partitionKey string) bool { } return r.partitionRegBooked[partitionKey] } + +func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) { + pos := 0 + for i, p := range r.indexPartitions { + if p == partitionKey { + pos = i + break + } + } + r.indexPartitions = append(r.indexPartitions[:pos], r.indexPartitions[pos+1:]...) +}
