This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch 2.9
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/2.9 by this push:
new db89273f chore: disable search and stream group commands
db89273f is described below
commit db89273f57da7c6ee79e718f5a4757be12477dc7
Author: PragmaTwice <[email protected]>
AuthorDate: Mon Jul 1 13:16:44 2024 +0900
chore: disable search and stream group commands
---
src/commands/cmd_search.cc | 17 +-
src/commands/cmd_stream.cc | 37 +-
tests/gocase/unit/search/search_test.go | 2 +
tests/gocase/unit/type/stream/stream_test.go | 2074 +++++++++++++-------------
4 files changed, 1067 insertions(+), 1063 deletions(-)
diff --git a/src/commands/cmd_search.cc b/src/commands/cmd_search.cc
index 1928672c..ea42f269 100644
--- a/src/commands/cmd_search.cc
+++ b/src/commands/cmd_search.cc
@@ -358,13 +358,14 @@ class CommandFTDrop : public Commander {
};
};
-REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandFTCreate>("ft.create", -2, "write
exclusive no-multi no-script", 0, 0, 0),
- MakeCmdAttr<CommandFTSearchSQL>("ft.searchsql", 2,
"read-only", 0, 0, 0),
- MakeCmdAttr<CommandFTSearch>("ft.search", -3,
"read-only", 0, 0, 0),
- MakeCmdAttr<CommandFTExplainSQL>("ft.explainsql", -2,
"read-only", 0, 0, 0),
- MakeCmdAttr<CommandFTExplain>("ft.explain", -3,
"read-only", 0, 0, 0),
- MakeCmdAttr<CommandFTInfo>("ft.info", 2, "read-only",
0, 0, 0),
- MakeCmdAttr<CommandFTList>("ft._list", 1, "read-only",
0, 0, 0),
- MakeCmdAttr<CommandFTDrop>("ft.dropindex", 2, "write
exclusive no-multi no-script", 0, 0, 0));
+// REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandFTCreate>("ft.create", -2,
"write exclusive no-multi no-script", 0, 0, 0),
+// MakeCmdAttr<CommandFTSearchSQL>("ft.searchsql", 2,
"read-only", 0, 0, 0),
+// MakeCmdAttr<CommandFTSearch>("ft.search", -3,
"read-only", 0, 0, 0),
+// MakeCmdAttr<CommandFTExplainSQL>("ft.explainsql",
-2, "read-only", 0, 0, 0),
+// MakeCmdAttr<CommandFTExplain>("ft.explain", -3,
"read-only", 0, 0, 0),
+// MakeCmdAttr<CommandFTInfo>("ft.info", 2,
"read-only", 0, 0, 0),
+// MakeCmdAttr<CommandFTList>("ft._list", 1,
"read-only", 0, 0, 0),
+// MakeCmdAttr<CommandFTDrop>("ft.dropindex", 2,
"write exclusive no-multi no-script", 0, 0,
+// 0));
} // namespace redis
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 7faeb9c4..e1e1a257 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -655,10 +655,10 @@ class CommandXInfo : public Commander {
count_ = *parse_result;
}
- } else if (val == "groups" && args.size() == 3) {
- subcommand_ = "groups";
- } else if (val == "consumers" && args.size() == 4) {
- subcommand_ = "consumers";
+ // } else if (val == "groups" && args.size() == 3) {
+ // subcommand_ = "groups";
+ // } else if (val == "consumers" && args.size() == 4) {
+ // subcommand_ = "consumers";
} else {
return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments};
}
@@ -1736,19 +1736,20 @@ class CommandXSetId : public Commander {
std::optional<uint64_t> entries_added_;
};
-REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack", -4, "write
no-dbsize-check", 1, 1, 1),
- MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
- MakeCmdAttr<CommandXDel>("xdel", -3, "write
no-dbsize-check", 1, 1, 1),
- MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1,
1, 1),
- MakeCmdAttr<CommandAutoClaim>("xautoclaim", -6,
"write", 1, 1, 1),
- MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2,
2, 1),
- MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1,
1, 1),
- MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0,
0, 0),
- MakeCmdAttr<CommandXRange>("xrange", -4, "read-only",
1, 1, 1),
- MakeCmdAttr<CommandXRevRange>("xrevrange", -2,
"read-only", 1, 1, 1),
- MakeCmdAttr<CommandXRead>("xread", -4, "read-only", 0,
0, 0),
- MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7,
"write", 0, 0, 0),
- MakeCmdAttr<CommandXTrim>("xtrim", -4, "write
no-dbsize-check", 1, 1, 1),
- MakeCmdAttr<CommandXSetId>("xsetid", -3, "write", 1,
1, 1))
+REDIS_REGISTER_COMMANDS(
+ // MakeCmdAttr<CommandXAck>("xack", -4, "write no-dbsize-check", 1, 1, 1),
+ MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
+ MakeCmdAttr<CommandXDel>("xdel", -3, "write no-dbsize-check", 1, 1, 1),
+ // MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1, 1, 1),
+ // MakeCmdAttr<CommandAutoClaim>("xautoclaim", -6, "write", 1, 1, 1),
+ // MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
+ MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 1, 1),
+ MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0, 0, 0),
+ MakeCmdAttr<CommandXRange>("xrange", -4, "read-only", 1, 1, 1),
+ MakeCmdAttr<CommandXRevRange>("xrevrange", -2, "read-only", 1, 1, 1),
+ MakeCmdAttr<CommandXRead>("xread", -4, "read-only", 0, 0, 0),
+ // MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7, "write", 0, 0, 0),
+ MakeCmdAttr<CommandXTrim>("xtrim", -4, "write no-dbsize-check", 1, 1, 1),
+ MakeCmdAttr<CommandXSetId>("xsetid", -3, "write", 1, 1, 1))
} // namespace redis
diff --git a/tests/gocase/unit/search/search_test.go
b/tests/gocase/unit/search/search_test.go
index bb87c19f..651323c1 100644
--- a/tests/gocase/unit/search/search_test.go
+++ b/tests/gocase/unit/search/search_test.go
@@ -29,6 +29,8 @@ import (
)
func TestSearch(t *testing.T) {
+ t.Skip("search commands is disabled")
+
srv := util.StartServer(t, map[string]string{})
defer srv.Close()
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index b8958e70..4ba461ff 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -867,1043 +867,1043 @@ func TestStreamOffset(t *testing.T) {
require.EqualValues(t, providedSeqNum, seqNum)
})
- t.Run("XGROUP CREATE with different kinds of commands and XGROUP
DESTROY", func(t *testing.T) {
- streamName := "test-stream-a"
- groupName := "test-group-a"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- // No such stream (No such key)
- require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$").Err())
- require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIESREAD", "10").Err())
- require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIESREAD").Err())
- require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "MKSTREAM", "ENTRIESREAD").Err())
- require.NoError(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "MKSTREAM").Err())
- require.NoError(t, rdb.XInfoStream(ctx, streamName).Err())
- require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$").Err())
- // Invalid syntax
- groupName = "test-group-b"
- require.Error(t, rdb.Do(ctx, "XGROUP", "CREAT", streamName,
groupName, "$").Err())
- require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIEREAD", "10").Err())
- require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIESREAD", "-10").Err())
- require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
"1test-group-c", "$").Err())
-
- require.NoError(t, rdb.Del(ctx, "myStream").Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup",
"$").Err())
- result, err := rdb.XGroupDestroy(ctx, "myStream",
"myGroup").Result()
- require.NoError(t, err)
- require.Equal(t, int64(1), result)
- result, err = rdb.XGroupDestroy(ctx, "myStream",
"myGroup").Result()
- require.NoError(t, err)
- require.Equal(t, int64(0), result)
- })
-
- t.Run("XGROUP CREATECONSUMER with different kinds of commands", func(t
*testing.T) {
- streamName := "test-stream"
- groupName := "test-group"
- consumerName := "test-consumer"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- //No such stream
- require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: []string{"data", "a"},
- }).Err())
- //no such group
- require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"$").Err())
-
- r := rdb.XGroupCreateConsumer(ctx, streamName, groupName,
consumerName).Val()
- require.Equal(t, int64(1), r)
- r = rdb.XGroupCreateConsumer(ctx, streamName, groupName,
consumerName).Val()
- require.Equal(t, int64(0), r)
- })
-
- t.Run("XGROUP DELCONSUMER with different kinds of commands", func(t
*testing.T) {
- streamName := "test-stream"
- groupName := "test-group"
- consumerName := "test-consumer"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- //No such stream
- require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: []string{"data", "a"},
- }).Err())
- //no such group
- require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"$").Err())
- require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
-
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"data1", "a1"},
- }).Err())
- require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 1,
- NoAck: false,
- }).Err())
- ri, erri := rdb.XInfoGroups(ctx, streamName).Result()
- require.NoError(t, erri)
- require.Equal(t, int64(1), ri[0].Consumers)
- require.Equal(t, int64(1), ri[0].Pending)
-
- r, err := rdb.XGroupDelConsumer(ctx, streamName, groupName,
consumerName).Result()
- require.NoError(t, err)
- require.Equal(t, int64(1), r)
- ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
- require.NoError(t, erri)
- require.Equal(t, int64(0), ri[0].Consumers)
- require.Equal(t, int64(0), ri[0].Pending)
-
- require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"data2", "a2"},
- }).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"data3", "a3"},
- }).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"data4", "a4"},
- }).Err())
- require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 3,
- NoAck: false,
- }).Err())
- ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
- require.NoError(t, erri)
- require.Equal(t, int64(1), ri[0].Consumers)
- require.Equal(t, int64(3), ri[0].Pending)
- r, err = rdb.XGroupDelConsumer(ctx, streamName, groupName,
consumerName).Result()
- require.NoError(t, err)
- require.Equal(t, int64(3), r)
- ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
- require.NoError(t, erri)
- require.Equal(t, int64(0), ri[0].Consumers)
- require.Equal(t, int64(0), ri[0].Pending)
- })
-
- t.Run("XGROUP SETID with different kinds of commands", func(t
*testing.T) {
- streamName := "test-stream"
- groupName := "test-group"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- //No such stream
- require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName,
"$").Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: []string{"data", "a"},
- }).Err())
- //No such group
- require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName,
"$").Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"$").Err())
-
- require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName,
"0-0").Err())
- require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName,
groupName, "$", "entries", "100").Err())
- require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName,
groupName, "$", "entriesread", "-100").Err())
- require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName,
groupName, "$", "entriesread", "100").Err())
- })
-
- t.Run("XINFO GROUPS and XINFO CONSUMERS", func(t *testing.T) {
- streamName := "test-stream"
- group1 := "t1"
- group2 := "t2"
- consumer1 := "c1"
- consumer2 := "c2"
- consumer3 := "c3"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: []string{"data", "a"},
- }).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, group1,
"$").Err())
- r := rdb.XInfoGroups(ctx, streamName).Val()
- require.Equal(t, group1, r[0].Name)
- require.Equal(t, int64(0), r[0].Consumers)
- require.Equal(t, int64(0), r[0].Pending)
- require.Equal(t, "1-0", r[0].LastDeliveredID)
- require.Equal(t, int64(0), r[0].EntriesRead)
- require.Equal(t, int64(0), r[0].Lag)
-
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "2-0",
- Values: []string{"data1", "b"},
- }).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, group2,
"$").Err())
- r = rdb.XInfoGroups(ctx, streamName).Val()
- require.Equal(t, group2, r[1].Name)
- require.Equal(t, "2-0", r[1].LastDeliveredID)
-
- require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
group1, consumer1).Err())
- require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
group1, consumer2).Err())
- require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
group2, consumer3).Err())
- r = rdb.XInfoGroups(ctx, streamName).Val()
- require.Equal(t, int64(2), r[0].Consumers)
- require.Equal(t, int64(1), r[1].Consumers)
-
- r1 := rdb.XInfoConsumers(ctx, streamName, group1).Val()
- require.Equal(t, consumer1, r1[0].Name)
- require.Equal(t, consumer2, r1[1].Name)
- r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val()
- require.Equal(t, consumer3, r1[0].Name)
- })
-
- t.Run("XINFO after delete pending message and related consumer, for
issue #2350", func(t *testing.T) {
- streamName := "test-stream-2350"
- groupName := "test-group-2350"
- consumerName := "test-consumer-2350"
- require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName,
groupName, "$").Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"testing", "overflow"},
- }).Err())
- readRsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 1,
- NoAck: false,
- })
- require.NoError(t, readRsp.Err())
- require.Len(t, readRsp.Val(), 1)
- streamRsp := readRsp.Val()[0]
- require.Len(t, streamRsp.Messages, 1)
- msgID := streamRsp.Messages[0]
- require.NoError(t, rdb.XAck(ctx, streamName, groupName,
msgID.ID).Err())
- require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName,
groupName, consumerName).Err())
- infoRsp := rdb.XInfoGroups(ctx, streamName)
- require.NoError(t, infoRsp.Err())
- infoGroups := infoRsp.Val()
- require.Len(t, infoGroups, 1)
- infoGroup := infoGroups[0]
- require.Equal(t, groupName, infoGroup.Name)
- require.Equal(t, int64(0), infoGroup.Consumers)
- require.Equal(t, int64(0), infoGroup.Pending)
- require.Equal(t, msgID.ID, infoGroup.LastDeliveredID)
- })
-
- t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue
#2109", func(t *testing.T) {
- streamName := "test-stream"
- group := "group"
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"data1", "b"},
- }).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, group,
"0").Err())
- require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
group, "consumer").Err())
- require.NoError(t, rdb.XRead(ctx, &redis.XReadArgs{
- Streams: []string{streamName, "0"},
- }).Err())
- })
-
- t.Run("XREADGROUP with different kinds of commands", func(t *testing.T)
{
- streamName := "mystream"
- groupName := "mygroup"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: []string{"field1", "data1"},
- }).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
- consumerName := "myconsumer"
- r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 1,
- NoAck: false,
- }).Result()
- require.NoError(t, err)
- require.Equal(t, []redis.XStream{{
- Stream: streamName,
- Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}}},
- }}, r)
-
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "2-0",
- Values: []string{"field2", "data2"},
- }).Err())
- r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 1,
- NoAck: false,
- }).Result()
- require.NoError(t, err)
- require.Equal(t, []redis.XStream{{
- Stream: streamName,
- Messages: []redis.XMessage{{ID: "2-0", Values:
map[string]interface{}{"field2": "data2"}}},
- }}, r)
-
- r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, "0"},
- Count: 2,
- NoAck: false,
- }).Result()
- require.NoError(t, err)
- require.Equal(t, []redis.XStream{{
- Stream: streamName,
- Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}},
- {ID: "2-0", Values:
map[string]interface{}{"field2": "data2"}}},
- }}, r)
-
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "3-0",
- Values: []string{"field3", "data3"},
- }).Err())
- r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 1,
- NoAck: true,
- }).Result()
- require.NoError(t, err)
- require.Equal(t, []redis.XStream{{
- Stream: streamName,
- Messages: []redis.XMessage{{ID: "3-0", Values:
map[string]interface{}{"field3": "data3"}}},
- }}, r)
- r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, "0"},
- Count: 2,
- NoAck: false,
- }).Result()
- require.NoError(t, err)
- require.Equal(t, []redis.XStream{{
- Stream: streamName,
- Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}},
- {ID: "2-0", Values:
map[string]interface{}{"field2": "data2"}}},
- }}, r)
-
- c := srv.NewClient()
- defer func() { require.NoError(t, c.Close()) }()
- ch := make(chan []redis.XStream)
- go func() {
- ch <- c.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 2,
- Block: 10 * time.Second,
- NoAck: false,
- }).Val()
- }()
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "4-0",
- Values: []string{"field4", "data4"},
- }).Err())
- r = <-ch
- require.Equal(t, []redis.XStream{{
- Stream: streamName,
- Messages: []redis.XMessage{{ID: "4-0", Values:
map[string]interface{}{"field4": "data4"}}},
- }}, r)
-
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "5-0",
- Values: []string{"field5", "data5"},
- }).Err())
- require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 1,
- NoAck: false,
- }).Err())
- require.NoError(t, rdb.XDel(ctx, streamName, "5-0").Err())
- r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, "5"},
- Count: 1,
- NoAck: false,
- }).Result()
- require.NoError(t, err)
- require.Equal(t, []redis.XStream{{
- Stream: streamName,
- Messages: []redis.XMessage{{ID: "5-0", Values:
map[string]interface{}(nil)}},
- }}, r)
- })
-
- t.Run("Check xreadgroup fetches the newest data after create consumer
in the command", func(t *testing.T) {
- streamName := "mystream"
- groupName := "mygroup"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: []string{"field1", "data1"},
- }).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
- consumerName := "myconsumer"
- err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 1,
- NoAck: false,
- }).Err()
- require.NoError(t, err)
- ri, erri := rdb.XInfoGroups(ctx, streamName).Result()
- require.NoError(t, erri)
- require.Equal(t, int64(1), ri[0].Consumers)
- })
-
- t.Run("XACK with different kinds of commands", func(t *testing.T) {
- streamName := "mystream"
- groupName := "mygroup"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
- require.NoError(t, err)
- require.Equal(t, int64(0), r)
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: []string{"field1", "data1"},
- }).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
- consumerName := "myconsumer"
- err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 1,
- NoAck: false,
- }).Err()
- require.NoError(t, err)
- r, err = rdb.XAck(ctx, streamName, groupName, "1-0").Result()
- require.NoError(t, err)
- require.Equal(t, int64(1), r)
-
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "2-0",
- Values: []string{"field1", "data1"},
- }).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "3-0",
- Values: []string{"field1", "data1"},
- }).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "4-0",
- Values: []string{"field1", "data1"},
- }).Err())
- err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 3,
- NoAck: false,
- }).Err()
- require.NoError(t, err)
- r, err = rdb.XAck(ctx, streamName, groupName, "2-0", "3-0",
"4-0").Result()
- require.NoError(t, err)
- require.Equal(t, int64(3), r)
- })
-
- t.Run("Simple XCLAIM command tests", func(t *testing.T) {
- streamName := "mystream"
- groupName := "mygroup"
- consumerName := "myconsumer"
- consumer1Name := "myconsumer1"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
-
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: []string{"field1", "data1"},
- }).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
- r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 1,
- NoAck: false,
- }).Result()
- require.NoError(t, err)
- require.Equal(t, []redis.XStream{{
- Stream: streamName,
- Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}}},
- }}, r)
-
- claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{
- Stream: streamName,
- Group: groupName,
- Consumer: consumer1Name,
- MinIdle: 0,
- Messages: []string{"1-0"},
- }).Result()
- require.NoError(t, err)
- require.Len(t, claimedMessages, 1, "Expected to claim 1
message")
- require.Equal(t, "1-0", claimedMessages[0].ID, "Expected
claimed message ID to match")
-
- time.Sleep(2000 * time.Millisecond)
- minIdleTime := 1000 * time.Millisecond
- claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
- Stream: streamName,
- Group: groupName,
- Consumer: consumerName,
- MinIdle: minIdleTime,
- Messages: []string{"1-0"},
- }).Result()
- require.NoError(t, err)
- require.Len(t, claimedMessages, 1, "Expected to claim 1 message
if idle time is large enough")
- require.Equal(t, "1-0", claimedMessages[0].ID, "Expected
claimed message ID to match")
-
- minIdleTime = 60000 * time.Millisecond
- claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
- Stream: streamName,
- Group: groupName,
- Consumer: consumer1Name,
- MinIdle: minIdleTime,
- Messages: []string{"1-0"},
- }).Result()
-
- require.NoError(t, err)
- require.Empty(t, claimedMessages, "Expected no messages to be
claimed due to insufficient idle time")
- })
-
- t.Run("XCLAIM with different timing situations and options", func(t
*testing.T) {
- streamName := "mystream"
- groupName := "mygroup"
- consumerName := "myconsumer"
- consumer1Name := "myconsumer1"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
-
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: []string{"field1", "data1"},
- }).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
- r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumerName,
- Streams: []string{streamName, ">"},
- Count: 1,
- NoAck: false,
- }).Result()
- require.NoError(t, err)
- require.Equal(t, []redis.XStream{{
- Stream: streamName,
- Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}}},
- }}, r)
-
- rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName,
groupName, consumer1Name, "0", "1-0", "IDLE", "5000").Result()
- require.NoError(t, err)
- messages, ok := rawClaimedMessages.([]interface{})
- require.True(t, ok, "Expected the result to be a slice of
interface{}")
- firstMsg, ok := messages[0].([]interface{})
- require.True(t, ok, "Expected message details to be a slice of
interface{}")
- msgID, ok := firstMsg[0].(string)
- require.True(t, ok, "Expected message ID to be a string")
- require.Equal(t, "1-0", msgID, "Expected claimed message ID to
match")
-
- claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{
- Stream: streamName,
- Group: groupName,
- Consumer: consumerName,
- MinIdle: 2000 * time.Millisecond,
- Messages: []string{"1-0"},
- }).Result()
- require.NoError(t, err)
- require.Len(t, claimedMessages, 1, "Expected to claim 1 message
if idle time is large enough")
- require.Equal(t, "1-0", claimedMessages[0].ID, "Expected
claimed message ID to match")
-
- tenSecondsAgo := time.Now().Add(-10 * time.Second).UnixMilli()
- rawClaimedMessages, err = rdb.Do(ctx, "XCLAIM", streamName,
groupName, consumer1Name, "0", "1-0", "TIME", tenSecondsAgo).Result()
- require.NoError(t, err)
- messages, ok = rawClaimedMessages.([]interface{})
- require.True(t, ok, "Expected the result to be a slice of
interface{}")
- firstMsg, ok = messages[0].([]interface{})
- require.True(t, ok, "Expected message details to be a slice of
interface{}")
- msgID, ok = firstMsg[0].(string)
- require.True(t, ok, "Expected message ID to be a string")
- require.Equal(t, "1-0", msgID, "Expected claimed message ID to
match")
-
- claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
- Stream: streamName,
- Group: groupName,
- Consumer: consumerName,
- MinIdle: 5000 * time.Millisecond,
- Messages: []string{"1-0"},
- }).Result()
- require.NoError(t, err)
- require.Len(t, claimedMessages, 1, "Expected to claim 1 message
if idle time is large enough")
- require.Equal(t, "1-0", claimedMessages[0].ID, "Expected
claimed message ID to match")
- })
-
- t.Run("XCLAIM command with different options", func(t *testing.T) {
- streamName := "mystream"
- groupName := "mygroup"
- consumerName := "myconsumer"
- consumer1Name := "myconsumer1"
-
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: []string{"field1", "data1"},
- }).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
-
- rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName,
groupName, consumerName, "0", "1-0", "FORCE").Result()
- require.NoError(t, err)
- messages, ok := rawClaimedMessages.([]interface{})
- require.True(t, ok, "Expected the result to be a slice of
interface{}")
- firstMsg, ok := messages[0].([]interface{})
- require.True(t, ok, "Expected message details to be a slice of
interface{}")
- msgID, ok := firstMsg[0].(string)
- require.True(t, ok, "Expected message ID to be a string")
- require.Equal(t, "1-0", msgID, "Expected claimed message ID to
match")
-
- cmd := rdb.XClaimJustID(ctx, &redis.XClaimArgs{
- Stream: streamName,
- Group: groupName,
- Consumer: consumer1Name,
- MinIdle: 0,
- Messages: []string{"1-0"},
- })
-
- claimedIDs, err := cmd.Result()
- require.NoError(t, err)
- require.Len(t, claimedIDs, 1, "Expected to claim exactly one
message ID")
- require.Equal(t, "1-0", claimedIDs[0], "Expected claimed
message ID to match")
- })
-
- t.Run("XAUTOCLAIM can claim PEL items from another consume", func(t
*testing.T) {
-
- streamName := "mystream"
- groupName := "mygroup"
- var id1 string
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- {
- rsp := rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"a", "1"},
- })
- require.NoError(t, rsp.Err())
- id1 = rsp.Val()
- }
- var id2 string
- {
- rsp := rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"b", "2"},
- })
- require.NoError(t, rsp.Err())
- id2 = rsp.Val()
- }
- var id3 string
- {
- rsp := rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"c", "3"},
- })
- require.NoError(t, rsp.Err())
- id3 = rsp.Val()
- }
- var id4 string
- {
- rsp := rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"d", "4"},
- })
- require.NoError(t, rsp.Err())
- id4 = rsp.Val()
- }
-
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
-
- consumer1 := "consumer1"
- consumer2 := "consumer2"
- {
- rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumer1,
- Streams: []string{streamName, ">"},
- Count: 1,
- })
- require.NoError(t, rsp.Err())
- require.Len(t, rsp.Val(), 1)
- require.Len(t, rsp.Val()[0].Messages, 1)
- require.Equal(t, id1, rsp.Val()[0].Messages[0].ID)
- require.Len(t, rsp.Val()[0].Messages[0].Values, 1)
- require.Equal(t, "1",
rsp.Val()[0].Messages[0].Values["a"])
- }
-
- {
- time.Sleep(200 * time.Millisecond)
- rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
- Stream: streamName,
- Group: groupName,
- Consumer: consumer2,
- MinIdle: 10 * time.Millisecond,
- Count: 1,
- Start: "-",
- })
- require.NoError(t, rsp.Err())
- msgs, start := rsp.Val()
- require.Equal(t, "0-0", start)
- require.Len(t, msgs, 1)
- require.Len(t, msgs[0].Values, 1)
- require.Equal(t, "1", msgs[0].Values["a"])
- }
-
- {
- rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumer1,
- Streams: []string{streamName, ">"},
- Count: 3,
- })
- require.NoError(t, rsp.Err())
-
- time.Sleep(time.Millisecond * 200)
- require.NoError(t, rdb.XDel(ctx, streamName, id2).Err())
- }
-
- {
- cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName,
consumer2, 10, "-", "COUNT", 3)
- require.NoError(t, cmd.Err())
- require.Equal(t, []interface{}{
- id4,
- []interface{}{
- []interface{}{
- id1,
- []interface{}{"a", "1"},
- },
- []interface{}{
- id3,
- []interface{}{"c", "3"},
- },
- },
- []interface{}{
- id2,
- },
- }, cmd.Val())
- }
-
- {
- time.Sleep(time.Millisecond * 200)
- require.NoError(t, rdb.XDel(ctx, streamName, id4).Err())
- rsp := rdb.XAutoClaimJustID(ctx, &redis.XAutoClaimArgs{
- Stream: streamName,
- Group: groupName,
- Consumer: consumer2,
- MinIdle: 10 * time.Millisecond,
- Start: "-",
- })
- require.NoError(t, rsp.Err())
- msgs, start := rsp.Val()
- require.Equal(t, "0-0", start)
- require.Len(t, msgs, 2)
- require.Equal(t, id1, msgs[0])
- require.Equal(t, id3, msgs[1])
- }
- })
-
- t.Run("XAUTOCLAIM as an iterator", func(t *testing.T) {
- streamName := "mystream"
- groupName := "mygroup"
- var id3, id5 string
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- {
- rsp := rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"a", "1"},
- })
- require.NoError(t, rsp.Err())
- }
- {
- rsp := rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"b", "2"},
- })
- require.NoError(t, rsp.Err())
- }
- {
- rsp := rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"c", "3"},
- })
- require.NoError(t, rsp.Err())
- id3 = rsp.Val()
- }
- {
- rsp := rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"d", "4"},
- })
- require.NoError(t, rsp.Err())
- }
- {
- rsp := rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "*",
- Values: []string{"e", "5"},
- })
- require.NoError(t, rsp.Err())
- id5 = rsp.Val()
- }
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
-
- consumer1, consumer2 := "consumer1", "consumer2"
- {
- rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: consumer1,
- Streams: []string{streamName, ">"},
- Count: 90,
- })
- require.NoError(t, rsp.Err())
- time.Sleep(200 * time.Millisecond)
- }
- {
- rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
- Stream: streamName,
- Group: groupName,
- Consumer: consumer2,
- MinIdle: 10 * time.Millisecond,
- Count: 2,
- Start: "-",
- })
- require.NoError(t, rsp.Err())
- msgs, start := rsp.Val()
- require.Equal(t, id3, start)
- require.Len(t, msgs, 2)
- require.Len(t, msgs[0].Values, 1)
- require.Equal(t, "1", msgs[0].Values["a"])
- }
-
- {
- rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
- Stream: streamName,
- Group: groupName,
- Consumer: consumer2,
- MinIdle: 10 * time.Millisecond,
- Start: id3,
- Count: 2,
- })
- require.NoError(t, rsp.Err())
- msgs, start := rsp.Val()
- require.Equal(t, id5, start)
- require.Len(t, msgs, 2)
- require.Len(t, msgs[0].Values, 1)
- require.Equal(t, "3", msgs[0].Values["c"])
- }
-
- {
- rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
- Stream: streamName,
- Group: groupName,
- Consumer: consumer2,
- MinIdle: 10 * time.Millisecond,
- Start: id5,
- Count: 1,
- })
- require.NoError(t, rsp.Err())
- msgs, start := rsp.Val()
- require.Equal(t, "0-0", start)
- require.Len(t, msgs, 1)
- require.Len(t, msgs[0].Values, 1)
- require.Equal(t, "5", msgs[0].Values["e"])
- }
- })
-
- t.Run("XAUTOCLAIM with XDEL", func(t *testing.T) {
- streamName := "x"
- groupName := "grp"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: map[string]interface{}{"f": "v"},
- }).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "2-0",
- Values: map[string]interface{}{"f": "v"},
- }).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "3-0",
- Values: map[string]interface{}{"f": "v"},
- }).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
- {
- rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: "Alice",
- Streams: []string{streamName, ">"},
- })
- require.NoError(t, rsp.Err())
- require.Len(t, rsp.Val(), 1)
- require.Len(t, rsp.Val()[0].Messages, 3)
- require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID)
- require.Equal(t, "v",
rsp.Val()[0].Messages[0].Values["f"])
- require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID)
- require.Equal(t, "v",
rsp.Val()[0].Messages[1].Values["f"])
- require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID)
- require.Equal(t, "v",
rsp.Val()[0].Messages[2].Values["f"])
- }
- {
- require.NoError(t, rdb.XDel(ctx, streamName,
"2-0").Err())
- cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName,
"Bob", 0, "0-0")
- require.NoError(t, cmd.Err())
- require.Equal(t, []interface{}{
- "0-0",
- []interface{}{
- []interface{}{
- "1-0",
- []interface{}{"f", "v"},
- },
- []interface{}{
- "3-0",
- []interface{}{"f", "v"},
- },
- },
- []interface{}{
- "2-0",
- },
- }, cmd.Val())
- }
- })
-
- t.Run("XAUTOCLAIM with XDEL and count", func(t *testing.T) {
- streamName := "x"
- groupName := "grp"
- require.NoError(t, rdb.Del(ctx, streamName).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "1-0",
- Values: map[string]interface{}{"f": "v"},
- }).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "2-0",
- Values: map[string]interface{}{"f": "v"},
- }).Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
- Stream: streamName,
- ID: "3-0",
- Values: map[string]interface{}{"f": "v"},
- }).Err())
- require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
- {
- rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
- Group: groupName,
- Consumer: "Alice",
- Streams: []string{streamName, ">"},
- })
- require.NoError(t, rsp.Err())
- require.Len(t, rsp.Val(), 1)
- require.Len(t, rsp.Val()[0].Messages, 3)
- require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID)
- require.Equal(t, "v",
rsp.Val()[0].Messages[0].Values["f"])
- require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID)
- require.Equal(t, "v",
rsp.Val()[0].Messages[1].Values["f"])
- require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID)
- require.Equal(t, "v",
rsp.Val()[0].Messages[2].Values["f"])
- }
- {
- require.NoError(t, rdb.XDel(ctx, streamName,
"1-0").Err())
- require.NoError(t, rdb.XDel(ctx, streamName,
"2-0").Err())
- cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName,
"Bob", 0, "0-0", "COUNT", 1)
- require.NoError(t, cmd.Err())
- require.Equal(t, []interface{}{
- "2-0",
- []interface{}{},
- []interface{}{
- "1-0",
- },
- }, cmd.Val())
- }
- {
- cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName,
"Bob", 0, "2-0", "COUNT", 1)
- require.NoError(t, cmd.Err())
- require.Equal(t, []interface{}{
- "3-0",
- []interface{}{},
- []interface{}{
- "2-0",
- },
- }, cmd.Val())
- }
- {
- cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName,
"Bob", 0, "3-0", "COUNT", 1)
- require.NoError(t, cmd.Err())
- require.Equal(t, []interface{}{
- "0-0",
- []interface{}{
- []interface{}{
- "3-0",
- []interface{}{"f", "v"},
- },
- },
- []interface{}{},
- }, cmd.Val())
- }
- // assert_equal [XPENDING x grp - + 10 Alice] {}
- // add xpending to this test case when it is supported
- })
-
- t.Run("XAUTOCLAIM with out of range count", func(t *testing.T) {
- err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
- Stream: "x",
- Group: "grp",
- Consumer: "Bob",
- MinIdle: 0,
- Start: "3-0",
- Count: 8070450532247928833,
- }).Err()
- require.Error(t, err)
- require.True(t, strings.HasPrefix(err.Error(), "ERR COUNT"))
- })
-
- t.Run("XAUTOCLAIM COUNT must be > 0", func(t *testing.T) {
- cmd := rdb.Do(ctx, "XAUTOCLAIM", "key", "group", "consumer", 1,
1, "COUNT", 0)
- require.Error(t, cmd.Err())
- require.Equal(t, "ERR COUNT must be > 0", cmd.Err().Error())
- })
+ // t.Run("XGROUP CREATE with different kinds of commands and XGROUP
DESTROY", func(t *testing.T) {
+ // streamName := "test-stream-a"
+ // groupName := "test-group-a"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // // No such stream (No such key)
+ // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$").Err())
+ // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIESREAD", "10").Err())
+ // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIESREAD").Err())
+ // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "MKSTREAM", "ENTRIESREAD").Err())
+ // require.NoError(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "MKSTREAM").Err())
+ // require.NoError(t, rdb.XInfoStream(ctx, streamName).Err())
+ // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$").Err())
+ // // Invalid syntax
+ // groupName = "test-group-b"
+ // require.Error(t, rdb.Do(ctx, "XGROUP", "CREAT", streamName,
groupName, "$").Err())
+ // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIEREAD", "10").Err())
+ // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
groupName, "$", "ENTRIESREAD", "-10").Err())
+ // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName,
"1test-group-c", "$").Err())
+
+ // require.NoError(t, rdb.Del(ctx, "myStream").Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup",
"$").Err())
+ // result, err := rdb.XGroupDestroy(ctx, "myStream",
"myGroup").Result()
+ // require.NoError(t, err)
+ // require.Equal(t, int64(1), result)
+ // result, err = rdb.XGroupDestroy(ctx, "myStream",
"myGroup").Result()
+ // require.NoError(t, err)
+ // require.Equal(t, int64(0), result)
+ // })
+
+ // t.Run("XGROUP CREATECONSUMER with different kinds of commands",
func(t *testing.T) {
+ // streamName := "test-stream"
+ // groupName := "test-group"
+ // consumerName := "test-consumer"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // //No such stream
+ // require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: []string{"data", "a"},
+ // }).Err())
+ // //no such group
+ // require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"$").Err())
+
+ // r := rdb.XGroupCreateConsumer(ctx, streamName, groupName,
consumerName).Val()
+ // require.Equal(t, int64(1), r)
+ // r = rdb.XGroupCreateConsumer(ctx, streamName, groupName,
consumerName).Val()
+ // require.Equal(t, int64(0), r)
+ // })
+
+ // t.Run("XGROUP DELCONSUMER with different kinds of commands", func(t
*testing.T) {
+ // streamName := "test-stream"
+ // groupName := "test-group"
+ // consumerName := "test-consumer"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // //No such stream
+ // require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: []string{"data", "a"},
+ // }).Err())
+ // //no such group
+ // require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"$").Err())
+ // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
+
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"data1", "a1"},
+ // }).Err())
+ // require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 1,
+ // NoAck: false,
+ // }).Err())
+ // ri, erri := rdb.XInfoGroups(ctx, streamName).Result()
+ // require.NoError(t, erri)
+ // require.Equal(t, int64(1), ri[0].Consumers)
+ // require.Equal(t, int64(1), ri[0].Pending)
+
+ // r, err := rdb.XGroupDelConsumer(ctx, streamName, groupName,
consumerName).Result()
+ // require.NoError(t, err)
+ // require.Equal(t, int64(1), r)
+ // ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
+ // require.NoError(t, erri)
+ // require.Equal(t, int64(0), ri[0].Consumers)
+ // require.Equal(t, int64(0), ri[0].Pending)
+
+ // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
groupName, consumerName).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"data2", "a2"},
+ // }).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"data3", "a3"},
+ // }).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"data4", "a4"},
+ // }).Err())
+ // require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 3,
+ // NoAck: false,
+ // }).Err())
+ // ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
+ // require.NoError(t, erri)
+ // require.Equal(t, int64(1), ri[0].Consumers)
+ // require.Equal(t, int64(3), ri[0].Pending)
+ // r, err = rdb.XGroupDelConsumer(ctx, streamName, groupName,
consumerName).Result()
+ // require.NoError(t, err)
+ // require.Equal(t, int64(3), r)
+ // ri, erri = rdb.XInfoGroups(ctx, streamName).Result()
+ // require.NoError(t, erri)
+ // require.Equal(t, int64(0), ri[0].Consumers)
+ // require.Equal(t, int64(0), ri[0].Pending)
+ // })
+
+ // t.Run("XGROUP SETID with different kinds of commands", func(t
*testing.T) {
+ // streamName := "test-stream"
+ // groupName := "test-group"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // //No such stream
+ // require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName,
"$").Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: []string{"data", "a"},
+ // }).Err())
+ // //No such group
+ // require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName,
"$").Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"$").Err())
+
+ // require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName,
"0-0").Err())
+ // require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName,
groupName, "$", "entries", "100").Err())
+ // require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName,
groupName, "$", "entriesread", "-100").Err())
+ // require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName,
groupName, "$", "entriesread", "100").Err())
+ // })
+
+ // t.Run("XINFO GROUPS and XINFO CONSUMERS", func(t *testing.T) {
+ // streamName := "test-stream"
+ // group1 := "t1"
+ // group2 := "t2"
+ // consumer1 := "c1"
+ // consumer2 := "c2"
+ // consumer3 := "c3"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: []string{"data", "a"},
+ // }).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, group1,
"$").Err())
+ // r := rdb.XInfoGroups(ctx, streamName).Val()
+ // require.Equal(t, group1, r[0].Name)
+ // require.Equal(t, int64(0), r[0].Consumers)
+ // require.Equal(t, int64(0), r[0].Pending)
+ // require.Equal(t, "1-0", r[0].LastDeliveredID)
+ // require.Equal(t, int64(0), r[0].EntriesRead)
+ // require.Equal(t, int64(0), r[0].Lag)
+
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "2-0",
+ // Values: []string{"data1", "b"},
+ // }).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, group2,
"$").Err())
+ // r = rdb.XInfoGroups(ctx, streamName).Val()
+ // require.Equal(t, group2, r[1].Name)
+ // require.Equal(t, "2-0", r[1].LastDeliveredID)
+
+ // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
group1, consumer1).Err())
+ // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
group1, consumer2).Err())
+ // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
group2, consumer3).Err())
+ // r = rdb.XInfoGroups(ctx, streamName).Val()
+ // require.Equal(t, int64(2), r[0].Consumers)
+ // require.Equal(t, int64(1), r[1].Consumers)
+
+ // r1 := rdb.XInfoConsumers(ctx, streamName, group1).Val()
+ // require.Equal(t, consumer1, r1[0].Name)
+ // require.Equal(t, consumer2, r1[1].Name)
+ // r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val()
+ // require.Equal(t, consumer3, r1[0].Name)
+ // })
+
+ // t.Run("XINFO after delete pending message and related consumer, for
issue #2350", func(t *testing.T) {
+ // streamName := "test-stream-2350"
+ // groupName := "test-group-2350"
+ // consumerName := "test-consumer-2350"
+ // require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName,
groupName, "$").Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"testing", "overflow"},
+ // }).Err())
+ // readRsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 1,
+ // NoAck: false,
+ // })
+ // require.NoError(t, readRsp.Err())
+ // require.Len(t, readRsp.Val(), 1)
+ // streamRsp := readRsp.Val()[0]
+ // require.Len(t, streamRsp.Messages, 1)
+ // msgID := streamRsp.Messages[0]
+ // require.NoError(t, rdb.XAck(ctx, streamName, groupName,
msgID.ID).Err())
+ // require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName,
groupName, consumerName).Err())
+ // infoRsp := rdb.XInfoGroups(ctx, streamName)
+ // require.NoError(t, infoRsp.Err())
+ // infoGroups := infoRsp.Val()
+ // require.Len(t, infoGroups, 1)
+ // infoGroup := infoGroups[0]
+ // require.Equal(t, groupName, infoGroup.Name)
+ // require.Equal(t, int64(0), infoGroup.Consumers)
+ // require.Equal(t, int64(0), infoGroup.Pending)
+ // require.Equal(t, msgID.ID, infoGroup.LastDeliveredID)
+ // })
+
+ // t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue
#2109", func(t *testing.T) {
+ // streamName := "test-stream"
+ // group := "group"
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"data1", "b"},
+ // }).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, group,
"0").Err())
+ // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName,
group, "consumer").Err())
+ // require.NoError(t, rdb.XRead(ctx, &redis.XReadArgs{
+ // Streams: []string{streamName, "0"},
+ // }).Err())
+ // })
+
+ // t.Run("XREADGROUP with different kinds of commands", func(t
*testing.T) {
+ // streamName := "mystream"
+ // groupName := "mygroup"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: []string{"field1", "data1"},
+ // }).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+ // consumerName := "myconsumer"
+ // r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 1,
+ // NoAck: false,
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Equal(t, []redis.XStream{{
+ // Stream: streamName,
+ // Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}}},
+ // }}, r)
+
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "2-0",
+ // Values: []string{"field2", "data2"},
+ // }).Err())
+ // r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 1,
+ // NoAck: false,
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Equal(t, []redis.XStream{{
+ // Stream: streamName,
+ // Messages: []redis.XMessage{{ID: "2-0", Values:
map[string]interface{}{"field2": "data2"}}},
+ // }}, r)
+
+ // r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, "0"},
+ // Count: 2,
+ // NoAck: false,
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Equal(t, []redis.XStream{{
+ // Stream: streamName,
+ // Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}},
+ // {ID: "2-0", Values:
map[string]interface{}{"field2": "data2"}}},
+ // }}, r)
+
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "3-0",
+ // Values: []string{"field3", "data3"},
+ // }).Err())
+ // r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 1,
+ // NoAck: true,
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Equal(t, []redis.XStream{{
+ // Stream: streamName,
+ // Messages: []redis.XMessage{{ID: "3-0", Values:
map[string]interface{}{"field3": "data3"}}},
+ // }}, r)
+ // r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, "0"},
+ // Count: 2,
+ // NoAck: false,
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Equal(t, []redis.XStream{{
+ // Stream: streamName,
+ // Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}},
+ // {ID: "2-0", Values:
map[string]interface{}{"field2": "data2"}}},
+ // }}, r)
+
+ // c := srv.NewClient()
+ // defer func() { require.NoError(t, c.Close()) }()
+ // ch := make(chan []redis.XStream)
+ // go func() {
+ // ch <- c.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 2,
+ // Block: 10 * time.Second,
+ // NoAck: false,
+ // }).Val()
+ // }()
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "4-0",
+ // Values: []string{"field4", "data4"},
+ // }).Err())
+ // r = <-ch
+ // require.Equal(t, []redis.XStream{{
+ // Stream: streamName,
+ // Messages: []redis.XMessage{{ID: "4-0", Values:
map[string]interface{}{"field4": "data4"}}},
+ // }}, r)
+
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "5-0",
+ // Values: []string{"field5", "data5"},
+ // }).Err())
+ // require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 1,
+ // NoAck: false,
+ // }).Err())
+ // require.NoError(t, rdb.XDel(ctx, streamName, "5-0").Err())
+ // r, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, "5"},
+ // Count: 1,
+ // NoAck: false,
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Equal(t, []redis.XStream{{
+ // Stream: streamName,
+ // Messages: []redis.XMessage{{ID: "5-0", Values:
map[string]interface{}(nil)}},
+ // }}, r)
+ // })
+
+ // t.Run("Check xreadgroup fetches the newest data after create
consumer in the command", func(t *testing.T) {
+ // streamName := "mystream"
+ // groupName := "mygroup"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: []string{"field1", "data1"},
+ // }).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+ // consumerName := "myconsumer"
+ // err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 1,
+ // NoAck: false,
+ // }).Err()
+ // require.NoError(t, err)
+ // ri, erri := rdb.XInfoGroups(ctx, streamName).Result()
+ // require.NoError(t, erri)
+ // require.Equal(t, int64(1), ri[0].Consumers)
+ // })
+
+ // t.Run("XACK with different kinds of commands", func(t *testing.T) {
+ // streamName := "mystream"
+ // groupName := "mygroup"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
+ // require.NoError(t, err)
+ // require.Equal(t, int64(0), r)
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: []string{"field1", "data1"},
+ // }).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+ // consumerName := "myconsumer"
+ // err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 1,
+ // NoAck: false,
+ // }).Err()
+ // require.NoError(t, err)
+ // r, err = rdb.XAck(ctx, streamName, groupName, "1-0").Result()
+ // require.NoError(t, err)
+ // require.Equal(t, int64(1), r)
+
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "2-0",
+ // Values: []string{"field1", "data1"},
+ // }).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "3-0",
+ // Values: []string{"field1", "data1"},
+ // }).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "4-0",
+ // Values: []string{"field1", "data1"},
+ // }).Err())
+ // err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 3,
+ // NoAck: false,
+ // }).Err()
+ // require.NoError(t, err)
+ // r, err = rdb.XAck(ctx, streamName, groupName, "2-0", "3-0",
"4-0").Result()
+ // require.NoError(t, err)
+ // require.Equal(t, int64(3), r)
+ // })
+
+ // t.Run("Simple XCLAIM command tests", func(t *testing.T) {
+ // streamName := "mystream"
+ // groupName := "mygroup"
+ // consumerName := "myconsumer"
+ // consumer1Name := "myconsumer1"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: []string{"field1", "data1"},
+ // }).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+ // r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 1,
+ // NoAck: false,
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Equal(t, []redis.XStream{{
+ // Stream: streamName,
+ // Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}}},
+ // }}, r)
+
+ // claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{
+ // Stream: streamName,
+ // Group: groupName,
+ // Consumer: consumer1Name,
+ // MinIdle: 0,
+ // Messages: []string{"1-0"},
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Len(t, claimedMessages, 1, "Expected to claim 1
message")
+ // require.Equal(t, "1-0", claimedMessages[0].ID, "Expected
claimed message ID to match")
+
+ // time.Sleep(2000 * time.Millisecond)
+ // minIdleTime := 1000 * time.Millisecond
+ // claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
+ // Stream: streamName,
+ // Group: groupName,
+ // Consumer: consumerName,
+ // MinIdle: minIdleTime,
+ // Messages: []string{"1-0"},
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Len(t, claimedMessages, 1, "Expected to claim 1 message
if idle time is large enough")
+ // require.Equal(t, "1-0", claimedMessages[0].ID, "Expected
claimed message ID to match")
+
+ // minIdleTime = 60000 * time.Millisecond
+ // claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
+ // Stream: streamName,
+ // Group: groupName,
+ // Consumer: consumer1Name,
+ // MinIdle: minIdleTime,
+ // Messages: []string{"1-0"},
+ // }).Result()
+
+ // require.NoError(t, err)
+ // require.Empty(t, claimedMessages, "Expected no messages to be
claimed due to insufficient idle time")
+ // })
+
+ // t.Run("XCLAIM with different timing situations and options", func(t
*testing.T) {
+ // streamName := "mystream"
+ // groupName := "mygroup"
+ // consumerName := "myconsumer"
+ // consumer1Name := "myconsumer1"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: []string{"field1", "data1"},
+ // }).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+ // r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumerName,
+ // Streams: []string{streamName, ">"},
+ // Count: 1,
+ // NoAck: false,
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Equal(t, []redis.XStream{{
+ // Stream: streamName,
+ // Messages: []redis.XMessage{{ID: "1-0", Values:
map[string]interface{}{"field1": "data1"}}},
+ // }}, r)
+
+ // rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName,
groupName, consumer1Name, "0", "1-0", "IDLE", "5000").Result()
+ // require.NoError(t, err)
+ // messages, ok := rawClaimedMessages.([]interface{})
+ // require.True(t, ok, "Expected the result to be a slice of
interface{}")
+ // firstMsg, ok := messages[0].([]interface{})
+ // require.True(t, ok, "Expected message details to be a slice of
interface{}")
+ // msgID, ok := firstMsg[0].(string)
+ // require.True(t, ok, "Expected message ID to be a string")
+ // require.Equal(t, "1-0", msgID, "Expected claimed message ID to
match")
+
+ // claimedMessages, err := rdb.XClaim(ctx, &redis.XClaimArgs{
+ // Stream: streamName,
+ // Group: groupName,
+ // Consumer: consumerName,
+ // MinIdle: 2000 * time.Millisecond,
+ // Messages: []string{"1-0"},
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Len(t, claimedMessages, 1, "Expected to claim 1 message
if idle time is large enough")
+ // require.Equal(t, "1-0", claimedMessages[0].ID, "Expected
claimed message ID to match")
+
+ // tenSecondsAgo := time.Now().Add(-10 * time.Second).UnixMilli()
+ // rawClaimedMessages, err = rdb.Do(ctx, "XCLAIM", streamName,
groupName, consumer1Name, "0", "1-0", "TIME", tenSecondsAgo).Result()
+ // require.NoError(t, err)
+ // messages, ok = rawClaimedMessages.([]interface{})
+ // require.True(t, ok, "Expected the result to be a slice of
interface{}")
+ // firstMsg, ok = messages[0].([]interface{})
+ // require.True(t, ok, "Expected message details to be a slice of
interface{}")
+ // msgID, ok = firstMsg[0].(string)
+ // require.True(t, ok, "Expected message ID to be a string")
+ // require.Equal(t, "1-0", msgID, "Expected claimed message ID to
match")
+
+ // claimedMessages, err = rdb.XClaim(ctx, &redis.XClaimArgs{
+ // Stream: streamName,
+ // Group: groupName,
+ // Consumer: consumerName,
+ // MinIdle: 5000 * time.Millisecond,
+ // Messages: []string{"1-0"},
+ // }).Result()
+ // require.NoError(t, err)
+ // require.Len(t, claimedMessages, 1, "Expected to claim 1 message
if idle time is large enough")
+ // require.Equal(t, "1-0", claimedMessages[0].ID, "Expected
claimed message ID to match")
+ // })
+
+ // t.Run("XCLAIM command with different options", func(t *testing.T) {
+ // streamName := "mystream"
+ // groupName := "mygroup"
+ // consumerName := "myconsumer"
+ // consumer1Name := "myconsumer1"
+
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: []string{"field1", "data1"},
+ // }).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+
+ // rawClaimedMessages, err := rdb.Do(ctx, "XCLAIM", streamName,
groupName, consumerName, "0", "1-0", "FORCE").Result()
+ // require.NoError(t, err)
+ // messages, ok := rawClaimedMessages.([]interface{})
+ // require.True(t, ok, "Expected the result to be a slice of
interface{}")
+ // firstMsg, ok := messages[0].([]interface{})
+ // require.True(t, ok, "Expected message details to be a slice of
interface{}")
+ // msgID, ok := firstMsg[0].(string)
+ // require.True(t, ok, "Expected message ID to be a string")
+ // require.Equal(t, "1-0", msgID, "Expected claimed message ID to
match")
+
+ // cmd := rdb.XClaimJustID(ctx, &redis.XClaimArgs{
+ // Stream: streamName,
+ // Group: groupName,
+ // Consumer: consumer1Name,
+ // MinIdle: 0,
+ // Messages: []string{"1-0"},
+ // })
+
+ // claimedIDs, err := cmd.Result()
+ // require.NoError(t, err)
+ // require.Len(t, claimedIDs, 1, "Expected to claim exactly one
message ID")
+ // require.Equal(t, "1-0", claimedIDs[0], "Expected claimed
message ID to match")
+ // })
+
+ // t.Run("XAUTOCLAIM can claim PEL items from another consume", func(t
*testing.T) {
+
+ // streamName := "mystream"
+ // groupName := "mygroup"
+ // var id1 string
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // {
+ // rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"a", "1"},
+ // })
+ // require.NoError(t, rsp.Err())
+ // id1 = rsp.Val()
+ // }
+ // var id2 string
+ // {
+ // rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"b", "2"},
+ // })
+ // require.NoError(t, rsp.Err())
+ // id2 = rsp.Val()
+ // }
+ // var id3 string
+ // {
+ // rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"c", "3"},
+ // })
+ // require.NoError(t, rsp.Err())
+ // id3 = rsp.Val()
+ // }
+ // var id4 string
+ // {
+ // rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"d", "4"},
+ // })
+ // require.NoError(t, rsp.Err())
+ // id4 = rsp.Val()
+ // }
+
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+
+ // consumer1 := "consumer1"
+ // consumer2 := "consumer2"
+ // {
+ // rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumer1,
+ // Streams: []string{streamName, ">"},
+ // Count: 1,
+ // })
+ // require.NoError(t, rsp.Err())
+ // require.Len(t, rsp.Val(), 1)
+ // require.Len(t, rsp.Val()[0].Messages, 1)
+ // require.Equal(t, id1, rsp.Val()[0].Messages[0].ID)
+ // require.Len(t, rsp.Val()[0].Messages[0].Values, 1)
+ // require.Equal(t, "1",
rsp.Val()[0].Messages[0].Values["a"])
+ // }
+
+ // {
+ // time.Sleep(200 * time.Millisecond)
+ // rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+ // Stream: streamName,
+ // Group: groupName,
+ // Consumer: consumer2,
+ // MinIdle: 10 * time.Millisecond,
+ // Count: 1,
+ // Start: "-",
+ // })
+ // require.NoError(t, rsp.Err())
+ // msgs, start := rsp.Val()
+ // require.Equal(t, "0-0", start)
+ // require.Len(t, msgs, 1)
+ // require.Len(t, msgs[0].Values, 1)
+ // require.Equal(t, "1", msgs[0].Values["a"])
+ // }
+
+ // {
+ // rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumer1,
+ // Streams: []string{streamName, ">"},
+ // Count: 3,
+ // })
+ // require.NoError(t, rsp.Err())
+
+ // time.Sleep(time.Millisecond * 200)
+ // require.NoError(t, rdb.XDel(ctx, streamName, id2).Err())
+ // }
+
+ // {
+ // cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName,
consumer2, 10, "-", "COUNT", 3)
+ // require.NoError(t, cmd.Err())
+ // require.Equal(t, []interface{}{
+ // id4,
+ // []interface{}{
+ // []interface{}{
+ // id1,
+ // []interface{}{"a", "1"},
+ // },
+ // []interface{}{
+ // id3,
+ // []interface{}{"c", "3"},
+ // },
+ // },
+ // []interface{}{
+ // id2,
+ // },
+ // }, cmd.Val())
+ // }
+
+ // {
+ // time.Sleep(time.Millisecond * 200)
+ // require.NoError(t, rdb.XDel(ctx, streamName, id4).Err())
+ // rsp := rdb.XAutoClaimJustID(ctx, &redis.XAutoClaimArgs{
+ // Stream: streamName,
+ // Group: groupName,
+ // Consumer: consumer2,
+ // MinIdle: 10 * time.Millisecond,
+ // Start: "-",
+ // })
+ // require.NoError(t, rsp.Err())
+ // msgs, start := rsp.Val()
+ // require.Equal(t, "0-0", start)
+ // require.Len(t, msgs, 2)
+ // require.Equal(t, id1, msgs[0])
+ // require.Equal(t, id3, msgs[1])
+ // }
+ // })
+
+ // t.Run("XAUTOCLAIM as an iterator", func(t *testing.T) {
+ // streamName := "mystream"
+ // groupName := "mygroup"
+ // var id3, id5 string
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // {
+ // rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"a", "1"},
+ // })
+ // require.NoError(t, rsp.Err())
+ // }
+ // {
+ // rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"b", "2"},
+ // })
+ // require.NoError(t, rsp.Err())
+ // }
+ // {
+ // rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"c", "3"},
+ // })
+ // require.NoError(t, rsp.Err())
+ // id3 = rsp.Val()
+ // }
+ // {
+ // rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"d", "4"},
+ // })
+ // require.NoError(t, rsp.Err())
+ // }
+ // {
+ // rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "*",
+ // Values: []string{"e", "5"},
+ // })
+ // require.NoError(t, rsp.Err())
+ // id5 = rsp.Val()
+ // }
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+
+ // consumer1, consumer2 := "consumer1", "consumer2"
+ // {
+ // rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: consumer1,
+ // Streams: []string{streamName, ">"},
+ // Count: 90,
+ // })
+ // require.NoError(t, rsp.Err())
+ // time.Sleep(200 * time.Millisecond)
+ // }
+ // {
+ // rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+ // Stream: streamName,
+ // Group: groupName,
+ // Consumer: consumer2,
+ // MinIdle: 10 * time.Millisecond,
+ // Count: 2,
+ // Start: "-",
+ // })
+ // require.NoError(t, rsp.Err())
+ // msgs, start := rsp.Val()
+ // require.Equal(t, id3, start)
+ // require.Len(t, msgs, 2)
+ // require.Len(t, msgs[0].Values, 1)
+ // require.Equal(t, "1", msgs[0].Values["a"])
+ // }
+
+ // {
+ // rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+ // Stream: streamName,
+ // Group: groupName,
+ // Consumer: consumer2,
+ // MinIdle: 10 * time.Millisecond,
+ // Start: id3,
+ // Count: 2,
+ // })
+ // require.NoError(t, rsp.Err())
+ // msgs, start := rsp.Val()
+ // require.Equal(t, id5, start)
+ // require.Len(t, msgs, 2)
+ // require.Len(t, msgs[0].Values, 1)
+ // require.Equal(t, "3", msgs[0].Values["c"])
+ // }
+
+ // {
+ // rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+ // Stream: streamName,
+ // Group: groupName,
+ // Consumer: consumer2,
+ // MinIdle: 10 * time.Millisecond,
+ // Start: id5,
+ // Count: 1,
+ // })
+ // require.NoError(t, rsp.Err())
+ // msgs, start := rsp.Val()
+ // require.Equal(t, "0-0", start)
+ // require.Len(t, msgs, 1)
+ // require.Len(t, msgs[0].Values, 1)
+ // require.Equal(t, "5", msgs[0].Values["e"])
+ // }
+ // })
+
+ // t.Run("XAUTOCLAIM with XDEL", func(t *testing.T) {
+ // streamName := "x"
+ // groupName := "grp"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: map[string]interface{}{"f": "v"},
+ // }).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "2-0",
+ // Values: map[string]interface{}{"f": "v"},
+ // }).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "3-0",
+ // Values: map[string]interface{}{"f": "v"},
+ // }).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+ // {
+ // rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: "Alice",
+ // Streams: []string{streamName, ">"},
+ // })
+ // require.NoError(t, rsp.Err())
+ // require.Len(t, rsp.Val(), 1)
+ // require.Len(t, rsp.Val()[0].Messages, 3)
+ // require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID)
+ // require.Equal(t, "v",
rsp.Val()[0].Messages[0].Values["f"])
+ // require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID)
+ // require.Equal(t, "v",
rsp.Val()[0].Messages[1].Values["f"])
+ // require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID)
+ // require.Equal(t, "v",
rsp.Val()[0].Messages[2].Values["f"])
+ // }
+ // {
+ // require.NoError(t, rdb.XDel(ctx, streamName,
"2-0").Err())
+ // cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName,
"Bob", 0, "0-0")
+ // require.NoError(t, cmd.Err())
+ // require.Equal(t, []interface{}{
+ // "0-0",
+ // []interface{}{
+ // []interface{}{
+ // "1-0",
+ // []interface{}{"f", "v"},
+ // },
+ // []interface{}{
+ // "3-0",
+ // []interface{}{"f", "v"},
+ // },
+ // },
+ // []interface{}{
+ // "2-0",
+ // },
+ // }, cmd.Val())
+ // }
+ // })
+
+ // t.Run("XAUTOCLAIM with XDEL and count", func(t *testing.T) {
+ // streamName := "x"
+ // groupName := "grp"
+ // require.NoError(t, rdb.Del(ctx, streamName).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "1-0",
+ // Values: map[string]interface{}{"f": "v"},
+ // }).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "2-0",
+ // Values: map[string]interface{}{"f": "v"},
+ // }).Err())
+ // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ // Stream: streamName,
+ // ID: "3-0",
+ // Values: map[string]interface{}{"f": "v"},
+ // }).Err())
+ // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+ // {
+ // rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ // Group: groupName,
+ // Consumer: "Alice",
+ // Streams: []string{streamName, ">"},
+ // })
+ // require.NoError(t, rsp.Err())
+ // require.Len(t, rsp.Val(), 1)
+ // require.Len(t, rsp.Val()[0].Messages, 3)
+ // require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID)
+ // require.Equal(t, "v",
rsp.Val()[0].Messages[0].Values["f"])
+ // require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID)
+ // require.Equal(t, "v",
rsp.Val()[0].Messages[1].Values["f"])
+ // require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID)
+ // require.Equal(t, "v",
rsp.Val()[0].Messages[2].Values["f"])
+ // }
+ // {
+ // require.NoError(t, rdb.XDel(ctx, streamName,
"1-0").Err())
+ // require.NoError(t, rdb.XDel(ctx, streamName,
"2-0").Err())
+ // cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName,
"Bob", 0, "0-0", "COUNT", 1)
+ // require.NoError(t, cmd.Err())
+ // require.Equal(t, []interface{}{
+ // "2-0",
+ // []interface{}{},
+ // []interface{}{
+ // "1-0",
+ // },
+ // }, cmd.Val())
+ // }
+ // {
+ // cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName,
"Bob", 0, "2-0", "COUNT", 1)
+ // require.NoError(t, cmd.Err())
+ // require.Equal(t, []interface{}{
+ // "3-0",
+ // []interface{}{},
+ // []interface{}{
+ // "2-0",
+ // },
+ // }, cmd.Val())
+ // }
+ // {
+ // cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName,
"Bob", 0, "3-0", "COUNT", 1)
+ // require.NoError(t, cmd.Err())
+ // require.Equal(t, []interface{}{
+ // "0-0",
+ // []interface{}{
+ // []interface{}{
+ // "3-0",
+ // []interface{}{"f", "v"},
+ // },
+ // },
+ // []interface{}{},
+ // }, cmd.Val())
+ // }
+ // // assert_equal [XPENDING x grp - + 10 Alice] {}
+ // // add xpending to this test case when it is supported
+ // })
+
+ // t.Run("XAUTOCLAIM with out of range count", func(t *testing.T) {
+ // err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+ // Stream: "x",
+ // Group: "grp",
+ // Consumer: "Bob",
+ // MinIdle: 0,
+ // Start: "3-0",
+ // Count: 8070450532247928833,
+ // }).Err()
+ // require.Error(t, err)
+ // require.True(t, strings.HasPrefix(err.Error(), "ERR COUNT"))
+ // })
+
+ // t.Run("XAUTOCLAIM COUNT must be > 0", func(t *testing.T) {
+ // cmd := rdb.Do(ctx, "XAUTOCLAIM", "key", "group", "consumer", 1,
1, "COUNT", 0)
+ // require.Error(t, cmd.Err())
+ // require.Equal(t, "ERR COUNT must be > 0", cmd.Err().Error())
+ // })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {