This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 9b4932d5 test: finish migrating stream tests (#1003)
9b4932d5 is described below
commit 9b4932d560309572a0281651fe81d769fd9fdece
Author: tison <[email protected]>
AuthorDate: Mon Oct 17 13:26:48 2022 +0800
test: finish migrating stream tests (#1003)
Signed-off-by: tison <[email protected]>
---
tests/gocase/unit/type/stream/stream_test.go | 427 ++++++++++++++++++++++++++
tests/gocase/unit/type/zset/zset_test.go | 10 +-
tests/gocase/util/slices.go | 26 ++
tests/tcl/tests/test_helper.tcl | 1 -
tests/tcl/tests/unit/type/stream.tcl | 439 ---------------------------
5 files changed, 455 insertions(+), 448 deletions(-)
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index 22893140..8d218c4a 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -24,7 +24,9 @@ import (
"fmt"
"math/rand"
"strconv"
+ "strings"
"testing"
+ "time"
"github.com/apache/incubator-kvrocks/tests/gocase/util"
"github.com/go-redis/redis/v9"
@@ -221,6 +223,431 @@ func TestStream(t *testing.T) {
require.EqualValues(t, "1641544570597-0", items[0].ID)
require.EqualValues(t, "1641544570597-1", items[1].ID)
})
+
+ t.Run("XADD mass insertion and XLEN", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ insertIntoStreamKey(t, rdb, "mystream")
+ items := rdb.XRange(ctx, "mystream", "-", "+").Val()
+ require.Len(t, items, 10000)
+ for i := 0; i < 10000; i++ {
+ require.Subset(t, items[i].Values,
map[string]interface{}{"item": strconv.Itoa(i)})
+ }
+ })
+
+ t.Run("XADD with ID 0-0", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "otherstream").Err())
+ require.Error(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: "otherstream",
+ ID: "0-0",
+ Values: []string{"k", "v"},
+ }).Err())
+ require.Zero(t, rdb.Exists(ctx, "otherstream").Val())
+ })
+
+ t.Run("XRANGE COUNT works as expected", func(t *testing.T) {
+ require.Len(t, rdb.XRangeN(ctx, "mystream", "-", "+",
10).Val(), 10)
+ })
+
+ t.Run("XREVRANGE COUNT works as expected", func(t *testing.T) {
+ require.Len(t, rdb.XRevRangeN(ctx, "mystream", "+", "-",
10).Val(), 10)
+ })
+
+ t.Run("XRANGE can be used to iterate the whole stream", func(t
*testing.T) {
+ lastID, c := "-", 0
+ for {
+ items := rdb.XRangeN(ctx, "mystream", lastID, "+",
100).Val()
+ if len(items) == 0 {
+ break
+ }
+ for _, item := range items {
+ require.Subset(t, item.Values,
map[string]interface{}{"item": strconv.Itoa(c)})
+ c++
+ }
+ lastID = streamNextID(t, items[len(items)-1].ID)
+ }
+ require.Equal(t, 10000, c)
+ })
+
+ t.Run("XREVRANGE returns the reverse of XRANGE", func(t *testing.T) {
+ items := rdb.XRange(ctx, "mystream", "-", "+").Val()
+ revItems := rdb.XRevRange(ctx, "mystream", "+", "-").Val()
+ util.ReverseSlice(revItems)
+ require.EqualValues(t, items, revItems)
+ })
+
+ t.Run("XRANGE exclusive ranges", func(t *testing.T) {
+ ids := []string{"0-1", "0-18446744073709551615", "1-0", "42-0",
"42-42", "18446744073709551615-18446744073709551614",
"18446744073709551615-18446744073709551615"}
+ total := len(ids)
+ require.NoError(t, rdb.Do(ctx, "MULTI").Err())
+ // DEL returns "QUEUED" here, so we use Do to avoid ParseInt.
+ require.NoError(t, rdb.Do(ctx, "DEL", "vipstream").Err())
+ for _, id := range ids {
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: "vipstream",
+ ID: id,
+ Values: []string{"foo", "bar"},
+ }).Err())
+ }
+ require.NoError(t, rdb.Do(ctx, "EXEC").Err())
+ require.Len(t, rdb.XRange(ctx, "vipstream", "-", "+").Val(),
total)
+ require.Len(t, rdb.XRange(ctx, "vipstream", "("+ids[0],
"+").Val(), total-1)
+ require.Len(t, rdb.XRange(ctx, "vipstream", "-",
"("+ids[total-1]).Val(), total-1)
+ require.Len(t, rdb.XRange(ctx, "vipstream", "(0-1",
"(1-0").Val(), 1)
+ require.Len(t, rdb.XRange(ctx, "vipstream", "(1-0",
"(42-42").Val(), 1)
+ require.ErrorContains(t, rdb.XRange(ctx, "vipstream", "(-",
"+").Err(), "ERR")
+ require.ErrorContains(t, rdb.XRange(ctx, "vipstream", "-",
"(+").Err(), "ERR")
+ require.ErrorContains(t, rdb.XRange(ctx, "vipstream",
"(18446744073709551615-18446744073709551615", "+").Err(), "ERR")
+ require.ErrorContains(t, rdb.XRange(ctx, "vipstream", "-",
"(0-0").Err(), "ERR")
+ })
+
+ t.Run("XREAD with non empty stream", func(t *testing.T) {
+ r := rdb.XRead(ctx, &redis.XReadArgs{
+ Streams: []string{"mystream", "0-0"},
+ Count: 1,
+ }).Val()
+ require.Len(t, r, 1)
+ require.Equal(t, "mystream", r[0].Stream)
+ require.Len(t, r[0].Messages, 1)
+ require.Subset(t, r[0].Messages[0].Values,
map[string]interface{}{"item": "0"})
+ })
+
+ t.Run("Non blocking XREAD with empty streams", func(t *testing.T) {
+ // go-redis blocks underneath; fallback to Do
+ require.Empty(t, rdb.Do(ctx, "XREAD", "STREAMS", "s1", "s2",
"0-0", "0-0").Val())
+ })
+
+ t.Run("XREAD with non empty second stream", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ insertIntoStreamKey(t, rdb, "mystream")
+ r := rdb.XRead(ctx, &redis.XReadArgs{
+ Streams: []string{"nostream", "mystream", "0-0", "0-0"},
+ Count: 1,
+ }).Val()
+ require.Len(t, r, 1)
+ require.Equal(t, "mystream", r[0].Stream)
+ require.Len(t, r[0].Messages, 1)
+ require.Subset(t, r[0].Messages[0].Values,
map[string]interface{}{"item": "0"})
+ })
+
+ t.Run("Blocking XREAD waiting new data", func(t *testing.T) {
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "s2",
Values: []string{"old", "abcd1234"}}).Err())
+ c := srv.NewClient()
+ defer func() { require.NoError(t, c.Close()) }()
+ ch := make(chan []redis.XStream)
+ go func() {
+ ch <- c.XRead(ctx, &redis.XReadArgs{Streams:
[]string{"s1", "s2", "s3", "$", "$", "$"}, Block: 20 * time.Second}).Val()
+ }()
+ require.Eventually(t, func() bool {
+ cnt, _ := strconv.Atoi(util.FindInfoEntry(rdb,
"blocked_clients"))
+ return cnt > 0
+ }, 5*time.Second, 100*time.Millisecond)
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "s2",
Values: []string{"new", "abcd1234"}}).Err())
+ r := <-ch
+ require.Len(t, r, 1)
+ require.Equal(t, "s2", r[0].Stream)
+ require.Len(t, r[0].Messages, 1)
+ require.Subset(t, r[0].Messages[0].Values,
map[string]interface{}{"new": "abcd1234"})
+ })
+
+ t.Run("Blocking XREAD waiting old data", func(t *testing.T) {
+ c := srv.NewClient()
+ defer func() { require.NoError(t, c.Close()) }()
+ ch := make(chan []redis.XStream)
+ go func() {
+ ch <- c.XRead(ctx, &redis.XReadArgs{Streams:
[]string{"s1", "s2", "s3", "$", "0-0", "$"}, Block: 20 * time.Second}).Val()
+ }()
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "s2",
Values: []string{"foo", "abcd1234"}}).Err())
+ r := <-ch
+ require.Len(t, r, 1)
+ require.Equal(t, "s2", r[0].Stream)
+ require.GreaterOrEqual(t, len(r[0].Messages), 2)
+ require.Subset(t, r[0].Messages[0].Values,
map[string]interface{}{"old": "abcd1234"})
+ })
+
+ t.Run("Blocking XREAD will not reply with an empty array", func(t
*testing.T) {
+ require.NoError(t, rdb.Del(ctx, "s1").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "s1",
ID: "666", Values: []string{"f", "v"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "s1",
ID: "667", Values: []string{"f2", "v2"}}).Err())
+ require.NoError(t, rdb.XDel(ctx, "s1", "667").Err())
+ c := srv.NewTCPClient()
+ defer func() { require.NoError(t, c.Close()) }()
+ require.NoError(t, c.WriteArgs("XREAD", "BLOCK", "10",
"STREAMS", "s1", "666"))
+ time.Sleep(20 * time.Millisecond)
+ c.MustRead(t, "$-1") // before the fix, client didn't even
block, but was served synchronously with {s1 {}}
+ })
+
+ t.Run("Blocking XREAD for stream that ran dry (redis issue #5299)",
func(t *testing.T) {
+ // add an entry then delete it, now stream's last_id is 666.
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "666", Values: []string{"key", "value"}}).Err())
+ require.NoError(t, rdb.XDel(ctx, "mystream", "666").Err())
+ // pass an ID smaller than stream's last_id, released on timeout
+ c := srv.NewClient()
+ defer func() { require.NoError(t, c.Close()) }()
+ require.Empty(t, c.XRead(ctx, &redis.XReadArgs{Streams:
[]string{"mystream", "665"}, Block: 10 * time.Millisecond}).Val())
+ // throw an error if the ID equal or smaller than the last_id
+ util.ErrorRegexp(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "665", Values: []string{"key", "value"}}).Err(),
"ERR.*equal.*smaller.*")
+ util.ErrorRegexp(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "666", Values: []string{"key", "value"}}).Err(),
"ERR.*equal.*smaller.*")
+ // entered blocking state and then release because of the new
entry
+ ch := make(chan []redis.XStream)
+ go func() {
+ ch <- c.XRead(ctx, &redis.XReadArgs{Streams:
[]string{"mystream", "665"}}).Val()
+ }()
+ require.Eventually(t, func() bool {
+ cnt, _ := strconv.Atoi(util.FindInfoEntry(rdb,
"blocked_clients"))
+ return cnt == 1
+ }, 5*time.Second, 100*time.Millisecond)
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "667", Values: []string{"key", "value"}}).Err())
+ r := <-ch
+ require.Len(t, r, 1)
+ require.Equal(t, "mystream", r[0].Stream)
+ require.Len(t, r[0].Messages, 1)
+ require.Equal(t, "667-0", r[0].Messages[0].ID)
+ require.Subset(t, r[0].Messages[0].Values,
map[string]interface{}{"key": "value"})
+ })
+
+ t.Run("XREAD with same stream name multiple times should work", func(t
*testing.T) {
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "s2",
Values: []string{"old", "abcd1234"}}).Err())
+ c := srv.NewClient()
+ defer func() { require.NoError(t, c.Close()) }()
+ ch := make(chan []redis.XStream)
+ go func() {
+ ch <- c.XRead(ctx, &redis.XReadArgs{Streams:
[]string{"s2", "s2", "s2", "$", "$", "$"}, Block: 20 * time.Second}).Val()
+ }()
+ require.Eventually(t, func() bool {
+ cnt, _ := strconv.Atoi(util.FindInfoEntry(rdb,
"blocked_clients"))
+ return cnt == 1
+ }, 5*time.Second, 100*time.Millisecond)
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "s2",
Values: []string{"new", "abcd1234"}}).Err())
+ r := <-ch
+ require.Len(t, r, 3)
+ require.Equal(t, "s2", r[0].Stream)
+ require.Len(t, r[0].Messages, 1)
+ require.Subset(t, r[0].Messages[0].Values,
map[string]interface{}{"new": "abcd1234"})
+ })
+
+ t.Run("XDEL basic test", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "somestream").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"somestream", Values: []string{"foo", "value0"}}).Err())
+ id := rdb.XAdd(ctx, &redis.XAddArgs{Stream: "somestream",
Values: []string{"foo", "value1"}}).Val()
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"somestream", Values: []string{"foo", "value2"}}).Err())
+ require.NoError(t, rdb.XDel(ctx, "somestream", id).Err())
+ require.EqualValues(t, 2, rdb.XLen(ctx, "somestream").Val())
+ items := rdb.XRange(ctx, "somestream", "-", "+").Val()
+ require.Len(t, items, 2)
+ require.Subset(t, items[0].Values,
map[string]interface{}{"foo": "value0"})
+ require.Subset(t, items[1].Values,
map[string]interface{}{"foo": "value2"})
+ })
+
+ // Here the idea is to check the consistency of the stream data
structure as we remove all the elements down to zero elements.
+ t.Run("XDEL fuzz test", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "somestream").Err())
+ var ids []string
+ // add enough elements to have a few radix tree nodes inside
the stream
+ cnt := 0
+ for {
+ ids = append(ids, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"somestream", Values: map[string]interface{}{"item": cnt}}).Val())
+ cnt++
+ if rdb.XInfoStream(ctx, "somestream").Val().Length >
500 {
+ break
+ }
+ }
+ // Now remove all the elements till we reach an empty stream
and after every deletion,
+ // check that the stream is sane enough to report the right
number of elements with XRANGE:
+ // this will also force accessing the whole data structure to
check sanity.
+ require.EqualValues(t, cnt, rdb.XLen(ctx, "somestream").Val())
+ // We want to remove elements in random order to really test
the implementation in a better way.
+ rand.Shuffle(len(ids), func(i, j int) { ids[i], ids[j] =
ids[j], ids[i] })
+ for _, id := range ids {
+ require.EqualValues(t, 1, rdb.XDel(ctx, "somestream",
id).Val())
+ cnt--
+ require.EqualValues(t, cnt, rdb.XLen(ctx,
"somestream").Val())
+ // The test would be too slow calling XRANGE for every
iteration. Do it every 100 removal.
+ if cnt%100 == 0 {
+ require.Len(t, rdb.XRange(ctx, "somestream",
"-", "+").Val(), cnt)
+ }
+ }
+ })
+
+ t.Run("XRANGE fuzzing", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ insertIntoStreamKey(t, rdb, "mystream")
+ items := rdb.XRange(ctx, "mystream", "-", "+").Val()
+ lowID, highID := items[0].ID, items[len(items)-1].ID
+ for i := 0; i < 100; i++ {
+ start, end := streamRandomID(lowID, highID),
streamRandomID(lowID, highID)
+ realRange := rdb.XRange(ctx, "mystream", start,
end).Val()
+ fakeRange := streamSimulateXRANGE(items, start, end)
+ require.EqualValues(t, fakeRange, realRange,
fmt.Sprintf("start=%s, end=%s", start, end))
+ }
+ })
+
+ t.Run("XREVRANGE regression test for (redis issue #5006)", func(t
*testing.T) {
+ // add non compressed entries
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"teststream", ID: "1234567891230", Values: []string{"key1", "value1"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"teststream", ID: "1234567891240", Values: []string{"key2", "value2"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"teststream", ID: "1234567891250", Values: []string{"key3", "value3"}}).Err())
+ // add SAMEFIELD compressed entries
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"teststream2", ID: "1234567891230", Values: []string{"key1", "value1"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"teststream2", ID: "1234567891240", Values: []string{"key1", "value2"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"teststream2", ID: "1234567891250", Values: []string{"key1", "value3"}}).Err())
+ items := rdb.XRevRange(ctx, "teststream", "1234567891245",
"-").Val()
+ require.Len(t, items, 2)
+ require.EqualValues(t, redis.XMessage{ID: "1234567891240-0",
Values: map[string]interface{}{"key2": "value2"}}, items[0])
+ require.EqualValues(t, redis.XMessage{ID: "1234567891230-0",
Values: map[string]interface{}{"key1": "value1"}}, items[1])
+ items = rdb.XRevRange(ctx, "teststream2", "1234567891245",
"-").Val()
+ require.Len(t, items, 2)
+ require.EqualValues(t, redis.XMessage{ID: "1234567891240-0",
Values: map[string]interface{}{"key1": "value2"}}, items[0])
+ require.EqualValues(t, redis.XMessage{ID: "1234567891230-0",
Values: map[string]interface{}{"key1": "value1"}}, items[1])
+ })
+
+ t.Run("XREAD streamID edge (no-blocking)", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "x").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "x",
ID: "1-1", Values: []string{"f", "v"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "x",
ID: "1-18446744073709551615", Values: []string{"f", "v"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "x",
ID: "2-1", Values: []string{"f", "v"}}).Err())
+ r := rdb.XRead(ctx, &redis.XReadArgs{Streams: []string{"x",
"1-18446744073709551615"}}).Val()
+ require.Len(t, r, 1)
+ require.Equal(t, "x", r[0].Stream)
+ require.Len(t, r[0].Messages, 1)
+ require.Equal(t, "2-1", r[0].Messages[0].ID)
+ require.Equal(t, map[string]interface{}{"f": "v"},
r[0].Messages[0].Values)
+ })
+
+ t.Run("XREAD streamID edge (blocking)", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "x").Err())
+ c := srv.NewClient()
+ defer func() { require.NoError(t, c.Close()) }()
+ ch := make(chan []redis.XStream)
+ go func() {
+ ch <- c.XRead(ctx, &redis.XReadArgs{Streams:
[]string{"x", "1-18446744073709551615"}}).Val()
+ }()
+ require.Eventually(t, func() bool {
+ cnt, _ := strconv.Atoi(util.FindInfoEntry(rdb,
"blocked_clients"))
+ return cnt == 1
+ }, 5*time.Second, 100*time.Millisecond)
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "x",
ID: "1-1", Values: []string{"f", "v"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "x",
ID: "1-18446744073709551615", Values: []string{"f", "v"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "x",
ID: "2-1", Values: []string{"f", "v"}}).Err())
+ r := <-ch
+ require.Len(t, r, 1)
+ require.Equal(t, "x", r[0].Stream)
+ require.Len(t, r[0].Messages, 1)
+ require.Equal(t, "2-1", r[0].Messages[0].ID)
+ require.Equal(t, map[string]interface{}{"f": "v"},
r[0].Messages[0].Values)
+ })
+
+ t.Run("XADD streamID edge", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "x").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "x",
ID: "2577343934890-18446744073709551615", Values: []string{"f", "v"}}).Err())
// we need the timestamp to be in the future
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "x",
Values: []string{"f2", "v2"}}).Err())
+ items := rdb.XRange(ctx, "x", "-", "+").Val()
+ require.Len(t, items, 2)
+ require.EqualValues(t, redis.XMessage{ID:
"2577343934890-18446744073709551615", Values: map[string]interface{}{"f":
"v"}}, items[0])
+ require.EqualValues(t, redis.XMessage{ID: "2577343934891-0",
Values: map[string]interface{}{"f2": "v2"}}, items[1])
+ })
+
+ t.Run("XTRIM with MAXLEN option basic test", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ for i := 0; i < 1000; i++ {
+ if rand.Float64() < 0.9 {
+ require.NoError(t, rdb.XAdd(ctx,
&redis.XAddArgs{Stream: "mystream", Values: map[string]interface{}{"xitem":
i}}).Err())
+ } else {
+ require.NoError(t, rdb.XAdd(ctx,
&redis.XAddArgs{Stream: "mystream", Values: map[string]interface{}{"yitem":
i}}).Err())
+ }
+ }
+ require.NoError(t, rdb.XTrimMaxLen(ctx, "mystream", 666).Err())
+ require.EqualValues(t, 666, rdb.XLen(ctx, "mystream").Val())
+ require.NoError(t, rdb.XTrimMaxLen(ctx, "mystream", 555).Err())
+ require.EqualValues(t, 555, rdb.XLen(ctx, "mystream").Val())
+ })
+
+ t.Run("XADD with LIMIT consecutive calls", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ for i := 0; i < 100; i++ {
+ require.NoError(t, rdb.XAdd(ctx,
&redis.XAddArgs{Stream: "mystream", Values: map[string]interface{}{"xitem":
"v"}}).Err())
+ }
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", MaxLen: 55, Values: map[string]interface{}{"xitem": "v"}}).Err())
+ require.EqualValues(t, 55, rdb.XLen(ctx, "mystream").Val())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", MaxLen: 55, Values: map[string]interface{}{"xitem": "v"}}).Err())
+ require.EqualValues(t, 55, rdb.XLen(ctx, "mystream").Val())
+ })
+}
+
+// streamSimulateXRANGE simulates Redis XRANGE implementation in Golang.
+func streamSimulateXRANGE(items []redis.XMessage, start, end string)
[]redis.XMessage {
+ result := make([]redis.XMessage, 0)
+ for _, item := range items {
+ if streamCompareID(item.ID, start) >= 0 &&
streamCompareID(item.ID, end) <= 0 {
+ result = append(result, item)
+ }
+ }
+ return result
+}
+
+func streamCompareID(a, b string) int {
+ aParts, bParts := strings.Split(a, "-"), strings.Split(b, "-")
+ aMs, _ := strconv.Atoi(aParts[0])
+ aSeq, _ := strconv.Atoi(aParts[1])
+ bMs, _ := strconv.Atoi(bParts[0])
+ bSeq, _ := strconv.Atoi(bParts[1])
+ if aMs > bMs {
+ return 1
+ }
+ if aMs < bMs {
+ return -1
+ }
+ if aSeq > bSeq {
+ return 1
+ }
+ if aSeq < bSeq {
+ return -1
+ }
+ return 0
+}
+
+// streamRandomID generates a random stream entry ID with the ms part between
min and max and
+// a low sequence number (0 - 999 range), in order to stress test XRANGE
against streamSimulateXRANGE.
+func streamRandomID(minID, maxID string) string {
+ minParts, maxParts := strings.Split(minID, "-"), strings.Split(maxID,
"-")
+ minMs, _ := strconv.Atoi(minParts[0])
+ maxMs, _ := strconv.Atoi(maxParts[0])
+ delta := int64(maxMs - minMs + 1)
+ ms, seq := int64(minMs)+util.RandomInt(delta), util.RandomInt(1000)
+ return fmt.Sprintf("%d-%d", ms, seq)
+}
+
+// streamNextID returns the ID immediately greater than the specified one.
+//
+// Note that this function does not care to handle 'seq' overflow since it's a
64 bit value.
+func streamNextID(t *testing.T, id string) string {
+ parts := strings.Split(id, "-")
+ require.Len(t, parts, 2)
+ ms, seq := parts[0], parts[1]
+ seqN, err := strconv.Atoi(seq)
+ require.NoError(t, err)
+ return fmt.Sprintf("%s-%d", ms, seqN+1)
+}
+
+func insertIntoStreamKey(t *testing.T, rdb *redis.Client, key string) {
+ ctx := context.Background()
+ require.NoError(t, rdb.Do(ctx, "MULTI").Err())
+ for i := 0; i < 10000; i++ {
+ // From time to time insert a field with a different set
+ // of fields in order to stress the stream compression code.
+ if rand.Float64() < 0.9 {
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: key,
+ Values: map[string]interface{}{"item": i},
+ }).Err())
+ } else {
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: key,
+ Values: map[string]interface{}{"item": i,
"otherfield": "foo"},
+ }).Err())
+ }
+ }
+ require.NoError(t, rdb.Do(ctx, "EXEC").Err())
}
func TestStreamOffset(t *testing.T) {
diff --git a/tests/gocase/unit/type/zset/zset_test.go
b/tests/gocase/unit/type/zset/zset_test.go
index 777f821d..d6ba6356 100644
--- a/tests/gocase/unit/type/zset/zset_test.go
+++ b/tests/gocase/unit/type/zset/zset_test.go
@@ -67,12 +67,6 @@ func createDefaultLexZset(rdb *redis.Client, ctx
context.Context) {
{0, "omega"}})
}
-func reverse(s []redis.Z) {
- for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
- s[i], s[j] = s[j], s[i]
- }
-}
-
func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, encoding
string) {
t.Run(fmt.Sprintf("Check encoding - %s", encoding), func(t *testing.T) {
rdb.Del(ctx, "ztmp")
@@ -374,7 +368,7 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx
context.Context, encoding s
{math.Inf(1), "g"}}
createZset(rdb, ctx, "mzset", zsetInt)
require.Equal(t, zsetInt, rdb.ZRangeByScoreWithScores(ctx,
"mzset", &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val())
- reverse(zsetInt)
+ util.ReverseSlice(zsetInt)
require.Equal(t, zsetInt, rdb.ZRevRangeByScoreWithScores(ctx,
"mzset", &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val())
zsetDouble := []redis.Z{
@@ -386,7 +380,7 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx
context.Context, encoding s
{1.004, "f"}}
createZset(rdb, ctx, "mzset", zsetDouble)
require.Equal(t, zsetDouble, rdb.ZRangeByScoreWithScores(ctx,
"mzset", &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val())
- reverse(zsetDouble)
+ util.ReverseSlice(zsetDouble)
require.Equal(t, zsetDouble,
rdb.ZRevRangeByScoreWithScores(ctx, "mzset", &redis.ZRangeBy{Min: "-inf", Max:
"+inf"}).Val())
})
diff --git a/tests/gocase/util/slices.go b/tests/gocase/util/slices.go
new file mode 100644
index 00000000..91abdcaf
--- /dev/null
+++ b/tests/gocase/util/slices.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package util
+
+func ReverseSlice[T any](s []T) {
+ for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
+ s[i], s[j] = s[j], s[i]
+ }
+}
diff --git a/tests/tcl/tests/test_helper.tcl b/tests/tcl/tests/test_helper.tcl
index 3b5c87ab..22ac3530 100644
--- a/tests/tcl/tests/test_helper.tcl
+++ b/tests/tcl/tests/test_helper.tcl
@@ -34,7 +34,6 @@ source tests/support/util.tcl
set ::all_tests {
unit/type/list
- unit/type/stream
integration/slotmigrate
}
diff --git a/tests/tcl/tests/unit/type/stream.tcl
b/tests/tcl/tests/unit/type/stream.tcl
deleted file mode 100644
index 62765e33..00000000
--- a/tests/tcl/tests/unit/type/stream.tcl
+++ /dev/null
@@ -1,439 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Copyright (c) 2006-2020, Salvatore Sanfilippo
-# See bundled license file licenses/LICENSE.redis for details.
-
-# This file is copied and modified from the Redis project
-# https://github.com/redis/redis/blob/unstable/tests/unit/type/stream.tcl
-
-# return value is like strcmp() and similar.
-proc streamCompareID {a b} {
- if {$a eq $b} {return 0}
- lassign [split $a -] a_ms a_seq
- lassign [split $b -] b_ms b_seq
- if {$a_ms > $b_ms} {return 1}
- if {$a_ms < $b_ms} {return -1}
- # Same ms case, compare seq.
- if {$a_seq > $b_seq} {return 1}
- if {$a_seq < $b_seq} {return -1}
-}
-
-# return the ID immediately greater than the specified one.
-# Note that this function does not care to handle 'seq' overflow
-# since it's a 64 bit value.
-proc streamNextID {id} {
- lassign [split $id -] ms seq
- incr seq
- join [list $ms $seq] -
-}
-
-# Generate a random stream entry ID with the ms part between min and max
-# and a low sequence number (0 - 999 range), in order to stress test
-# XRANGE against a Tcl implementation implementing the same concept
-# with Tcl-only code in a linear array.
-proc streamRandomID {min_id max_id} {
- lassign [split $min_id -] min_ms min_seq
- lassign [split $max_id -] max_ms max_seq
- set delta [expr {$max_ms-$min_ms+1}]
- set ms [expr {$min_ms+[randomInt $delta]}]
- set seq [randomInt 1000]
- return $ms-$seq
-}
-
-# Tcl-side implementation of XRANGE to perform fuzz testing in the Redis
-# XRANGE implementation.
-proc streamSimulateXRANGE {items start end} {
- set res {}
- foreach i $items {
- set this_id [lindex $i 0]
- if {[streamCompareID $this_id $start] >= 0} {
- if {[streamCompareID $this_id $end] <= 0} {
- lappend res $i
- }
- }
- }
- return $res
-}
-
-set content {} ;# Will be populated with Tcl side copy of the stream content.
-
-start_server {
- tags {"stream"}
-} {
- proc insert_into_stream_key {key {count 10000}} {
- r multi
- for {set j 0} {$j < $count} {incr j} {
- # From time to time insert a field with a different set
- # of fields in order to stress the stream compression code.
- if {rand() < 0.9} {
- r XADD $key * item $j
- } else {
- r XADD $key * item $j otherfield foo
- }
- }
- r exec
- }
-
- test {XADD mass insertion and XLEN} {
- r DEL mystream
- insert_into_stream_key mystream
-
- set items [r XRANGE mystream - +]
- for {set j 0} {$j < 10000} {incr j} {
- assert {[lrange [lindex $items $j 1] 0 1] eq [list item $j]}
- }
- assert {[r xlen mystream] == $j}
- }
-
- test {XADD with ID 0-0} {
- r DEL otherstream
- catch {r XADD otherstream 0-0 k v} err
- assert {[r EXISTS otherstream] == 0}
- }
-
- test {XRANGE COUNT works as expected} {
- assert {[llength [r xrange mystream - + COUNT 10]] == 10}
- }
-
- test {XREVRANGE COUNT works as expected} {
- assert {[llength [r xrevrange mystream + - COUNT 10]] == 10}
- }
-
- test {XRANGE can be used to iterate the whole stream} {
- set last_id "-"
- set j 0
- while 1 {
- set elements [r xrange mystream $last_id + COUNT 100]
- if {[llength $elements] == 0} break
- foreach e $elements {
- assert {[lrange [lindex $e 1] 0 1] eq [list item $j]}
- incr j;
- }
- set last_id [streamNextID [lindex $elements end 0]]
- }
- assert {$j == 10000}
- }
-
- test {XREVRANGE returns the reverse of XRANGE} {
- assert {[r xrange mystream - +] == [lreverse [r xrevrange mystream +
-]]}
- }
-
- test {XRANGE exclusive ranges} {
- set ids {0-1 0-18446744073709551615 1-0 42-0 42-42
- 18446744073709551615-18446744073709551614
- 18446744073709551615-18446744073709551615}
- set total [llength $ids]
- r multi
- r DEL vipstream
- foreach id $ids {
- r XADD vipstream $id foo bar
- }
- r exec
- assert {[llength [r xrange vipstream - +]] == $total}
- assert {[llength [r xrange vipstream ([lindex $ids 0] +]] == $total-1}
- assert {[llength [r xrange vipstream - ([lindex $ids $total-1]]] ==
$total-1}
- assert {[llength [r xrange vipstream (0-1 (1-0]] == 1}
- assert {[llength [r xrange vipstream (1-0 (42-42]] == 1}
- catch {r xrange vipstream (- +} e
- assert_match {ERR*} $e
- catch {r xrange vipstream - (+} e
- assert_match {ERR*} $e
- catch {r xrange vipstream (18446744073709551615-18446744073709551615
+} e
- assert_match {ERR*} $e
- catch {r xrange vipstream - (0-0} e
- assert_match {ERR*} $e
- }
-
- test {XREAD with non empty stream} {
- set res [r XREAD COUNT 1 STREAMS mystream 0-0]
- assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
- }
-
- test {Non blocking XREAD with empty streams} {
- set res [r XREAD STREAMS s1{t} s2{t} 0-0 0-0]
- assert {$res eq {}}
- }
-
- test {XREAD with non empty second stream} {
- insert_into_stream_key mystream{t}
- set res [r XREAD COUNT 1 STREAMS nostream{t} mystream{t} 0-0 0-0]
- assert {[lindex $res 0 0] eq {mystream{t}}}
- assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
- }
-
- test {Blocking XREAD waiting new data} {
- r XADD s2{t} * old abcd1234
- set rd [redis_deferring_client]
- $rd XREAD BLOCK 20000 STREAMS s1{t} s2{t} s3{t} $ $ $
- wait_for_blocked_client
- r XADD s2{t} * new abcd1234
- set res [$rd read]
- assert {[lindex $res 0 0] eq {s2{t}}}
- assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
- $rd close
- }
-
- test {Blocking XREAD waiting old data} {
- set rd [redis_deferring_client]
- $rd XREAD BLOCK 20000 STREAMS s1{t} s2{t} s3{t} $ 0-0 $
- r XADD s2{t} * foo abcd1234
- set res [$rd read]
- assert {[lindex $res 0 0] eq {s2{t}}}
- assert {[lindex $res 0 1 0 1] eq {old abcd1234}}
- $rd close
- }
-
- test {Blocking XREAD will not reply with an empty array} {
- r del s1
- r XADD s1 666 f v
- r XADD s1 667 f2 v2
- r XDEL s1 667
- set rd [redis_deferring_client]
- $rd XREAD BLOCK 10 STREAMS s1 666
- after 20
- assert {[$rd read] == {}} ;# before the fix, client didn't even block,
but was served synchronously with {s1 {}}
- $rd close
- }
-
- test "Blocking XREAD for stream that ran dry (issue #5299)" {
- set rd [redis_deferring_client]
-
- # Add a entry then delete it, now stream's last_id is 666.
- r DEL mystream
- r XADD mystream 666 key value
- r XDEL mystream 666
-
- # Pass a ID smaller than stream's last_id, released on timeout.
- $rd XREAD BLOCK 10 STREAMS mystream 665
- assert_equal [$rd read] {}
-
- # Throw an error if the ID equal or smaller than the last_id.
- assert_error ERR*equal*smaller* {r XADD mystream 665 key value}
- assert_error ERR*equal*smaller* {r XADD mystream 666 key value}
-
- # Entered blocking state and then release because of the new entry.
- $rd XREAD BLOCK 0 STREAMS mystream 665
- wait_for_blocked_clients_count 1
- r XADD mystream 667 key value
- assert_equal [$rd read] {{mystream {{667-0 {key value}}}}}
-
- $rd close
- }
-
- #test "XREAD: XADD + DEL should not awake client" {
- # set rd [redis_deferring_client]
- # r del s1
- # $rd XREAD BLOCK 20000 STREAMS s1 $
- # wait_for_blocked_clients_count 1
- # r multi
- # r XADD s1 * old abcd1234
- # r DEL s1
- # r exec
- # r XADD s1 * new abcd1234
- # set res [$rd read]
- # assert {[lindex $res 0 0] eq {s1}}
- # assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
- # $rd close
- #}
-
- #test "XREAD: XADD + DEL + LPUSH should not awake client" {
- # set rd [redis_deferring_client]
- # r del s1
- # $rd XREAD BLOCK 20000 STREAMS s1 $
- # wait_for_blocked_clients_count 1
- # r multi
- # r XADD s1 * old abcd1234
- # r DEL s1
- # r LPUSH s1 foo bar
- # r exec
- # r DEL s1
- # r XADD s1 * new abcd1234
- # set res [$rd read]
- # assert {[lindex $res 0 0] eq {s1}}
- # assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
- # $rd close
- #}
-
- test {XREAD with same stream name multiple times should work} {
- r XADD s2 * old abcd1234
- set rd [redis_deferring_client]
- $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
- wait_for_blocked_clients_count 1
- r XADD s2 * new abcd1234
- set res [$rd read]
- assert {[lindex $res 0 0] eq {s2}}
- assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
- $rd close
- }
-
- #test {XREAD + multiple XADD inside transaction} {
- # r XADD s2 * old abcd1234
- # set rd [redis_deferring_client]
- # $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
- # wait_for_blocked_clients_count 1
- # r MULTI
- # r XADD s2 * field one
- # r XADD s2 * field two
- # r XADD s2 * field three
- # r EXEC
- # set res [$rd read]
- # assert {[lindex $res 0 0] eq {s2}}
- # assert {[lindex $res 0 1 0 1] eq {field one}}
- # assert {[lindex $res 0 1 1 1] eq {field two}}
- # $rd close
- #}
-
- test {XDEL basic test} {
- r del somestream
- r xadd somestream * foo value0
- set id [r xadd somestream * foo value1]
- r xadd somestream * foo value2
- r xdel somestream $id
- assert {[r xlen somestream] == 2}
- set result [r xrange somestream - +]
- assert {[lindex $result 0 1 1] eq {value0}}
- assert {[lindex $result 1 1 1] eq {value2}}
- }
-
- # Here the idea is to check the consistency of the stream data structure
- # as we remove all the elements down to zero elements.
- test {XDEL fuzz test} {
- r del somestream
- set ids {}
- set x 0; # Length of the stream
- while 1 {
- lappend ids [r xadd somestream * item $x]
- incr x
- # Add enough elements to have a few radix tree nodes inside the
stream.
- if {[dict get [r xinfo stream somestream] length] > 500} break
- }
-
- # Now remove all the elements till we reach an empty stream
- # and after every deletion, check that the stream is sane enough
- # to report the right number of elements with XRANGE: this will also
- # force accessing the whole data structure to check sanity.
- assert {[r xlen somestream] == $x}
-
- # We want to remove elements in random order to really test the
- # implementation in a better way.
- set ids [lshuffle $ids]
- foreach id $ids {
- assert {[r xdel somestream $id] == 1}
- incr x -1
- assert {[r xlen somestream] == $x}
- # The test would be too slow calling XRANGE for every iteration.
- # Do it every 100 removal.
- if {$x % 100 == 0} {
- set res [r xrange somestream - +]
- assert {[llength $res] == $x}
- }
- }
- }
-
- test {XRANGE fuzzing} {
- set items [r XRANGE mystream{t} - +]
- set low_id [lindex $items 0 0]
- set high_id [lindex $items end 0]
- for {set j 0} {$j < 100} {incr j} {
- set start [streamRandomID $low_id $high_id]
- set end [streamRandomID $low_id $high_id]
- set range [r xrange mystream{t} $start $end]
- set tcl_range [streamSimulateXRANGE $items $start $end]
- if {$range ne $tcl_range} {
- puts "*** WARNING *** - XRANGE fuzzing mismatch: $start - $end"
- puts "---"
- puts "XRANGE: '$range'"
- puts "---"
- puts "TCL: '$tcl_range'"
- puts "---"
- fail "XRANGE fuzzing failed, check logs for details"
- }
- }
- }
-
- test {XREVRANGE regression test for issue #5006} {
- # Add non compressed entries
- r xadd teststream 1234567891230 key1 value1
- r xadd teststream 1234567891240 key2 value2
- r xadd teststream 1234567891250 key3 value3
-
- # Add SAMEFIELD compressed entries
- r xadd teststream2 1234567891230 key1 value1
- r xadd teststream2 1234567891240 key1 value2
- r xadd teststream2 1234567891250 key1 value3
-
- assert_equal [r xrevrange teststream 1234567891245 -]
{{1234567891240-0 {key2 value2}} {1234567891230-0 {key1 value1}}}
-
- assert_equal [r xrevrange teststream2 1234567891245 -]
{{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}}
- }
-
- test {XREAD streamID edge (no-blocking)} {
- r del x
- r XADD x 1-1 f v
- r XADD x 1-18446744073709551615 f v
- r XADD x 2-1 f v
- set res [r XREAD BLOCK 0 STREAMS x 1-18446744073709551615]
- assert {[lindex $res 0 1 0] == {2-1 {f v}}}
- }
-
- test {XREAD streamID edge (blocking)} {
- r del x
- set rd [redis_deferring_client]
- $rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615
- wait_for_blocked_clients_count 1
- r XADD x 1-1 f v
- r XADD x 1-18446744073709551615 f v
- r XADD x 2-1 f v
- set res [$rd read]
- assert {[lindex $res 0 1 0] == {2-1 {f v}}}
- $rd close
- }
-
- test {XADD streamID edge} {
- r del x
- r XADD x 2577343934890-18446744073709551615 f v ;# we need the
timestamp to be in the future
- r XADD x * f2 v2
- assert_equal [r XRANGE x - +] {{2577343934890-18446744073709551615 {f
v}} {2577343934891-0 {f2 v2}}}
- }
-
- test {XTRIM with MAXLEN option basic test} {
- r DEL mystream
- for {set j 0} {$j < 1000} {incr j} {
- if {rand() < 0.9} {
- r XADD mystream * xitem $j
- } else {
- r XADD mystream * yitem $j
- }
- }
- r XTRIM mystream MAXLEN 666
- assert {[r XLEN mystream] == 666}
- r XTRIM mystream MAXLEN = 555
- assert {[r XLEN mystream] == 555}
- }
-
- test {XADD with LIMIT consecutive calls} {
- r del mystream
- for {set j 0} {$j < 100} {incr j} {
- r XADD mystream * xitem v
- }
- r XADD mystream MAXLEN = 55 * xitem v
- assert {[r xlen mystream] == 55}
- r XADD mystream MAXLEN = 55 * xitem v
- assert {[r xlen mystream] == 55}
- }
-}