This is an automated email from the ASF dual-hosted git repository. hulk pushed a commit to branch unstable in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push: new a6334b3 Add support of migrating with slot range (#304) a6334b3 is described below commit a6334b3812cd68bcac4d07dd17441325a272c4d1 Author: Byron Seto <byrons...@hotmail.com> AuthorDate: Sat May 3 07:57:25 2025 -0600 Add support of migrating with slot range (#304) --- cmd/client/command/helper.go | 4 +- cmd/client/command/migrate.go | 16 +-- consts/errors.go | 30 +++-- controller/cluster.go | 21 ++- controller/cluster_test.go | 6 +- server/api/cluster.go | 6 +- server/api/cluster_test.go | 16 ++- server/api/shard_test.go | 11 +- store/cluster.go | 18 +-- store/cluster_node.go | 14 +- store/cluster_shard.go | 12 +- store/cluster_shard_test.go | 8 +- store/cluster_test.go | 17 +-- store/slot.go | 163 +++++++++++++---------- store/slot_test.go | 292 ++++++++++++++++++++++++++++++++++-------- 15 files changed, 434 insertions(+), 200 deletions(-) diff --git a/cmd/client/command/helper.go b/cmd/client/command/helper.go index 1d07677..1d395ac 100644 --- a/cmd/client/command/helper.go +++ b/cmd/client/command/helper.go @@ -51,8 +51,8 @@ func printCluster(cluster *store.Cluster) { role = strings.ToUpper(store.RoleMaster) } migratingStatus := "NO" - if shard.MigratingSlot != -1 { - migratingStatus = fmt.Sprintf("%d --> %d", shard.MigratingSlot, shard.TargetShardIndex) + if shard.MigratingSlot != nil { + migratingStatus = fmt.Sprintf("%s --> %d", shard.MigratingSlot, shard.TargetShardIndex) } columns := []string{fmt.Sprintf("%d", i), node.ID(), node.Addr(), role, migratingStatus} writer.Append(columns) diff --git a/cmd/client/command/migrate.go b/cmd/client/command/migrate.go index c8ccbac..009b416 100644 --- a/cmd/client/command/migrate.go +++ b/cmd/client/command/migrate.go @@ -26,13 +26,14 @@ import ( "strconv" "strings" + "github.com/apache/kvrocks-controller/store" "github.com/spf13/cobra" ) type MigrationOptions struct { namespace string cluster string - slot int + slot string target int slotOnly bool } @@ -69,14 +70,11 @@ func migrationPreRun(_ *cobra.Command, args []string) error { if len(args) < 2 { return fmt.Errorf("the slot number should be specified") } - slot, err := strconv.Atoi(args[1]) + _, err := store.ParseSlotRange(args[1]) if err != nil { - return fmt.Errorf("invalid slot number: %s", args[1]) + return fmt.Errorf("invalid slot number: %s, error: %w", args[1], err) } - if slot < 0 || slot > 16383 { - return errors.New("slot number should be in range [0, 16383]") - } - migrateOptions.slot = slot + migrateOptions.slot = args[1] if migrateOptions.namespace == "" { return fmt.Errorf("namespace is required, please specify with -n or --namespace") @@ -106,12 +104,12 @@ func migrateSlot(client *client, options *MigrationOptions) error { if rsp.IsError() { return errors.New(rsp.String()) } - printLine("migrate slot[%d] task is submitted successfully.", options.slot) + printLine("migrate slot[%s] task is submitted successfully.", options.slot) return nil } func init() { - MigrateCommand.Flags().IntVar(&migrateOptions.slot, "slot", -1, "The slot to migrate") + MigrateCommand.Flags().StringVar(&migrateOptions.slot, "slot", "", "The slot to migrate") MigrateCommand.Flags().IntVar(&migrateOptions.target, "target", -1, "The target node") MigrateCommand.Flags().StringVarP(&migrateOptions.namespace, "namespace", "n", "", "The namespace") MigrateCommand.Flags().StringVarP(&migrateOptions.cluster, "cluster", "c", "", "The cluster") diff --git a/consts/errors.go b/consts/errors.go index 8ad1034..bf7e51d 100644 --- a/consts/errors.go +++ b/consts/errors.go @@ -23,18 +23,20 @@ package consts import "errors" var ( - ErrInvalidArgument = errors.New("invalid argument") - ErrNotFound = errors.New("not found") - ErrForbidden = errors.New("forbidden") - ErrAlreadyExists = errors.New("already exists") - ErrIndexOutOfRange = errors.New("index out of range") - ErrShardIsSame = errors.New("source and target shard is same") - ErrSlotOutOfRange = errors.New("slot out of range") - ErrSlotNotBelongToAnyShard = errors.New("slot not belong to any shard") - ErrNodeIsNotMaster = errors.New("the old node is not master") - ErrOldMasterNodeNotFound = errors.New("old master node not found") - ErrShardNoReplica = errors.New("no replica in shard") - ErrShardIsServicing = errors.New("shard is servicing") - ErrShardSlotIsMigrating = errors.New("shard slot is migrating") - ErrShardNoMatchNewMaster = errors.New("no match new master in shard") + ErrInvalidArgument = errors.New("invalid argument") + ErrNotFound = errors.New("not found") + ErrForbidden = errors.New("forbidden") + ErrAlreadyExists = errors.New("already exists") + ErrIndexOutOfRange = errors.New("index out of range") + ErrShardIsSame = errors.New("source and target shard is same") + ErrSlotOutOfRange = errors.New("slot out of range") + ErrSlotNotBelongToAnyShard = errors.New("slot not belong to any shard") + ErrSlotRangeBelongsToMultipleShards = errors.New("slot range belongs to multiple shards") + ErrNodeIsNotMaster = errors.New("the old node is not master") + ErrOldMasterNodeNotFound = errors.New("old master node not found") + ErrShardNoReplica = errors.New("no replica in shard") + ErrShardIsServicing = errors.New("shard is servicing") + ErrShardSlotIsMigrating = errors.New("shard slot is migrating") + ErrShardNoMatchNewMaster = errors.New("no match new master in shard") + ErrSlotStartAndStopEqual = errors.New("start and stop of a range cannot be equal") ) diff --git a/controller/cluster.go b/controller/cluster.go index cf60b9c..2d6802e 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -324,10 +324,16 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu log.Error("Failed to get the cluster info from the source node", zap.Error(err)) return } - if sourceNodeClusterInfo.MigratingSlot != shard.MigratingSlot { + if sourceNodeClusterInfo.MigratingSlot == nil { + log.Error("The source migration slot is empty", + zap.String("migrating_slot", shard.MigratingSlot.String()), + ) + return + } + if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot) { log.Error("Mismatch migrating slot", - zap.Int("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot), - zap.Int("migrating_slot", shard.MigratingSlot), + zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()), + zap.String("migrating_slot", shard.MigratingSlot.String()), ) return } @@ -347,17 +353,18 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu return } c.updateCluster(clonedCluster) - log.Warn("Failed to migrate the slot", zap.Int("slot", migratingSlot)) + log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String())) case "success": - clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot) + clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, *shard.MigratingSlot) clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges( - clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot) + clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, *shard.MigratingSlot, + ) migratedSlot := shard.MigratingSlot clonedCluster.Shards[i].ClearMigrateState() if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { log.Error("Failed to update the cluster", zap.Error(err)) } else { - log.Info("Migrate the slot successfully", zap.Int("slot", migratedSlot)) + log.Info("Migrate the slot successfully", zap.String("slot", migratedSlot.String())) } c.updateCluster(clonedCluster) default: diff --git a/controller/cluster_test.go b/controller/cluster_test.go index 2fd7db7..d415f3f 100644 --- a/controller/cluster_test.go +++ b/controller/cluster_test.go @@ -109,7 +109,7 @@ func TestCluster_FailureCount(t *testing.T) { mockNode0, mockNode1, mockNode2, mockNode3, }, SlotRanges: []store.SlotRange{{Start: 0, Stop: 16383}}, - MigratingSlot: -1, + MigratingSlot: nil, TargetShardIndex: -1, }}, } @@ -219,7 +219,9 @@ func TestCluster_MigrateSlot(t *testing.T) { defer func() { require.NoError(t, cluster.Reset(ctx)) }() - require.NoError(t, cluster.MigrateSlot(ctx, 0, 1, false)) + slotRange, err := store.NewSlotRange(0, 0) + require.NoError(t, err) + require.NoError(t, cluster.MigrateSlot(ctx, *slotRange, 1, false)) s := NewMockClusterStore() require.NoError(t, s.CreateCluster(ctx, ns, cluster)) diff --git a/server/api/cluster.go b/server/api/cluster.go index 3d1e896..b539c90 100644 --- a/server/api/cluster.go +++ b/server/api/cluster.go @@ -32,9 +32,9 @@ import ( ) type MigrateSlotRequest struct { - Target int `json:"target" validate:"required"` - Slot int `json:"slot" validate:"required"` - SlotOnly bool `json:"slot_only"` + Target int `json:"target" validate:"required"` + Slot store.SlotRange `json:"slot" validate:"required"` + SlotOnly bool `json:"slot_only"` } type CreateClusterRequest struct { diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go index 53995ba..b1f1761 100644 --- a/server/api/cluster_test.go +++ b/server/api/cluster_test.go @@ -126,8 +126,10 @@ func TestClusterBasics(t *testing.T) { ctx := GetTestContext(recorder) ctx.Set(consts.ContextKeyStore, handler.s) ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: clusterName}} + slotRange, err := store.NewSlotRange(3, 3) + require.NoError(t, err) testMigrateReq := &MigrateSlotRequest{ - Slot: 3, + Slot: *slotRange, SlotOnly: true, Target: 1, } @@ -163,7 +165,6 @@ func TestClusterBasics(t *testing.T) { runRemove(t, "test-cluster", http.StatusNoContent) runRemove(t, "not-exist", http.StatusNotFound) }) - } func TestClusterImport(t *testing.T) { @@ -233,8 +234,10 @@ func TestClusterMigrateData(t *testing.T) { reqCtx := GetTestContext(recorder) reqCtx.Set(consts.ContextKeyStore, handler.s) reqCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: clusterName}} + slotRange, err := store.NewSlotRange(0, 0) + require.NoError(t, err) testMigrateReq := &MigrateSlotRequest{ - Slot: 0, + Slot: *slotRange, Target: 1, } body, err := json.Marshal(testMigrateReq) @@ -248,14 +251,15 @@ func TestClusterMigrateData(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 1, gotCluster.Version.Load()) require.Len(t, gotCluster.Shards[0].SlotRanges, 1) - require.EqualValues(t, 0, gotCluster.Shards[0].MigratingSlot) + require.EqualValues(t, &store.SlotRange{Start: 0, Stop: 0}, gotCluster.Shards[0].MigratingSlot) require.EqualValues(t, 1, gotCluster.Shards[0].TargetShardIndex) ctrl, err := controller.New(handler.s.(*store.ClusterStore), &config.ControllerConfig{ FailOver: &config.FailOverConfig{ PingIntervalSeconds: 1, MaxPingCount: 3, - }}) + }, + }) require.NoError(t, err) require.NoError(t, ctrl.Start(ctx)) ctrl.WaitForReady() @@ -268,6 +272,6 @@ func TestClusterMigrateData(t *testing.T) { if err != nil { return false } - return gotCluster.Shards[0].MigratingSlot == -1 + return gotCluster.Shards[0].MigratingSlot == nil }, 10*time.Second, 100*time.Millisecond) } diff --git a/server/api/shard_test.go b/server/api/shard_test.go index c4cd642..cdaa89b 100644 --- a/server/api/shard_test.go +++ b/server/api/shard_test.go @@ -84,7 +84,8 @@ func TestShardBasics(t *testing.T) { ctx.Params = []gin.Param{ {Key: "namespace", Value: ns}, {Key: "cluster", Value: clusterName}, - {Key: "shard", Value: strconv.Itoa(shardIndex)}} + {Key: "shard", Value: strconv.Itoa(shardIndex)}, + } middleware.RequiredClusterShard(ctx) require.Equal(t, http.StatusOK, recorder.Code) @@ -103,7 +104,8 @@ func TestShardBasics(t *testing.T) { ctx.Params = []gin.Param{ {Key: "namespace", Value: ns}, {Key: "cluster", Value: clusterName}, - {Key: "shard", Value: "1"}} + {Key: "shard", Value: "1"}, + } middleware.RequiredClusterShard(ctx) require.Equal(t, http.StatusOK, recorder.Code) @@ -124,7 +126,7 @@ func TestShardBasics(t *testing.T) { nodeAddrs = append(nodeAddrs, node.Addr()) } require.ElementsMatch(t, []string{"127.0.0.1:1235", "127.0.0.1:1236"}, nodeAddrs) - require.EqualValues(t, -1, rsp.Data.Shard.MigratingSlot) + require.Nil(t, rsp.Data.Shard.MigratingSlot) require.EqualValues(t, -1, rsp.Data.Shard.TargetShardIndex) }) @@ -172,7 +174,8 @@ func TestClusterFailover(t *testing.T) { ctx.Params = []gin.Param{ {Key: "namespace", Value: ns}, {Key: "cluster", Value: clusterName}, - {Key: "shard", Value: strconv.Itoa(shardIndex)}} + {Key: "shard", Value: strconv.Itoa(shardIndex)}, + } middleware.RequiredClusterShard(ctx) require.Equal(t, http.StatusOK, recorder.Code) diff --git a/store/cluster.go b/store/cluster.go index 3bde499..b4bfd9e 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -132,7 +132,8 @@ func (cluster *Cluster) RemoveNode(shardIndex int, nodeID string) error { } func (cluster *Cluster) PromoteNewMaster(ctx context.Context, - shardIdx int, masterNodeID, preferredNodeID string) (string, error) { + shardIdx int, masterNodeID, preferredNodeID string, +) (string, error) { shard, err := cluster.GetShard(shardIdx) if err != nil { return "", err @@ -175,17 +176,16 @@ func (cluster *Cluster) Reset(ctx context.Context) error { return nil } -func (cluster *Cluster) findShardIndexBySlot(slot int) (int, error) { - if slot < 0 || slot > MaxSlotID { - return -1, consts.ErrSlotOutOfRange - } +func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) { sourceShardIdx := -1 for i := 0; i < len(cluster.Shards); i++ { slotRanges := cluster.Shards[i].SlotRanges for _, slotRange := range slotRanges { - if slotRange.Contains(slot) { + if slotRange.HasOverlap(&slot) { + if sourceShardIdx != -1 { + return sourceShardIdx, consts.ErrSlotRangeBelongsToMultipleShards + } sourceShardIdx = i - break } } } @@ -195,7 +195,7 @@ func (cluster *Cluster) findShardIndexBySlot(slot int) (int, error) { return sourceShardIdx, nil } -func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int, targetShardIdx int, slotOnly bool) error { +func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetShardIdx int, slotOnly bool) error { if targetShardIdx < 0 || targetShardIdx >= len(cluster.Shards) { return consts.ErrIndexOutOfRange } @@ -226,7 +226,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int, targetShardId } // Will start the data migration in the background - cluster.Shards[sourceShardIdx].MigratingSlot = slot + cluster.Shards[sourceShardIdx].MigratingSlot = &slot cluster.Shards[sourceShardIdx].TargetShardIndex = targetShardIdx return nil } diff --git a/store/cluster_node.go b/store/cluster_node.go index 3434595..e16ae83 100644 --- a/store/cluster_node.go +++ b/store/cluster_node.go @@ -68,7 +68,7 @@ type Node interface { GetClusterInfo(ctx context.Context) (*ClusterInfo, error) SyncClusterInfo(ctx context.Context, cluster *Cluster) error CheckClusterMode(ctx context.Context) (int64, error) - MigrateSlot(ctx context.Context, slot int, NodeID string) error + MigrateSlot(ctx context.Context, slot SlotRange, NodeID string) error MarshalJSON() ([]byte, error) UnmarshalJSON(data []byte) error @@ -85,9 +85,9 @@ type ClusterNode struct { } type ClusterInfo struct { - CurrentEpoch int64 `json:"cluster_current_epoch"` - MigratingSlot int `json:"migrating_slot"` - MigratingState string `json:"migrating_state"` + CurrentEpoch int64 `json:"cluster_current_epoch"` + MigratingSlot *SlotRange `json:"migrating_slot"` + MigratingState string `json:"migrating_state"` } type ClusterNodeInfo struct { @@ -195,7 +195,7 @@ func (n *ClusterNode) GetClusterInfo(ctx context.Context) (*ClusterInfo, error) } case "migrating_slot", "migrating_slot(s)": // TODO(@git-hulk): handle multiple migrating slots - clusterInfo.MigratingSlot, err = strconv.Atoi(fields[1]) + clusterInfo.MigratingSlot, err = ParseSlotRange(fields[1]) if err != nil { return nil, err } @@ -257,8 +257,8 @@ func (n *ClusterNode) Reset(ctx context.Context) error { return n.GetClient().ClusterResetHard(ctx).Err() } -func (n *ClusterNode) MigrateSlot(ctx context.Context, slot int, targetNodeID string) error { - return n.GetClient().Do(ctx, "CLUSTERX", "MIGRATE", slot, targetNodeID).Err() +func (n *ClusterNode) MigrateSlot(ctx context.Context, slot SlotRange, targetNodeID string) error { + return n.GetClient().Do(ctx, "CLUSTERX", "MIGRATE", slot.String(), targetNodeID).Err() } func (n *ClusterNode) MarshalJSON() ([]byte, error) { diff --git a/store/cluster_shard.go b/store/cluster_shard.go index 1de4dc5..62c8826 100644 --- a/store/cluster_shard.go +++ b/store/cluster_shard.go @@ -37,7 +37,7 @@ type Shard struct { Nodes []Node `json:"nodes"` SlotRanges []SlotRange `json:"slot_ranges"` TargetShardIndex int `json:"target_shard_index"` - MigratingSlot int `json:"migrating_slot"` + MigratingSlot *SlotRange `json:"migrating_slot"` } type Shards []*Shard @@ -45,9 +45,11 @@ type Shards []*Shard func (s Shards) Len() int { return len(s) } + func (s Shards) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + func (s Shards) Less(i, j int) bool { if len(s[i].SlotRanges) == 0 { return false @@ -61,7 +63,7 @@ func NewShard() *Shard { return &Shard{ Nodes: make([]Node, 0), SlotRanges: make([]SlotRange, 0), - MigratingSlot: -1, + MigratingSlot: nil, TargetShardIndex: -1, } } @@ -78,7 +80,7 @@ func (shard *Shard) Clone() *Shard { } func (shard *Shard) ClearMigrateState() { - shard.MigratingSlot = -1 + shard.MigratingSlot = nil shard.TargetShardIndex = -1 } @@ -110,7 +112,7 @@ func (shard *Shard) addNode(addr, role, password string) error { } func (shard *Shard) IsMigrating() bool { - return shard.MigratingSlot != -1 && shard.TargetShardIndex != -1 + return shard.MigratingSlot != nil && shard.TargetShardIndex != -1 } func (shard *Shard) GetMasterNode() Node { @@ -259,7 +261,7 @@ func (shard *Shard) UnmarshalJSON(bytes []byte) error { var data struct { SlotRanges []SlotRange `json:"slot_ranges"` TargetShardIndex int `json:"target_shard_index"` - MigratingSlot int `json:"migrating_slot"` + MigratingSlot *SlotRange `json:"migrating_slot"` Nodes []*ClusterNode `json:"nodes"` } if err := json.Unmarshal(bytes, &data); err != nil { diff --git a/store/cluster_shard_test.go b/store/cluster_shard_test.go index 43ea157..994046f 100644 --- a/store/cluster_shard_test.go +++ b/store/cluster_shard_test.go @@ -54,17 +54,19 @@ func TestShard_Sort(t *testing.T) { } func TestShard_IsServicing(t *testing.T) { + var err error shard := NewShard() shard.TargetShardIndex = 0 - shard.MigratingSlot = -1 + shard.MigratingSlot = nil require.False(t, shard.IsServicing()) shard.TargetShardIndex = 0 - shard.MigratingSlot = 0 + shard.MigratingSlot, err = NewSlotRange(1, 1) + require.Nil(t, err) require.True(t, shard.IsServicing()) shard.TargetShardIndex = -1 - shard.MigratingSlot = -1 + shard.MigratingSlot = nil shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}} require.True(t, shard.IsServicing()) diff --git a/store/cluster_test.go b/store/cluster_test.go index cb62545..6b6e45c 100644 --- a/store/cluster_test.go +++ b/store/cluster_test.go @@ -42,22 +42,23 @@ func TestCluster_FindIndexShardBySlot(t *testing.T) { cluster, err := NewCluster("test", []string{"node1", "node2", "node3"}, 1) require.NoError(t, err) - shard, err := cluster.findShardIndexBySlot(0) + slotRange, err := NewSlotRange(0, 0) + require.NoError(t, err) + shard, err := cluster.findShardIndexBySlot(*slotRange) require.NoError(t, err) require.Equal(t, 0, shard) - shard, err = cluster.findShardIndexBySlot(MaxSlotID/3 + 1) + slotRange, err = NewSlotRange(MaxSlotID/3+1, MaxSlotID/3+1) + require.NoError(t, err) + shard, err = cluster.findShardIndexBySlot(*slotRange) require.NoError(t, err) require.Equal(t, 1, shard) - shard, err = cluster.findShardIndexBySlot(MaxSlotID) + slotRange, err = NewSlotRange(MaxSlotID, MaxSlotID) + require.NoError(t, err) + shard, err = cluster.findShardIndexBySlot(*slotRange) require.NoError(t, err) require.Equal(t, 2, shard) - - _, err = cluster.findShardIndexBySlot(-1) - require.ErrorIs(t, err, consts.ErrSlotOutOfRange) - _, err = cluster.findShardIndexBySlot(MaxSlotID + 1) - require.ErrorIs(t, err, consts.ErrSlotOutOfRange) } func TestCluster_PromoteNewMaster(t *testing.T) { diff --git a/store/slot.go b/store/slot.go index 8544aa6..04a2b52 100644 --- a/store/slot.go +++ b/store/slot.go @@ -22,9 +22,12 @@ package store import ( "encoding/json" "errors" + "fmt" "sort" "strconv" "strings" + + "github.com/apache/kvrocks-controller/consts" ) const ( @@ -35,15 +38,15 @@ const ( var ErrSlotOutOfRange = errors.New("slot id was out of range, should be between 0 and 16383") type SlotRange struct { - Start int `json:"start"` - Stop int `json:"stop"` + Start int `json:"start"` // inclusive + Stop int `json:"stop"` // inclusive } type SlotRanges []SlotRange func NewSlotRange(start, stop int) (*SlotRange, error) { if start > stop { - return nil, errors.New("start was larger than Shutdown") + return nil, errors.New("start was larger than stop") } if (start < MinSlotID || start > MaxSlotID) || (stop < MinSlotID || stop > MaxSlotID) { @@ -55,8 +58,21 @@ func NewSlotRange(start, stop int) (*SlotRange, error) { }, nil } +func (slotRange *SlotRange) Equal(that *SlotRange) bool { + if that == nil { + return false + } + if slotRange.Start != that.Start { + return false + } + if slotRange.Stop != that.Stop { + return false + } + return true +} + func (slotRange *SlotRange) HasOverlap(that *SlotRange) bool { - return !(slotRange.Stop < that.Start || slotRange.Start > that.Stop) + return slotRange.Stop >= that.Start && slotRange.Start <= that.Stop } func (slotRange *SlotRange) Contains(slot int) bool { @@ -88,6 +104,11 @@ func (slotRange *SlotRange) UnmarshalJSON(data []byte) error { } func ParseSlotRange(s string) (*SlotRange, error) { + numberOfRanges := strings.Count(s, "-") + if numberOfRanges > 1 { + return nil, fmt.Errorf("%w, cannot have more than one range", consts.ErrInvalidArgument) + } + index := strings.IndexByte(s, '-') if index == -1 { start, err := strconv.Atoi(s) @@ -112,7 +133,7 @@ func ParseSlotRange(s string) (*SlotRange, error) { return nil, err } if start > stop { - return nil, errors.New("start slot id greater than Shutdown slot id") + return nil, errors.New("start slot id greater than stop slot id") } if (start < MinSlotID || start > MaxSlotID) || (stop < MinSlotID || stop > MaxSlotID) { @@ -133,87 +154,89 @@ func (SlotRanges *SlotRanges) Contains(slot int) bool { return false } -func AddSlotToSlotRanges(source SlotRanges, slot int) SlotRanges { - sort.Slice(source, func(i, j int) bool { - return source[i].Start < source[j].Start - }) - if len(source) == 0 { - return append(source, SlotRange{Start: slot, Stop: slot}) +func (SlotRanges *SlotRanges) HasOverlap(slotRange SlotRange) bool { + for _, slotRange := range *SlotRanges { + if slotRange.HasOverlap(&slotRange) { + return true + } } - if source[0].Start-1 > slot { - return append([]SlotRange{{Start: slot, Stop: slot}}, source...) + return false +} + +// CanMerge will return true if the given SlotRanges are adjacent with each other +func CanMerge(a, b SlotRange) bool { + // Ensure a starts before b for easier comparison + if a.Start > b.Start { + a, b = b, a } - if source[len(source)-1].Stop+1 < slot { - return append(source, SlotRange{Start: slot, Stop: slot}) + // If the end of `a` is at least one less than the start of `b`, they can merge + return a.Stop+1 >= b.Start +} + +func MergeSlotRanges(a SlotRange, b SlotRange) SlotRange { + return SlotRange{ + Start: min(a.Start, b.Start), + Stop: max(a.Stop, b.Stop), } +} - // first run is to find the fittest slot range and create a new one if necessary - for i, slotRange := range source { - if slotRange.Contains(slot) { - return source - } - // check next slot range, it won't be the last one since we have checked it before - if slotRange.Stop+1 < slot { - continue - } - if slotRange.Start == slot+1 { - source[i].Start = slot - } else if slotRange.Stop == slot-1 { - source[i].Stop = slot - } else if slotRange.Start > slot { - // no suitable slot range, create a new one before the current slot range - tmp := make(SlotRanges, len(source)+1) - copy(tmp, source[0:i]) - tmp[i] = SlotRange{Start: slot, Stop: slot} - copy(tmp[i+1:], source[i:]) - source = tmp +// Implemented following leetcode solution: +// https://leetcode.com/problems/merge-intervals/solutions/1805268/go-clean-code-with-explanation-and-visual-10ms-100 +func AddSlotToSlotRanges(source SlotRanges, slot SlotRange) SlotRanges { + if len(source) == 0 { + return append(source, slot) + } + source = append(source, slot) + sort.Slice(source, func(i, j int) bool { + return source[i].Start < source[j].Start + }) + + mergedSlotRanges := make([]SlotRange, 0, len(source)) + mergedSlotRanges = append(mergedSlotRanges, source[0]) + + for _, interval := range source[1:] { + lastIntervalPos := len(mergedSlotRanges) - 1 + lastInterval := mergedSlotRanges[lastIntervalPos] + if CanMerge(lastInterval, interval) { + mergedSlotRanges[lastIntervalPos] = MergeSlotRanges(interval, lastInterval) } else { - // should not reach here - panic("should not reach here") - } - break - } - // merge the slot ranges if necessary - for i := 0; i < len(source)-1; i++ { - if source[i].Stop+1 == source[i+1].Start { - source[i].Stop = source[i+1].Stop - if i+1 == len(source)-1 { - // remove the last slot range - source = source[:i+1] - } else { - source = append(source[:i+1], source[i+2:]...) - } + mergedSlotRanges = append(mergedSlotRanges, interval) } } - return source + + return mergedSlotRanges } -func RemoveSlotFromSlotRanges(source SlotRanges, slot int) SlotRanges { +func RemoveSlotFromSlotRanges(source SlotRanges, slot SlotRange) SlotRanges { sort.Slice(source, func(i, j int) bool { return source[i].Start < source[j].Start }) - if !source.Contains(slot) { + if !source.HasOverlap(slot) { return source } - for i, slotRange := range source { - if slotRange.Contains(slot) { - if slotRange.Start == slot && slotRange.Stop == slot { - source = append(source[0:i], source[i+1:]...) - } else if slotRange.Start == slot { - source[i].Start = slot + 1 - } else if slotRange.Stop == slot { - source[i].Stop = slot - 1 - } else { - tmp := make(SlotRanges, len(source)+1) - copy(tmp, source[0:i]) - tmp[i] = SlotRange{Start: slotRange.Start, Stop: slot - 1} - tmp[i+1] = SlotRange{Start: slot + 1, Stop: slotRange.Stop} - copy(tmp[i+2:], source[i+1:]) - source = tmp - } + + result := make([]SlotRange, 0, len(source)) + for _, slotRange := range source { + // if no overlap, keep original range + if !slotRange.HasOverlap(&slot) { + result = append(result, slotRange) + continue + } + // if overlap, then we need to create a new left and right range + if slotRange.Start < slot.Start { + result = append(result, SlotRange{ + Start: slotRange.Start, + Stop: slot.Start - 1, + }) + } + if slotRange.Stop > slot.Stop { + result = append(result, SlotRange{ + Start: slot.Stop + 1, + Stop: slotRange.Stop, + }) } } - return source + return result } func CalculateSlotRanges(n int) SlotRanges { diff --git a/store/slot_test.go b/store/slot_test.go index f5740af..9158198 100644 --- a/store/slot_test.go +++ b/store/slot_test.go @@ -22,6 +22,7 @@ package store import ( "testing" + "github.com/apache/kvrocks-controller/consts" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -47,6 +48,16 @@ func TestSlotRange_Parse(t *testing.T) { assert.Equal(t, 1, sr.Start) assert.Equal(t, 12, sr.Stop) + sr, err = ParseSlotRange("5") + assert.Nil(t, err) + assert.Equal(t, 5, sr.Start) + assert.Equal(t, 5, sr.Stop) + + sr, err = ParseSlotRange("0") + assert.Nil(t, err) + assert.Equal(t, 0, sr.Start) + assert.Equal(t, 0, sr.Stop) + _, err = ParseSlotRange("1-65536") assert.Equal(t, ErrSlotOutOfRange, err) @@ -55,6 +66,12 @@ func TestSlotRange_Parse(t *testing.T) { _, err = ParseSlotRange("12-1") assert.NotNil(t, err) + + _, err = ParseSlotRange("1-12 5-10") + require.ErrorIs(t, err, consts.ErrInvalidArgument) + + _, err = ParseSlotRange("1-12, 5") + assert.NotNil(t, err) } func TestAddSlotToSlotRanges(t *testing.T) { @@ -63,25 +80,35 @@ func TestAddSlotToSlotRanges(t *testing.T) { {Start: 101, Stop: 199}, {Start: 201, Stop: 300}, } - slotRanges = AddSlotToSlotRanges(slotRanges, 0) - require.Equal(t, 3, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 0, Stop: 20}, slotRanges[0]) + slotRange, err := NewSlotRange(0, 0) + require.NoError(t, err) + slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange) + require.Equal(t, 3, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 0, Stop: 20}, slotRanges[0], slotRanges) - slotRanges = AddSlotToSlotRanges(slotRanges, 21) - require.Equal(t, 3, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 0, Stop: 21}, slotRanges[0]) + slotRange, err = NewSlotRange(21, 21) + require.NoError(t, err) + slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange) + require.Equal(t, 3, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 0, Stop: 21}, slotRanges[0], slotRanges) - slotRanges = AddSlotToSlotRanges(slotRanges, 50) - require.Equal(t, 4, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 50, Stop: 50}, slotRanges[1]) + slotRange, err = NewSlotRange(50, 50) + require.NoError(t, err) + slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange) + require.Equal(t, 4, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 50, Stop: 50}, slotRanges[1], slotRanges) - slotRanges = AddSlotToSlotRanges(slotRanges, 200) - require.Equal(t, 3, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 101, Stop: 300}, slotRanges[2]) + slotRange, err = NewSlotRange(200, 200) + require.NoError(t, err) + slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange) + require.Equal(t, 3, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 101, Stop: 300}, slotRanges[2], slotRanges) - slotRanges = AddSlotToSlotRanges(slotRanges, 400) - require.Equal(t, 4, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 400, Stop: 400}, slotRanges[3]) + slotRange, err = NewSlotRange(400, 400) + require.NoError(t, err) + slotRanges = AddSlotToSlotRanges(slotRanges, *slotRange) + require.Equal(t, 4, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 400, Stop: 400}, slotRanges[3], slotRanges) } func TestRemoveSlotRanges(t *testing.T) { @@ -90,42 +117,60 @@ func TestRemoveSlotRanges(t *testing.T) { {Start: 101, Stop: 199}, {Start: 201, Stop: 300}, } - slotRanges = RemoveSlotFromSlotRanges(slotRanges, 0) - require.Equal(t, 3, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0]) - - slotRanges = RemoveSlotFromSlotRanges(slotRanges, 21) - require.Equal(t, 3, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0]) - - slotRanges = RemoveSlotFromSlotRanges(slotRanges, 20) - require.Equal(t, 3, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 1, Stop: 19}, slotRanges[0]) - - slotRanges = RemoveSlotFromSlotRanges(slotRanges, 150) - require.Equal(t, 4, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 101, Stop: 149}, slotRanges[1]) - - slotRanges = RemoveSlotFromSlotRanges(slotRanges, 101) - require.Equal(t, 4, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 102, Stop: 149}, slotRanges[1]) - - slotRanges = RemoveSlotFromSlotRanges(slotRanges, 199) - require.Equal(t, 4, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 151, Stop: 198}, slotRanges[2]) - - slotRanges = RemoveSlotFromSlotRanges(slotRanges, 300) - require.Equal(t, 4, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 201, Stop: 299}, slotRanges[3]) - - slotRanges = RemoveSlotFromSlotRanges(slotRanges, 298) - require.Equal(t, 5, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3]) - require.EqualValues(t, SlotRange{Start: 299, Stop: 299}, slotRanges[4]) - - slotRanges = RemoveSlotFromSlotRanges(slotRanges, 299) - require.Equal(t, 4, len(slotRanges)) - require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3]) + slotRange, err := NewSlotRange(0, 0) + require.NoError(t, err) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + require.Equal(t, 3, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0], slotRanges) + + slotRange, err = NewSlotRange(21, 21) + require.NoError(t, err) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + require.Equal(t, 3, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0], slotRanges) + + slotRange, err = NewSlotRange(20, 20) + require.NoError(t, err) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + require.Equal(t, 3, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 1, Stop: 19}, slotRanges[0], slotRanges) + + slotRange, err = NewSlotRange(150, 150) + require.NoError(t, err) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + require.Equal(t, 4, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 101, Stop: 149}, slotRanges[1], slotRanges) + + slotRange, err = NewSlotRange(101, 101) + require.NoError(t, err) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + require.Equal(t, 4, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 102, Stop: 149}, slotRanges[1], slotRanges) + + slotRange, err = NewSlotRange(199, 199) + require.NoError(t, err) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + require.Equal(t, 4, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 151, Stop: 198}, slotRanges[2], slotRanges) + + slotRange, err = NewSlotRange(300, 300) + require.NoError(t, err) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + require.Equal(t, 4, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 201, Stop: 299}, slotRanges[3], slotRanges) + + slotRange, err = NewSlotRange(298, 298) + require.NoError(t, err) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + require.Equal(t, 5, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3], slotRanges) + require.EqualValues(t, SlotRange{Start: 299, Stop: 299}, slotRanges[4], slotRanges) + + slotRange, err = NewSlotRange(299, 299) + require.NoError(t, err) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, *slotRange) + require.Equal(t, 4, len(slotRanges), slotRanges) + require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3], slotRanges) } func TestCalculateSlotRanges(t *testing.T) { @@ -135,3 +180,148 @@ func TestCalculateSlotRanges(t *testing.T) { assert.Equal(t, 13104, slots[4].Start) assert.Equal(t, 16383, slots[4].Stop) } + +func TestSlotRange_HasOverlap(t *testing.T) { + type fields struct { + Start int + Stop int + } + type args struct { + that *SlotRange + } + tests := []struct { + name string + fields fields + args args + want bool + }{ + { + name: "0-5 does not overlap 6-7", + fields: fields{Start: 0, Stop: 5}, + args: args{&SlotRange{Start: 6, Stop: 7}}, + want: false, + }, + { + name: "0-5 does overlap 3-4", + fields: fields{Start: 0, Stop: 5}, + args: args{&SlotRange{Start: 3, Stop: 4}}, + want: true, + }, + { + name: "0-5 does overlap 5-8", + fields: fields{Start: 0, Stop: 5}, + args: args{&SlotRange{Start: 5, Stop: 8}}, + want: true, + }, + { + name: "0-5 does overlap 4-8", + fields: fields{Start: 0, Stop: 5}, + args: args{&SlotRange{Start: 4, Stop: 8}}, + want: true, + }, + { + name: "0-100 does not overlap 101-150", + fields: fields{Start: 0, Stop: 100}, + args: args{&SlotRange{Start: 101, Stop: 150}}, + want: false, + }, + { + name: "50-100 does overlap 30-50", + fields: fields{Start: 50, Stop: 100}, + args: args{&SlotRange{Start: 30, Stop: 50}}, + want: true, + }, + { + name: "50-100 does overlap 50-51", + fields: fields{Start: 50, Stop: 100}, + args: args{&SlotRange{Start: 50, Stop: 51}}, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + slotRange := &SlotRange{ + Start: tt.fields.Start, + Stop: tt.fields.Stop, + } + if got := slotRange.HasOverlap(tt.args.that); got != tt.want { + t.Errorf("SlotRange.HasOverlap() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestCanMerge(t *testing.T) { + type args struct { + a SlotRange + b SlotRange + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "0-5 and 6-10 can merge", + args: args{SlotRange{0, 5}, SlotRange{6, 10}}, + want: true, + }, + { + name: "6-10 and 0-5 can merge", + args: args{SlotRange{6, 10}, SlotRange{0, 5}}, + want: true, + }, + { + name: "6-6 and 0-5 can merge", + args: args{SlotRange{6, 6}, SlotRange{0, 5}}, + want: true, + }, + { + name: "0-5 and 7-10 cannot merge", + args: args{SlotRange{0, 5}, SlotRange{7, 10}}, + want: false, + }, + { + name: "7-10 and 0-5 cannot merge", + args: args{SlotRange{7, 10}, SlotRange{0, 5}}, + want: false, + }, + { + name: "2-2 and 4-4 cannot merge", + args: args{SlotRange{2, 2}, SlotRange{4, 4}}, + want: false, + }, + { + name: "4-4 and 2-2 cannot merge", + args: args{SlotRange{4, 4}, SlotRange{2, 2}}, + want: false, + }, + { + name: "2-3 and 4-4 can merge", + args: args{SlotRange{2, 3}, SlotRange{4, 4}}, + want: true, + }, + { + name: "4-4 and 2-3 can merge", + args: args{SlotRange{4, 4}, SlotRange{2, 3}}, + want: true, + }, + { + name: "4-4 and 3-3 can merge", + args: args{SlotRange{4, 4}, SlotRange{3, 3}}, + want: true, + }, + { + name: "3-3 and 4-4 can merge", + args: args{SlotRange{3, 3}, SlotRange{4, 4}}, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := CanMerge(tt.args.a, tt.args.b); got != tt.want { + t.Errorf("CanMerge() = %v, want %v", got, tt.want) + } + }) + } +}