This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 91b5478e fix(stream): add `KeyRangeGen` for `XRead` and `XReadGroup`
(#2657)
91b5478e is described below
commit 91b5478e8982d5814442def8d0850d0555cc69b2
Author: Edward Xu <[email protected]>
AuthorDate: Fri Nov 15 12:27:39 2024 +0800
fix(stream): add `KeyRangeGen` for `XRead` and `XReadGroup` (#2657)
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_stream.cc | 29 ++++++-
tests/gocase/unit/command/command_test.go | 134 ++++++++++++++++++++++++++++++
2 files changed, 161 insertions(+), 2 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 0fe1cd3b..89650bda 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -33,6 +33,22 @@
#include "types/redis_stream.h"
namespace redis {
+namespace {
+// for XRead and XReadGroup stream range parse.
+CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args,
uint32_t start_offset) {
+ // assert here we must have a stream in args since it has been parsed.
+ auto stream_keyword_iter = std::find_if(std::next(args.cbegin(),
start_offset), args.cend(),
+ [](const std::string &arg) { return
util::EqualICase(arg, "streams"); });
+ int stream_pos = static_cast<int>(std::distance(args.cbegin(),
stream_keyword_iter));
+ int stream_size = static_cast<int>(args.size() - stream_pos) / 2;
+
+ CommandKeyRange range;
+ range.first_key = stream_pos + 1;
+ range.key_step = 1;
+ range.last_key = range.first_key + stream_size - 1;
+ return range;
+}
+} // namespace
class CommandXAck : public Commander {
public:
@@ -1404,6 +1420,10 @@ class CommandXRead : public Commander,
bufferevent_enable(bev, EV_READ);
}
+ static const inline CommandKeyRangeGen keyRangeGen = [](const
std::vector<std::string> &args) {
+ return ParseStreamReadRange(args, 0);
+ };
+
private:
std::vector<std::string> streams_;
std::vector<StreamEntryID> ids_;
@@ -1715,6 +1735,10 @@ class CommandXReadGroup : public Commander,
bufferevent_enable(bev, EV_READ);
}
+ static const inline CommandKeyRangeGen keyRangeGen = [](const
std::vector<std::string> &args) {
+ return ParseStreamReadRange(args, 4);
+ };
+
private:
std::vector<std::string> streams_;
std::vector<StreamEntryID> ids_;
@@ -1896,8 +1920,9 @@ REDIS_REGISTER_COMMANDS(Stream,
MakeCmdAttr<CommandXAck>("xack", -4, "write no-d
MakeCmdAttr<CommandXPending>("xpending", -3,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandXRange>("xrange", -4, "read-only",
1, 1, 1),
MakeCmdAttr<CommandXRevRange>("xrevrange", -2,
"read-only", 1, 1, 1),
- MakeCmdAttr<CommandXRead>("xread", -4, "read-only
blocking", NO_KEY),
- MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7,
"write blocking", NO_KEY),
+ MakeCmdAttr<CommandXRead>("xread", -4, "read-only
blocking", CommandXRead::keyRangeGen),
+ MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7,
"write blocking",
+
CommandXReadGroup::keyRangeGen),
MakeCmdAttr<CommandXTrim>("xtrim", -4, "write
no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXSetId>("xsetid", -3, "write", 1,
1, 1))
diff --git a/tests/gocase/unit/command/command_test.go
b/tests/gocase/unit/command/command_test.go
index d6f233fa..27a6d536 100644
--- a/tests/gocase/unit/command/command_test.go
+++ b/tests/gocase/unit/command/command_test.go
@@ -289,4 +289,138 @@ func TestCommand(t *testing.T) {
require.Equal(t, "dst", vs[0])
require.Equal(t, "src", vs[1])
})
+
+ t.Run("COMMAND GETKEYS XREAD", func(t *testing.T) {
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD",
+ "STREAMS", "k0-1", "k0-2", "0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "k0-1", vs[0])
+ require.Equal(t, "k0-2", vs[1])
+ }
+
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD",
"COUNT", "10",
+ "STREAMS", "k1-1", "k1-2", "0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "k1-1", vs[0])
+ require.Equal(t, "k1-2", vs[1])
+ }
+
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD",
"BLOCK", "1000",
+ "STREAMS", "k2-1", "k2-2", "0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "k2-1", vs[0])
+ require.Equal(t, "k2-2", vs[1])
+ }
+
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD",
"COUNT", "10",
+ "BLOCK", "1000", "STREAMS", "k3-1", "k3-2",
"0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "k3-1", vs[0])
+ require.Equal(t, "k3-2", vs[1])
+ }
+ })
+
+ t.Run("COMMAND GETKEYS XREADGROUP", func(t *testing.T) {
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP",
"GROUP", "group1", "consumer1",
+ "STREAMS", "gk0-1", "gk0-2", "0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "gk0-1", vs[0])
+ require.Equal(t, "gk0-2", vs[1])
+ }
+
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP",
"GROUP", "streams", "streams",
+ "STREAMS", "gk1-1", "gk1-2", "0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "gk1-1", vs[0])
+ require.Equal(t, "gk1-2", vs[1])
+ }
+
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP",
"GROUP", "group1", "consumer1",
+ "COUNT", "10", "STREAMS", "gk3-1", "gk3-2",
"0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "gk3-1", vs[0])
+ require.Equal(t, "gk3-2", vs[1])
+ }
+
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP",
"GROUP", "group1", "consumer1",
+ "BLOCK", "10", "STREAMS", "gk4-1", "gk4-2",
"0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "gk4-1", vs[0])
+ require.Equal(t, "gk4-2", vs[1])
+ }
+
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP",
"GROUP", "group1", "consumer1",
+ "NOACK", "STREAMS", "gk5-1", "gk5-2", "0-0",
"0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "gk5-1", vs[0])
+ require.Equal(t, "gk5-2", vs[1])
+ }
+
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP",
"GROUP", "group1", "consumer1",
+ "COUNT", "10", "NOACK", "STREAMS", "gk6-1",
"gk6-2", "0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "gk6-1", vs[0])
+ require.Equal(t, "gk6-2", vs[1])
+ }
+
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP",
"GROUP", "group1", "consumer1",
+ "BLOCK", "1000", "NOACK", "STREAMS", "gk7-1",
"gk7-2", "0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "gk7-1", vs[0])
+ require.Equal(t, "gk7-2", vs[1])
+ }
+
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP",
"GROUP", "group1", "consumer1",
+ "COUNT", "10", "BLOCK", "1000", "STREAMS",
"gk8-1", "gk8-2", "0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "gk8-1", vs[0])
+ require.Equal(t, "gk8-2", vs[1])
+ }
+
+ {
+ r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP",
"GROUP", "group1", "consumer1",
+ "COUNT", "10", "BLOCK", "1000", "NOACK",
"STREAMS", "gk9-1", "gk9-2", "0-0", "0-0")
+ vs, err := r.Slice()
+ require.NoError(t, err)
+ require.Len(t, vs, 2)
+ require.Equal(t, "gk9-1", vs[0])
+ require.Equal(t, "gk9-2", vs[1])
+ }
+ })
}