This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new 590a0b4 Improve the operation of slot in SlotRanges (#169)
590a0b4 is described below
commit 590a0b4a5f67d25a4852c69af2539a6f9aee888b
Author: hulk <[email protected]>
AuthorDate: Mon Apr 29 10:38:58 2024 +0800
Improve the operation of slot in SlotRanges (#169)
Currently, contiguous slot ranges are not merged when a new slot is added,
although merging them is the behavior expected in Redis. For example, suppose
one shard owns the slot ranges [1, 100], [102, 200], [202, 300] and the slot
`101` is added to this shard. We would like the result to be
[1, 200], [202, 300] instead of [1, 101], [102, 200], [202, 300].
---
controller/cluster.go | 8 +--
store/cluster.go | 7 +--
store/slot.go | 132 ++++++++++++++++++++++++---------------
store/slot_test.go | 168 ++++++++++++++++++++------------------------------
4 files changed, 157 insertions(+), 158 deletions(-)
diff --git a/controller/cluster.go b/controller/cluster.go
index b086419..27fcc6b 100644
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -298,11 +298,9 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx
context.Context, cluster *
log.Error("Failed to set the slot",
zap.Error(err))
return
}
- cluster.Shards[i].SlotRanges =
store.RemoveSlotRanges(cluster.Shards[i].SlotRanges,
- []store.SlotRange{{Start: shard.MigratingSlot,
Stop: shard.MigratingSlot}})
- cluster.Shards[shard.TargetShardIndex].SlotRanges =
store.MergeSlotRanges(
-
cluster.Shards[shard.TargetShardIndex].SlotRanges,
- []store.SlotRange{{Start: shard.MigratingSlot,
Stop: shard.MigratingSlot}})
+ cluster.Shards[i].SlotRanges =
store.RemoveSlotFromSlotRanges(cluster.Shards[i].SlotRanges,
shard.MigratingSlot)
+ cluster.Shards[shard.TargetShardIndex].SlotRanges =
store.AddSlotToSlotRanges(
+
cluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
cluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.SetCluster(ctx, c.namespace,
cluster); err != nil {
log.Error("Failed to update the cluster",
zap.Error(err))
diff --git a/store/cluster.go b/store/cluster.go
index 75f6cef..ec72586 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -53,7 +53,7 @@ func NewCluster(name string, nodes []string, replicas int)
(*Cluster, error) {
}
shardCount := len(nodes) / replicas
shards := make([]*Shard, 0)
- slotRanges := SpiltSlotRange(shardCount)
+ slotRanges := CalculateSlotRanges(shardCount)
for i := 0; i < shardCount; i++ {
shard := NewShard()
shard.Nodes = make([]Node, 0)
@@ -186,9 +186,8 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context,
slot int, targetShardId
return consts.ErrShardIsSame
}
if slotOnly {
- migrateSlot := SlotRange{Start: slot, Stop: slot}
- cluster.Shards[sourceShardIdx].SlotRanges =
RemoveSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges,
[]SlotRange{migrateSlot})
- cluster.Shards[targetShardIdx].SlotRanges =
MergeSlotRanges(cluster.Shards[targetShardIdx].SlotRanges,
[]SlotRange{migrateSlot})
+ cluster.Shards[sourceShardIdx].SlotRanges =
RemoveSlotFromSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, slot)
+ cluster.Shards[targetShardIdx].SlotRanges =
AddSlotToSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, slot)
return nil
}
diff --git a/store/slot.go b/store/slot.go
index db088db..8544aa6 100644
--- a/store/slot.go
+++ b/store/slot.go
@@ -39,6 +39,8 @@ type SlotRange struct {
Stop int `json:"stop"`
}
+type SlotRanges []SlotRange
+
func NewSlotRange(start, stop int) (*SlotRange, error) {
if start > stop {
return nil, errors.New("start was larger than Shutdown")
@@ -122,69 +124,103 @@ func ParseSlotRange(s string) (*SlotRange, error) {
}, nil
}
-func MergeSlotRanges(source []SlotRange, target []SlotRange) []SlotRange {
- source = append(source, target...)
- if len(source) == 1 {
- return source
+func (SlotRanges *SlotRanges) Contains(slot int) bool {
+ for _, slotRange := range *SlotRanges {
+ if slotRange.Contains(slot) {
+ return true
+ }
}
+ return false
+}
+
+func AddSlotToSlotRanges(source SlotRanges, slot int) SlotRanges {
sort.Slice(source, func(i, j int) bool {
return source[i].Start < source[j].Start
})
- merged := make([]SlotRange, 0)
- start := source[0].Start
- stop := source[0].Stop
- for i := 1; i < len(source); i++ {
- if stop+1 < source[i].Start {
- merged = append(merged, SlotRange{Start: start, Stop:
stop})
- start = source[i].Start
- stop = source[i].Stop
- } else if stop < source[i].Stop {
- stop = source[i].Stop
+ if len(source) == 0 {
+ return append(source, SlotRange{Start: slot, Stop: slot})
+ }
+ if source[0].Start-1 > slot {
+ return append([]SlotRange{{Start: slot, Stop: slot}}, source...)
+ }
+ if source[len(source)-1].Stop+1 < slot {
+ return append(source, SlotRange{Start: slot, Stop: slot})
+ }
+
+ // first run is to find the fittest slot range and create a new one if
necessary
+ for i, slotRange := range source {
+ if slotRange.Contains(slot) {
+ return source
+ }
+ // check next slot range, it won't be the last one since we
have checked it before
+ if slotRange.Stop+1 < slot {
+ continue
+ }
+ if slotRange.Start == slot+1 {
+ source[i].Start = slot
+ } else if slotRange.Stop == slot-1 {
+ source[i].Stop = slot
+ } else if slotRange.Start > slot {
+ // no suitable slot range, create a new one before the
current slot range
+ tmp := make(SlotRanges, len(source)+1)
+ copy(tmp, source[0:i])
+ tmp[i] = SlotRange{Start: slot, Stop: slot}
+ copy(tmp[i+1:], source[i:])
+ source = tmp
+ } else {
+ // should not reach here
+ panic("should not reach here")
}
+ break
}
- merged = append(merged, SlotRange{Start: start, Stop: stop})
- return merged
+ // merge the slot ranges if necessary
+ for i := 0; i < len(source)-1; i++ {
+ if source[i].Stop+1 == source[i+1].Start {
+ source[i].Stop = source[i+1].Stop
+ if i+1 == len(source)-1 {
+ // remove the last slot range
+ source = source[:i+1]
+ } else {
+ source = append(source[:i+1], source[i+2:]...)
+ }
+ }
+ }
+ return source
}
-func RemoveSlotRanges(source []SlotRange, target []SlotRange) []SlotRange {
- for delIdx := 0; delIdx < len(target); {
- deleteSlotRange := target[delIdx]
- sort.Slice(source, func(i, j int) bool {
- return source[i].Start < source[j].Start
- })
- skip := true
- for i, slotRange := range source {
- if !slotRange.HasOverlap(&deleteSlotRange) {
- continue
- }
- skip = false
- source = append(source[0:i], source[i+1:]...)
- if deleteSlotRange.Start == slotRange.Start &&
deleteSlotRange.Stop < slotRange.Stop {
- source = append(source, SlotRange{Start:
deleteSlotRange.Stop + 1, Stop: slotRange.Stop})
- } else if deleteSlotRange.Stop == slotRange.Stop &&
deleteSlotRange.Start > slotRange.Start {
- source = append(source, SlotRange{Start:
slotRange.Start, Stop: deleteSlotRange.Start - 1})
- } else if deleteSlotRange.Start < slotRange.Start &&
deleteSlotRange.Stop < slotRange.Stop {
- source = append(source, SlotRange{Start:
deleteSlotRange.Stop + 1, Stop: slotRange.Stop})
- } else if deleteSlotRange.Start > slotRange.Start &&
deleteSlotRange.Stop > slotRange.Stop {
- source = append(source, SlotRange{Start:
slotRange.Start, Stop: deleteSlotRange.Start - 1})
- } else if deleteSlotRange.Start > slotRange.Start &&
deleteSlotRange.Stop < slotRange.Stop {
- source = append(source, SlotRange{Start:
slotRange.Start, Stop: deleteSlotRange.Start - 1})
- source = append(source, SlotRange{Start:
deleteSlotRange.Stop + 1., Stop: slotRange.Stop})
+func RemoveSlotFromSlotRanges(source SlotRanges, slot int) SlotRanges {
+ sort.Slice(source, func(i, j int) bool {
+ return source[i].Start < source[j].Start
+ })
+ if !source.Contains(slot) {
+ return source
+ }
+ for i, slotRange := range source {
+ if slotRange.Contains(slot) {
+ if slotRange.Start == slot && slotRange.Stop == slot {
+ source = append(source[0:i], source[i+1:]...)
+ } else if slotRange.Start == slot {
+ source[i].Start = slot + 1
+ } else if slotRange.Stop == slot {
+ source[i].Stop = slot - 1
+ } else {
+ tmp := make(SlotRanges, len(source)+1)
+ copy(tmp, source[0:i])
+ tmp[i] = SlotRange{Start: slotRange.Start,
Stop: slot - 1}
+ tmp[i+1] = SlotRange{Start: slot + 1, Stop:
slotRange.Stop}
+ copy(tmp[i+2:], source[i+1:])
+ source = tmp
}
- break
- }
- if skip {
- delIdx++
}
}
return source
}
-func SpiltSlotRange(number int) []SlotRange {
+func CalculateSlotRanges(n int) SlotRanges {
var slots []SlotRange
- rangeSize := (MaxSlotID + 1) / number
- for i := 0; i < number; i++ {
- if i != number-1 {
+ rangeSize := (MaxSlotID + 1) / n
+ for i := 0; i < n; i++ {
+ if i != n-1 {
slots = append(slots, SlotRange{Start: i * rangeSize,
Stop: (i+1)*rangeSize - 1})
} else {
slots = append(slots, SlotRange{Start: i * rangeSize,
Stop: MaxSlotID})
diff --git a/store/slot_test.go b/store/slot_test.go
index e5d2ce2..f5740af 100644
--- a/store/slot_test.go
+++ b/store/slot_test.go
@@ -57,113 +57,79 @@ func TestSlotRange_Parse(t *testing.T) {
assert.NotNil(t, err)
}
-func TestSlotRange_MergeSlotRanges(t *testing.T) {
- range1 := SlotRange{
- Start: 0,
- Stop: 8191,
- }
- range2 := SlotRange{
- Start: 8192,
- Stop: 16383,
- }
- newSlot := MergeSlotRanges([]SlotRange{range1}, []SlotRange{range2})
- assert.Equal(t, 1, len(newSlot))
- assert.Equal(t, 0, newSlot[0].Start)
- assert.Equal(t, 16383, newSlot[0].Stop)
-
- range3 := SlotRange{
- Start: 0,
- Stop: 8199,
- }
- range4 := SlotRange{
- Start: 8192,
- Stop: 16383,
- }
- newSlot = MergeSlotRanges([]SlotRange{range3}, []SlotRange{range4})
- assert.Equal(t, 1, len(newSlot))
- assert.Equal(t, 0, newSlot[0].Start)
- assert.Equal(t, 16383, newSlot[0].Stop)
+func TestAddSlotToSlotRanges(t *testing.T) {
+ slotRanges := SlotRanges{
+ {Start: 1, Stop: 20},
+ {Start: 101, Stop: 199},
+ {Start: 201, Stop: 300},
+ }
+ slotRanges = AddSlotToSlotRanges(slotRanges, 0)
+ require.Equal(t, 3, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 0, Stop: 20}, slotRanges[0])
+
+ slotRanges = AddSlotToSlotRanges(slotRanges, 21)
+ require.Equal(t, 3, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 0, Stop: 21}, slotRanges[0])
+
+ slotRanges = AddSlotToSlotRanges(slotRanges, 50)
+ require.Equal(t, 4, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 50, Stop: 50}, slotRanges[1])
+
+ slotRanges = AddSlotToSlotRanges(slotRanges, 200)
+ require.Equal(t, 3, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 101, Stop: 300}, slotRanges[2])
+
+ slotRanges = AddSlotToSlotRanges(slotRanges, 400)
+ require.Equal(t, 4, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 400, Stop: 400}, slotRanges[3])
}
-func TestSlotRange_RemoveSlotRanges(t *testing.T) {
- range1 := SlotRange{
- Start: 0,
- Stop: 8191,
- }
- range2 := SlotRange{
- Start: 0,
- Stop: 0,
- }
- range3 := SlotRange{
- Start: 8191,
- Stop: 8191,
+func TestRemoveSlotRanges(t *testing.T) {
+ slotRanges := SlotRanges{
+ {Start: 1, Stop: 20},
+ {Start: 101, Stop: 199},
+ {Start: 201, Stop: 300},
}
- newSlot := RemoveSlotRanges([]SlotRange{range1}, []SlotRange{range2})
- assert.Equal(t, 1, newSlot[0].Start)
- assert.Equal(t, 8191, newSlot[0].Stop)
- newSlot = RemoveSlotRanges([]SlotRange{range1}, []SlotRange{range3})
- assert.Equal(t, 0, newSlot[0].Start)
- assert.Equal(t, 8190, newSlot[0].Stop)
-
- range1 = SlotRange{
- Start: 0,
- Stop: 8191,
- }
- range2 = SlotRange{
- Start: 8192,
- Stop: 16383,
- }
- range3 = SlotRange{
- Start: 0,
- Stop: 8192,
- }
- newSlot = RemoveSlotRanges([]SlotRange{range1, range2},
[]SlotRange{range3})
- assert.Equal(t, 8193, newSlot[0].Start)
- assert.Equal(t, 16383, newSlot[0].Stop)
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, 0)
+ require.Equal(t, 3, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0])
- range1 = SlotRange{
- Start: 0,
- Stop: 8191,
- }
- range2 = SlotRange{
- Start: 8192,
- Stop: 16383,
- }
- range3 = SlotRange{
- Start: 1,
- Stop: 8192,
- }
- newSlot = RemoveSlotRanges([]SlotRange{range1, range2},
[]SlotRange{range3})
- assert.Equal(t, 0, newSlot[0].Start)
- assert.Equal(t, 0, newSlot[0].Stop)
- assert.Equal(t, 8193, newSlot[1].Start)
- assert.Equal(t, 16383, newSlot[1].Stop)
-
- range1 = SlotRange{
- Start: 0,
- Stop: 8191,
- }
- range2 = SlotRange{
- Start: 8192,
- Stop: 16383,
- }
- range3 = SlotRange{
- Start: 1,
- Stop: 8192,
- }
- range4 := SlotRange{
- Start: 8194,
- Stop: 16383,
- }
- newSlot = RemoveSlotRanges([]SlotRange{range1, range2},
[]SlotRange{range3, range4})
- assert.Equal(t, 0, newSlot[0].Start)
- assert.Equal(t, 0, newSlot[0].Stop)
- assert.Equal(t, 8193, newSlot[1].Start)
- assert.Equal(t, 8193, newSlot[1].Stop)
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, 21)
+ require.Equal(t, 3, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0])
+
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, 20)
+ require.Equal(t, 3, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 1, Stop: 19}, slotRanges[0])
+
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, 150)
+ require.Equal(t, 4, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 101, Stop: 149}, slotRanges[1])
+
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, 101)
+ require.Equal(t, 4, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 102, Stop: 149}, slotRanges[1])
+
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, 199)
+ require.Equal(t, 4, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 151, Stop: 198}, slotRanges[2])
+
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, 300)
+ require.Equal(t, 4, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 201, Stop: 299}, slotRanges[3])
+
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, 298)
+ require.Equal(t, 5, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3])
+ require.EqualValues(t, SlotRange{Start: 299, Stop: 299}, slotRanges[4])
+
+ slotRanges = RemoveSlotFromSlotRanges(slotRanges, 299)
+ require.Equal(t, 4, len(slotRanges))
+ require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3])
}
-func TestSlotRange_SpiltSlotRange(t *testing.T) {
- slots := SpiltSlotRange(5)
+func TestCalculateSlotRanges(t *testing.T) {
+ slots := CalculateSlotRanges(5)
assert.Equal(t, 0, slots[0].Start)
assert.Equal(t, 3275, slots[0].Stop)
assert.Equal(t, 13104, slots[4].Start)