This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 91b5478e fix(stream): add `KeyRangeGen` for `XRead` and `XReadGroup` 
(#2657)
91b5478e is described below

commit 91b5478e8982d5814442def8d0850d0555cc69b2
Author: Edward Xu <[email protected]>
AuthorDate: Fri Nov 15 12:27:39 2024 +0800

    fix(stream): add `KeyRangeGen` for `XRead` and `XReadGroup` (#2657)
    
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_stream.cc                |  29 ++++++-
 tests/gocase/unit/command/command_test.go | 134 ++++++++++++++++++++++++++++++
 2 files changed, 161 insertions(+), 2 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 0fe1cd3b..89650bda 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -33,6 +33,22 @@
 #include "types/redis_stream.h"
 
 namespace redis {
+namespace {
+// for XRead and XReadGroup stream range parse.
+CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args, 
uint32_t start_offset) {
+  // assert here we must have a stream in args since it has been parsed.
+  auto stream_keyword_iter = std::find_if(std::next(args.cbegin(), 
start_offset), args.cend(),
+                                          [](const std::string &arg) { return 
util::EqualICase(arg, "streams"); });
+  int stream_pos = static_cast<int>(std::distance(args.cbegin(), 
stream_keyword_iter));
+  int stream_size = static_cast<int>(args.size() - stream_pos) / 2;
+
+  CommandKeyRange range;
+  range.first_key = stream_pos + 1;
+  range.key_step = 1;
+  range.last_key = range.first_key + stream_size - 1;
+  return range;
+}
+}  // namespace
 
 class CommandXAck : public Commander {
  public:
@@ -1404,6 +1420,10 @@ class CommandXRead : public Commander,
     bufferevent_enable(bev, EV_READ);
   }
 
+  static const inline CommandKeyRangeGen keyRangeGen = [](const 
std::vector<std::string> &args) {
+    return ParseStreamReadRange(args, 0);
+  };
+
  private:
   std::vector<std::string> streams_;
   std::vector<StreamEntryID> ids_;
@@ -1715,6 +1735,10 @@ class CommandXReadGroup : public Commander,
     bufferevent_enable(bev, EV_READ);
   }
 
+  static const inline CommandKeyRangeGen keyRangeGen = [](const 
std::vector<std::string> &args) {
+    return ParseStreamReadRange(args, 4);
+  };
+
  private:
   std::vector<std::string> streams_;
   std::vector<StreamEntryID> ids_;
@@ -1896,8 +1920,9 @@ REDIS_REGISTER_COMMANDS(Stream, 
MakeCmdAttr<CommandXAck>("xack", -4, "write no-d
                         MakeCmdAttr<CommandXPending>("xpending", -3, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandXRange>("xrange", -4, "read-only", 
1, 1, 1),
                         MakeCmdAttr<CommandXRevRange>("xrevrange", -2, 
"read-only", 1, 1, 1),
-                        MakeCmdAttr<CommandXRead>("xread", -4, "read-only 
blocking", NO_KEY),
-                        MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7, 
"write blocking", NO_KEY),
+                        MakeCmdAttr<CommandXRead>("xread", -4, "read-only 
blocking", CommandXRead::keyRangeGen),
+                        MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7, 
"write blocking",
+                                                       
CommandXReadGroup::keyRangeGen),
                         MakeCmdAttr<CommandXTrim>("xtrim", -4, "write 
no-dbsize-check", 1, 1, 1),
                         MakeCmdAttr<CommandXSetId>("xsetid", -3, "write", 1, 
1, 1))
 
diff --git a/tests/gocase/unit/command/command_test.go 
b/tests/gocase/unit/command/command_test.go
index d6f233fa..27a6d536 100644
--- a/tests/gocase/unit/command/command_test.go
+++ b/tests/gocase/unit/command/command_test.go
@@ -289,4 +289,138 @@ func TestCommand(t *testing.T) {
                require.Equal(t, "dst", vs[0])
                require.Equal(t, "src", vs[1])
        })
+
+       t.Run("COMMAND GETKEYS XREAD", func(t *testing.T) {
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD",
+                               "STREAMS", "k0-1", "k0-2", "0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "k0-1", vs[0])
+                       require.Equal(t, "k0-2", vs[1])
+               }
+
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD", 
"COUNT", "10",
+                               "STREAMS", "k1-1", "k1-2", "0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "k1-1", vs[0])
+                       require.Equal(t, "k1-2", vs[1])
+               }
+
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD", 
"BLOCK", "1000",
+                               "STREAMS", "k2-1", "k2-2", "0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "k2-1", vs[0])
+                       require.Equal(t, "k2-2", vs[1])
+               }
+
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREAD", 
"COUNT", "10",
+                               "BLOCK", "1000", "STREAMS", "k3-1", "k3-2", 
"0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "k3-1", vs[0])
+                       require.Equal(t, "k3-2", vs[1])
+               }
+       })
+
+       t.Run("COMMAND GETKEYS XREADGROUP", func(t *testing.T) {
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", 
"GROUP", "group1", "consumer1",
+                               "STREAMS", "gk0-1", "gk0-2", "0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "gk0-1", vs[0])
+                       require.Equal(t, "gk0-2", vs[1])
+               }
+
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", 
"GROUP", "streams", "streams",
+                               "STREAMS", "gk1-1", "gk1-2", "0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "gk1-1", vs[0])
+                       require.Equal(t, "gk1-2", vs[1])
+               }
+
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", 
"GROUP", "group1", "consumer1",
+                               "COUNT", "10", "STREAMS", "gk3-1", "gk3-2", 
"0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "gk3-1", vs[0])
+                       require.Equal(t, "gk3-2", vs[1])
+               }
+
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", 
"GROUP", "group1", "consumer1",
+                               "BLOCK", "10", "STREAMS", "gk4-1", "gk4-2", 
"0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "gk4-1", vs[0])
+                       require.Equal(t, "gk4-2", vs[1])
+               }
+
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", 
"GROUP", "group1", "consumer1",
+                               "NOACK", "STREAMS", "gk5-1", "gk5-2", "0-0", 
"0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "gk5-1", vs[0])
+                       require.Equal(t, "gk5-2", vs[1])
+               }
+
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", 
"GROUP", "group1", "consumer1",
+                               "COUNT", "10", "NOACK", "STREAMS", "gk6-1", 
"gk6-2", "0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "gk6-1", vs[0])
+                       require.Equal(t, "gk6-2", vs[1])
+               }
+
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", 
"GROUP", "group1", "consumer1",
+                               "BLOCK", "1000", "NOACK", "STREAMS", "gk7-1", 
"gk7-2", "0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "gk7-1", vs[0])
+                       require.Equal(t, "gk7-2", vs[1])
+               }
+
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", 
"GROUP", "group1", "consumer1",
+                               "COUNT", "10", "BLOCK", "1000", "STREAMS", 
"gk8-1", "gk8-2", "0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "gk8-1", vs[0])
+                       require.Equal(t, "gk8-2", vs[1])
+               }
+
+               {
+                       r := rdb.Do(ctx, "COMMAND", "GETKEYS", "XREADGROUP", 
"GROUP", "group1", "consumer1",
+                               "COUNT", "10", "BLOCK", "1000", "NOACK", 
"STREAMS", "gk9-1", "gk9-2", "0-0", "0-0")
+                       vs, err := r.Slice()
+                       require.NoError(t, err)
+                       require.Len(t, vs, 2)
+                       require.Equal(t, "gk9-1", vs[0])
+                       require.Equal(t, "gk9-2", vs[1])
+               }
+       })
 }

Reply via email to