This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new 08286ca Fix empty slot range is not allowed when importing a cluster
(#212)
08286ca is described below
commit 08286ca002306feb5b4b30936c30b1c6c3fae9b9
Author: hulk <[email protected]>
AuthorDate: Wed Oct 16 11:41:13 2024 +0800
Fix empty slot range is not allowed when importing a cluster (#212)
For a newly added shard, the slot range will be empty, but we previously required
at least one slot range while parsing the cluster nodes string.
---
store/cluster.go | 4 ----
store/cluster_node_test.go | 37 +++++++++++++++++++++++--------------
store/cluster_shard.go | 5 +++++
store/cluster_shard_test.go | 5 ++++-
4 files changed, 32 insertions(+), 19 deletions(-)
diff --git a/store/cluster.go b/store/cluster.go
index f42c402..d4beb06 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -272,12 +272,8 @@ func ParseCluster(clusterStr string) (*Cluster, error) {
}
if node.role == RoleMaster {
- if len(fields) < 9 {
- return nil, fmt.Errorf("master node element
less 9, node info[%q]", nodeString)
- }
shard := NewShard()
shard.Nodes = append(shard.Nodes, node)
-
// remain fields are slot ranges
for i := 8; i < len(fields); i++ {
slotRange, err := ParseSlotRange(fields[i])
diff --git a/store/cluster_node_test.go b/store/cluster_node_test.go
index ce08598..41bf55f 100644
--- a/store/cluster_node_test.go
+++ b/store/cluster_node_test.go
@@ -29,9 +29,11 @@ import (
func TestClusterNode(t *testing.T) {
ctx := context.Background()
- defaultNodeAddr := "127.0.0.1:7770"
- node := NewClusterNode(defaultNodeAddr, "")
- redisCli := node.GetClient()
+ nodeAddr0 := "127.0.0.1:7770"
+ nodeAddr1 := "127.0.0.1:7771"
+ node0 := NewClusterNode(nodeAddr0, "")
+ node1 := NewClusterNode(nodeAddr1, "")
+ redisCli := node0.GetClient()
defer func() {
require.NoError(t, redisCli.FlushAll(ctx).Err())
@@ -40,47 +42,54 @@ func TestClusterNode(t *testing.T) {
}()
t.Run("Check the cluster mode", func(t *testing.T) {
- _, err := node.CheckClusterMode(ctx)
+ _, err := node0.CheckClusterMode(ctx)
require.NoError(t, err)
require.NoError(t, redisCli.Do(ctx, "CLUSTER", "RESET").Err())
// set the cluster topology
- cluster := &Cluster{Shards: Shards{{
- Nodes: []Node{node}, SlotRanges: []SlotRange{
+ cluster := &Cluster{Shards: Shards{
+ {Nodes: []Node{node0}, SlotRanges: []SlotRange{
{Start: 0, Stop: 100},
{Start: 102, Stop: 300},
{Start: 302, Stop: 16383},
}},
+ {Nodes: []Node{node1}, SlotRanges: []SlotRange{}},
}}
+
cluster.Version.Store(1)
- require.NoError(t, node.SyncClusterInfo(ctx, cluster))
- clusterInfo, err := node.GetClusterInfo(ctx)
+ require.NoError(t, node0.SyncClusterInfo(ctx, cluster))
+ clusterInfo, err := node0.GetClusterInfo(ctx)
require.NoError(t, err)
require.EqualValues(t, 1, clusterInfo.CurrentEpoch)
})
- t.Run("Check the cluster node info", func(t *testing.T) {
+ t.Run("Check the cluster node0 info", func(t *testing.T) {
require.NoError(t, redisCli.Set(ctx, "foo", "bar", 0).Err())
- info, err := node.GetClusterNodeInfo(ctx)
+ info, err := node0.GetClusterNodeInfo(ctx)
require.NoError(t, err)
require.True(t, info.Sequence > 0)
})
t.Run("Parse the cluster node info", func(t *testing.T) {
- clusterNodesStr, err := node.GetClusterNodesString(ctx)
+ clusterNodesStr, err := node0.GetClusterNodesString(ctx)
require.NoError(t, err)
clusterNodes, err := ParseCluster(clusterNodesStr)
require.NoError(t, err)
require.EqualValues(t, 1, clusterNodes.Version.Load())
- require.Len(t, clusterNodes.Shards, 1)
+ require.Len(t, clusterNodes.Shards, 2)
require.Len(t, clusterNodes.Shards[0].Nodes, 1)
require.EqualValues(t, []SlotRange{
{Start: 0, Stop: 100},
{Start: 102, Stop: 300},
{Start: 302, Stop: 16383},
}, clusterNodes.Shards[0].SlotRanges)
- require.EqualValues(t, defaultNodeAddr,
clusterNodes.Shards[0].Nodes[0].Addr())
- require.EqualValues(t, node.ID(),
clusterNodes.Shards[0].Nodes[0].ID())
+ require.EqualValues(t, nodeAddr0,
clusterNodes.Shards[0].Nodes[0].Addr())
+ require.EqualValues(t, node0.ID(),
clusterNodes.Shards[0].Nodes[0].ID())
+
+ // Ensure empty slot range is allowed in the cluster node info
+ require.Len(t, clusterNodes.Shards[1].Nodes, 1)
+ require.EqualValues(t, []SlotRange{},
clusterNodes.Shards[1].SlotRanges)
+ require.EqualValues(t, nodeAddr1,
clusterNodes.Shards[1].Nodes[0].Addr())
})
}
diff --git a/store/cluster_shard.go b/store/cluster_shard.go
index eb7164c..e15333b 100644
--- a/store/cluster_shard.go
+++ b/store/cluster_shard.go
@@ -49,6 +49,11 @@ func (s Shards) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s Shards) Less(i, j int) bool {
+ if len(s[i].SlotRanges) == 0 {
+ return false
+ } else if len(s[j].SlotRanges) == 0 {
+ return true
+ }
return s[i].SlotRanges[0].Start < s[j].SlotRanges[0].Start
}
diff --git a/store/cluster_shard_test.go b/store/cluster_shard_test.go
index b65cae2..43ea157 100644
--- a/store/cluster_shard_test.go
+++ b/store/cluster_shard_test.go
@@ -43,11 +43,14 @@ func TestShard_Sort(t *testing.T) {
shard1.SlotRanges = []SlotRange{{Start: 0, Stop: 400}}
shard2 := NewShard()
shard2.SlotRanges = []SlotRange{{Start: 101, Stop: 500}}
- shards := Shards{shard0, shard1, shard2}
+ shard3 := NewShard()
+ shard3.SlotRanges = []SlotRange{}
+ shards := Shards{shard0, shard1, shard2, shard3}
sort.Sort(shards)
require.EqualValues(t, 0, shards[0].SlotRanges[0].Start)
require.EqualValues(t, 101, shards[1].SlotRanges[0].Start)
require.EqualValues(t, 201, shards[2].SlotRanges[0].Start)
+ require.EqualValues(t, 0, len(shards[3].SlotRanges))
}
func TestShard_IsServicing(t *testing.T) {