This is an automated email from the ASF dual-hosted git repository.

caipengbo pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kvrocks.git
commit 98ae7ad8dbfcfb4b2a285b6e7489fa6ae789892e Author: Myth <[email protected]> AuthorDate: Sun Mar 3 19:59:00 2024 +0800 Drop the stream XGROUP command in 2.8 since it's not ready to use --- src/commands/cmd_stream.cc | 2 +- tests/gocase/unit/type/stream/stream_test.go | 266 +++++++++++++-------------- 2 files changed, 134 insertions(+), 134 deletions(-) diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index 76e2146b..662469a8 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -1191,7 +1191,7 @@ class CommandXSetId : public Commander { REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1), MakeCmdAttr<CommandXDel>("xdel", -3, "write no-dbsize-check", 1, 1, 1), - MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1), + // MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1), MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 1, 1), MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0, 0, 0), MakeCmdAttr<CommandXRange>("xrange", -4, "read-only", 1, 1, 1), diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 7bbce02b..56bf5309 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -867,139 +867,139 @@ func TestStreamOffset(t *testing.T) { require.EqualValues(t, providedSeqNum, seqNum) }) - t.Run("XGROUP CREATE with different kinds of commands and XGROUP DESTROY", func(t *testing.T) { - streamName := "test-stream-a" - groupName := "test-group-a" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - // No such stream (No such key) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "10").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", 
streamName, groupName, "$", "MKSTREAM", "ENTRIESREAD").Err()) - require.NoError(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "MKSTREAM").Err()) - require.NoError(t, rdb.XInfoStream(ctx, streamName).Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err()) - // Invalid syntax - groupName = "test-group-b" - require.Error(t, rdb.Do(ctx, "XGROUP", "CREAT", streamName, groupName, "$").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIEREAD", "10").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "-10").Err()) - require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, "1test-group-c", "$").Err()) - - require.NoError(t, rdb.Del(ctx, "myStream").Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup", "$").Err()) - result, err := rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result() - require.NoError(t, err) - require.Equal(t, int64(1), result) - result, err = rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result() - require.NoError(t, err) - require.Equal(t, int64(0), result) - }) - - t.Run("XGROUP CREATECONSUMER with different kinds of commands", func(t *testing.T) { - streamName := "test-stream" - groupName := "test-group" - consumerName := "test-consumer" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - //No such stream - require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"data", "a"}, - }).Err()) - //no such group - require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) - - r := rdb.XGroupCreateConsumer(ctx, streamName, 
groupName, consumerName).Val() - require.Equal(t, int64(1), r) - r = rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val() - require.Equal(t, int64(0), r) - }) - - t.Run("XGROUP SETID with different kinds of commands", func(t *testing.T) { - streamName := "test-stream" - groupName := "test-group" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - //No such stream - require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"data", "a"}, - }).Err()) - //No such group - require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) - - require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName, "0-0").Err()) - require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entries", "100").Err()) - require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "-100").Err()) - require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "100").Err()) - }) - - t.Run("XINFO GROUPS and XINFO CONSUMERS", func(t *testing.T) { - streamName := "test-stream" - group1 := "t1" - group2 := "t2" - consumer1 := "c1" - consumer2 := "c2" - consumer3 := "c3" - require.NoError(t, rdb.Del(ctx, streamName).Err()) - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "1-0", - Values: []string{"data", "a"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, group1, "$").Err()) - r := rdb.XInfoGroups(ctx, streamName).Val() - require.Equal(t, group1, r[0].Name) - require.Equal(t, int64(0), r[0].Consumers) - require.Equal(t, int64(0), r[0].Pending) - require.Equal(t, "1-0", r[0].LastDeliveredID) - require.Equal(t, int64(0), r[0].EntriesRead) - require.Equal(t, int64(0), r[0].Lag) - - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: 
streamName, - ID: "2-0", - Values: []string{"data1", "b"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, group2, "$").Err()) - r = rdb.XInfoGroups(ctx, streamName).Val() - require.Equal(t, group2, r[1].Name) - require.Equal(t, "2-0", r[1].LastDeliveredID) - - require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer1).Err()) - require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer2).Err()) - require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group2, consumer3).Err()) - r = rdb.XInfoGroups(ctx, streamName).Val() - require.Equal(t, int64(2), r[0].Consumers) - require.Equal(t, int64(1), r[1].Consumers) - - r1 := rdb.XInfoConsumers(ctx, streamName, group1).Val() - require.Equal(t, consumer1, r1[0].Name) - require.Equal(t, consumer2, r1[1].Name) - r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val() - require.Equal(t, consumer3, r1[0].Name) - }) - - t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) { - streamName := "test-stream" - group := "group" - require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ - Stream: streamName, - ID: "*", - Values: []string{"data1", "b"}, - }).Err()) - require.NoError(t, rdb.XGroupCreate(ctx, streamName, group, "0").Err()) - require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group, "consumer").Err()) - require.NoError(t, rdb.XRead(ctx, &redis.XReadArgs{ - Streams: []string{streamName, "0"}, - }).Err()) - }) + // t.Run("XGROUP CREATE with different kinds of commands and XGROUP DESTROY", func(t *testing.T) { + // streamName := "test-stream-a" + // groupName := "test-group-a" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // // No such stream (No such key) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "10").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", 
streamName, groupName, "$", "ENTRIESREAD").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "MKSTREAM", "ENTRIESREAD").Err()) + // require.NoError(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "MKSTREAM").Err()) + // require.NoError(t, rdb.XInfoStream(ctx, streamName).Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$").Err()) + // // Invalid syntax + // groupName = "test-group-b" + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREAT", streamName, groupName, "$").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIEREAD", "10").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, groupName, "$", "ENTRIESREAD", "-10").Err()) + // require.Error(t, rdb.Do(ctx, "XGROUP", "CREATE", streamName, "1test-group-c", "$").Err()) + + // require.NoError(t, rdb.Del(ctx, "myStream").Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: "myStream", Values: []string{"iTeM", "1", "vAluE", "a"}}).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, "myStream", "myGroup", "$").Err()) + // result, err := rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result() + // require.NoError(t, err) + // require.Equal(t, int64(1), result) + // result, err = rdb.XGroupDestroy(ctx, "myStream", "myGroup").Result() + // require.NoError(t, err) + // require.Equal(t, int64(0), result) + // }) + + // t.Run("XGROUP CREATECONSUMER with different kinds of commands", func(t *testing.T) { + // streamName := "test-stream" + // groupName := "test-group" + // consumerName := "test-consumer" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // //No such stream + // require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"data", "a"}, + // }).Err()) + // //no such group + // require.Error(t, 
rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) + + // r := rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val() + // require.Equal(t, int64(1), r) + // r = rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val() + // require.Equal(t, int64(0), r) + // }) + + // t.Run("XGROUP SETID with different kinds of commands", func(t *testing.T) { + // streamName := "test-stream" + // groupName := "test-group" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // //No such stream + // require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"data", "a"}, + // }).Err()) + // //No such group + // require.Error(t, rdb.XGroupSetID(ctx, streamName, groupName, "$").Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) + + // require.NoError(t, rdb.XGroupSetID(ctx, streamName, groupName, "0-0").Err()) + // require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entries", "100").Err()) + // require.Error(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "-100").Err()) + // require.NoError(t, rdb.Do(ctx, "xgroup", "setid", streamName, groupName, "$", "entriesread", "100").Err()) + // }) + + // t.Run("XINFO GROUPS and XINFO CONSUMERS", func(t *testing.T) { + // streamName := "test-stream" + // group1 := "t1" + // group2 := "t2" + // consumer1 := "c1" + // consumer2 := "c2" + // consumer3 := "c3" + // require.NoError(t, rdb.Del(ctx, streamName).Err()) + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "1-0", + // Values: []string{"data", "a"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, group1, "$").Err()) + // r := rdb.XInfoGroups(ctx, streamName).Val() + // 
require.Equal(t, group1, r[0].Name) + // require.Equal(t, int64(0), r[0].Consumers) + // require.Equal(t, int64(0), r[0].Pending) + // require.Equal(t, "1-0", r[0].LastDeliveredID) + // require.Equal(t, int64(0), r[0].EntriesRead) + // require.Equal(t, int64(0), r[0].Lag) + + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "2-0", + // Values: []string{"data1", "b"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, group2, "$").Err()) + // r = rdb.XInfoGroups(ctx, streamName).Val() + // require.Equal(t, group2, r[1].Name) + // require.Equal(t, "2-0", r[1].LastDeliveredID) + + // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer1).Err()) + // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group1, consumer2).Err()) + // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group2, consumer3).Err()) + // r = rdb.XInfoGroups(ctx, streamName).Val() + // require.Equal(t, int64(2), r[0].Consumers) + // require.Equal(t, int64(1), r[1].Consumers) + + // r1 := rdb.XInfoConsumers(ctx, streamName, group1).Val() + // require.Equal(t, consumer1, r1[0].Name) + // require.Equal(t, consumer2, r1[1].Name) + // r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val() + // require.Equal(t, consumer3, r1[0].Name) + // }) + + // t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) { + // streamName := "test-stream" + // group := "group" + // require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + // Stream: streamName, + // ID: "*", + // Values: []string{"data1", "b"}, + // }).Err()) + // require.NoError(t, rdb.XGroupCreate(ctx, streamName, group, "0").Err()) + // require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group, "consumer").Err()) + // require.NoError(t, rdb.XRead(ctx, &redis.XReadArgs{ + // Streams: []string{streamName, "0"}, + // }).Err()) + // }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {
