This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new cadb730 Fix possible slot range loss in the importing API (#189)
cadb730 is described below
commit cadb730714b898d43a3427c2c66da6a5e702717f
Author: hulk <[email protected]>
AuthorDate: Sun Jun 23 00:04:09 2024 +0800
Fix possible slot range loss in the importing API (#189)
---
store/cluster.go | 23 ++++++++++++++---------
store/cluster_node_test.go | 13 +++++++++++--
2 files changed, 25 insertions(+), 11 deletions(-)
diff --git a/store/cluster.go b/store/cluster.go
index fe8240e..9173611 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -17,6 +17,7 @@
* under the License.
*
*/
+
package store
import (
@@ -218,10 +219,10 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int, targetShardId
return nil
}
-func (c *Cluster) SetSlot(ctx context.Context, slot int, targetNodeID string) error {
- version := c.Version.Inc()
- for i := 0; i < len(c.Shards); i++ {
- for _, node := range c.Shards[i].Nodes {
+func (cluster *Cluster) SetSlot(ctx context.Context, slot int, targetNodeID string) error {
+ version := cluster.Version.Inc()
+ for i := 0; i < len(cluster.Shards); i++ {
+ for _, node := range cluster.Shards[i].Nodes {
clusterNode, ok := node.(*ClusterNode)
if !ok {
continue
@@ -274,13 +275,17 @@ func ParseCluster(clusterStr string) (*Cluster, error) {
if len(fields) < 9 {
                                return nil, fmt.Errorf("master node element less 9, node info[%q]", nodeString)
}
- slots, err := ParseSlotRange(fields[8])
- if err != nil {
- return nil, fmt.Errorf("master node parser slot
error, node info[%q]", nodeString)
- }
shard := NewShard()
shard.Nodes = append(shard.Nodes, node)
- shard.SlotRanges = append(shard.SlotRanges, *slots)
+
+ // remain fields are slot ranges
+ for i := 8; i < len(fields); i++ {
+ slotRange, err := ParseSlotRange(fields[i])
+ if err != nil {
+                                       return nil, fmt.Errorf("parse slots error for node[%s]: %w", nodeString, err)
+ }
+                               shard.SlotRanges = append(shard.SlotRanges, *slotRange)
+ }
shards = append(shards, shard)
} else if node.role == RoleSlave {
slaveNodes[fields[3]] = append(slaveNodes[fields[3]],
node)
diff --git a/store/cluster_node_test.go b/store/cluster_node_test.go
index e308651..ce08598 100644
--- a/store/cluster_node_test.go
+++ b/store/cluster_node_test.go
@@ -45,8 +45,12 @@ func TestClusterNode(t *testing.T) {
require.NoError(t, redisCli.Do(ctx, "CLUSTER", "RESET").Err())
// set the cluster topology
- cluster := &Cluster{Shards: Shards{
- {Nodes: []Node{node}, SlotRanges: []SlotRange{{Start:
0, Stop: 16383}}},
+ cluster := &Cluster{Shards: Shards{{
+ Nodes: []Node{node}, SlotRanges: []SlotRange{
+ {Start: 0, Stop: 100},
+ {Start: 102, Stop: 300},
+ {Start: 302, Stop: 16383},
+ }},
}}
cluster.Version.Store(1)
require.NoError(t, node.SyncClusterInfo(ctx, cluster))
@@ -70,6 +74,11 @@ func TestClusterNode(t *testing.T) {
require.EqualValues(t, 1, clusterNodes.Version.Load())
require.Len(t, clusterNodes.Shards, 1)
require.Len(t, clusterNodes.Shards[0].Nodes, 1)
+ require.EqualValues(t, []SlotRange{
+ {Start: 0, Stop: 100},
+ {Start: 102, Stop: 300},
+ {Start: 302, Stop: 16383},
+ }, clusterNodes.Shards[0].SlotRanges)
        require.EqualValues(t, defaultNodeAddr, clusterNodes.Shards[0].Nodes[0].Addr())
        require.EqualValues(t, node.ID(), clusterNodes.Shards[0].Nodes[0].ID())
})