This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 68523ac3 feat(stream): add support of XPENDING command (#2387)
68523ac3 is described below

commit 68523ac35dcb2d712dfac363c4cabe774ff927f6
Author: Hauru <[email protected]>
AuthorDate: Sun Jul 21 22:01:50 2024 +0800

    feat(stream): add support of XPENDING command (#2387)
    
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_stream.cc                   | 93 ++++++++++++++++++++++++++++
 src/types/redis_stream.cc                    | 88 ++++++++++++++++++++++++++
 src/types/redis_stream.h                     |  2 +
 src/types/redis_stream_base.h                | 28 +++++++++
 tests/gocase/unit/type/stream/stream_test.go | 76 +++++++++++++++++++++++
 5 files changed, 287 insertions(+)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index ea448332..081933a1 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -28,6 +28,7 @@
 #include "error_constants.h"
 #include "event_util.h"
 #include "server/server.h"
+#include "status.h"
 #include "time_util.h"
 #include "types/redis_stream.h"
 
@@ -824,6 +825,97 @@ class CommandXInfo : public Commander {
   }
 };
 
+// Implements the XPENDING command, which reports the pending entries of a consumer group.
+//
+// Accepted argument shapes (see Parse):
+//   XPENDING <stream> <group>
+//   XPENDING <stream> <group> [IDLE <min-idle-ms>] <start> <end> <count> [<consumer>]
+// Without the range block a summary is replied; with it, one reply item per matching entry.
+// Note that Parse() reads IDLE before, and independently of, the range block.
+class CommandXPending : public Commander {
+ public:
+  // Parses the stream name, the group name, the optional IDLE filter and the optional
+  // `<start> <end> <count> [<consumer>]` range block into options_.
+  // Returns the first parsing error, or Status::OK() on success.
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+    stream_name_ = GET_OR_RET(parser.TakeStr());
+    group_name_ = GET_OR_RET(parser.TakeStr());
+    if (parser.EatEqICase("idle")) {
+      options_.idle_time = GET_OR_RET(parser.TakeInt<uint64_t>());
+      options_.with_time = true;
+    }
+
+    if (parser.Good()) {
+      std::string start_id = GET_OR_RET(parser.TakeStr());
+      std::string end_id = GET_OR_RET(parser.TakeStr());
+
+      // "-" and "+" keep the minimum / maximum IDs that StreamPendingOptions starts with.
+      if (start_id != "-") {
+        auto s = ParseStreamEntryID(start_id, &options_.start_id);
+        if (!s.IsOK()) {
+          return s;
+        }
+      }
+
+      if (end_id != "+") {
+        // The upper bound has to come from end_id, not from start_id.
+        auto s = ParseStreamEntryID(end_id, &options_.end_id);
+        if (!s.IsOK()) {
+          return s;
+        }
+      }
+
+      options_.count = GET_OR_RET(parser.TakeInt<uint64_t>());
+      options_.with_count = true;
+      if (parser.Good()) {
+        options_.consumer = GET_OR_RET(parser.TakeStr());
+        options_.with_consumer = true;
+      }
+    }
+    return Status::OK();
+  }
+
+  // Queries the pending entries through redis::Stream::GetPendingEntries and serialises them
+  // into `output`. A non-OK storage status is reported as Status::RedisExecErr.
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    redis::Stream stream_db(srv->storage, conn->GetNamespace());
+    options_.stream_name = stream_name_;
+    options_.group_name = group_name_;
+
+    // Value-initialise so that the summary counter is defined even if the lookup returns early.
+    StreamGetPendingEntryResult results{};
+    std::vector<StreamNACK> ext_results;
+    auto s = stream_db.GetPendingEntries(options_, results, ext_results);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    if (options_.with_count) {
+      return SendExtResults(conn, output, ext_results);
+    }
+    return SendResults(conn, output, results);
+  }
+
+  // Writes the summary reply: pending count, first ID, last ID and a nested array holding one
+  // [consumer name, pending count as string] pair per consumer.
+  static Status SendResults(Connection *conn, std::string *output, StreamGetPendingEntryResult &results) {
+    // The reply always has exactly four top-level elements; the consumers live in ONE nested
+    // array, so the outer length must not depend on the number of consumers.
+    output->append(redis::MultiLen(4));
+    output->append(redis::Integer(results.pending_number));
+    output->append(redis::BulkString(results.first_entry_id.ToString()));
+    output->append(redis::BulkString(results.last_entry_id.ToString()));
+    output->append(redis::MultiLen(results.consumer_infos.size()));
+    for (const auto &entry : results.consumer_infos) {
+      output->append(redis::MultiLen(2));
+      output->append(redis::BulkString(entry.first));
+      output->append(redis::BulkString(std::to_string(entry.second)));
+    }
+
+    return Status::OK();
+  }
+
+  // Writes the extended reply: one four-element array per pending entry
+  // (entry ID, consumer name, last delivery time, delivery count).
+  static Status SendExtResults(Connection *conn, std::string *output, std::vector<StreamNACK> &ext_results) {
+    output->append(redis::MultiLen(ext_results.size()));
+    for (const auto &entry : ext_results) {
+      output->append(redis::MultiLen(4));
+      output->append(redis::BulkString(entry.id.ToString()));
+      output->append(redis::BulkString(entry.pel_entry.consumer_name));
+      // NOTE(review): this emits the stored last-delivery timestamp. The Redis XPENDING
+      // documentation describes this field as the milliseconds elapsed since the last
+      // delivery - confirm which of the two is intended.
+      output->append(redis::Integer(entry.pel_entry.last_delivery_time_ms));
+      output->append(redis::Integer(entry.pel_entry.last_delivery_count));
+    }
+
+    return Status::OK();
+  }
+
+ private:
+  std::string group_name_;
+  std::string stream_name_;
+  StreamPendingOptions options_;
+};
+
 class CommandXRange : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
@@ -1754,6 +1846,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack", 
-4, "write no-dbsize-ch
                         MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 
2, 1),
                         MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 
1, 1),
                         MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0, 
0, 0),
+                        MakeCmdAttr<CommandXPending>("xpending", -3, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandXRange>("xrange", -4, "read-only", 
1, 1, 1),
                         MakeCmdAttr<CommandXRevRange>("xrevrange", -2, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandXRead>("xread", -4, "read-only", 0, 
0, 0),
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 7d12c46e..4df2a2ec 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -1698,4 +1698,92 @@ rocksdb::Status Stream::SetId(const Slice &stream_name, 
const StreamEntryID &las
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+// Scans the pending-entry records of consumer group `options.group_name` on stream
+// `options.stream_name`, restricted to the ID range [options.start_id, options.end_id].
+//
+// - When `options.with_count` is set, up to `options.count` matching entries are appended to
+//   `ext_results`.
+// - Otherwise a summary is written to `pending_infos`: the number of matching entries and one
+//   (consumer name, pending number) pair per distinct consumer.
+// In both modes `pending_infos` receives the smallest and largest entry ID visited by the scan.
+//
+// Entries are skipped when `options.with_time` is set and they were delivered less than
+// `options.idle_time` ms ago, or when `options.with_consumer` is set and they belong to a
+// different consumer.
+//
+// Returns OK (with `pending_infos.pending_number == 0`) when the stream or the group does not
+// exist; any other storage error is returned as is.
+rocksdb::Status Stream::GetPendingEntries(StreamPendingOptions &options, StreamGetPendingEntryResult &pending_infos,
+                                          std::vector<StreamNACK> &ext_results) {
+  // Give the summary counter a defined value on every exit path, including the early returns.
+  pending_infos.pending_number = 0;
+
+  const std::string &stream_name = options.stream_name;
+  const std::string &group_name = options.group_name;
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+
+  StreamMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s.IsNotFound() ? rocksdb::Status::OK() : s;
+  }
+
+  std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
+  std::string get_group_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, &get_group_value);
+  if (!s.ok()) {
+    return s.IsNotFound() ? rocksdb::Status::OK() : s;
+  }
+
+  // iterate_upper_bound is exclusive, whereas the requested end ID is inclusive, so the bound
+  // is built from the ID that follows end_id. The maximum ID has no successor and is used as is.
+  StreamEntryID upper_id = options.end_id;
+  if (options.end_id < StreamEntryID::Maximum()) {
+    StreamEntryID next_id = options.end_id;
+    if (IncrementStreamEntryID(&next_id).IsOK()) {
+      upper_id = next_id;
+    }
+  }
+
+  std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
+  std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, upper_id);
+
+  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+  LatestSnapShot ss(storage_);
+  read_options.snapshot = ss.GetSnapShot();
+  rocksdb::Slice upper_bound(end_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix_key);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+  std::unordered_set<std::string> consumer_names;
+  StreamEntryID first_entry_id{StreamEntryID::Maximum()};
+  StreamEntryID last_entry_id{StreamEntryID::Minimum()};
+  uint64_t ext_result_count = 0;
+  uint64_t summary_result_count = 0;
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    if (options.with_count && options.count <= ext_result_count) {
+      break;
+    }
+    std::string tmp_group_name;
+    StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
+
+    if (first_entry_id > entry_id) {
+      first_entry_id = entry_id;
+    }
+    if (last_entry_id < entry_id) {
+      last_entry_id = entry_id;
+    }
+    StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
+    if (options.with_time && util::GetTimeStampMS() - pel_entry.last_delivery_time_ms < options.idle_time) {
+      continue;
+    }
+
+    const std::string &consumer_name = pel_entry.consumer_name;
+
+    if (options.with_consumer && options.consumer != consumer_name) {
+      continue;
+    }
+
+    if (options.with_count) {
+      ext_results.push_back({entry_id, pel_entry.last_delivery_time_ms, pel_entry.last_delivery_count, consumer_name});
+      ext_result_count++;
+      continue;
+    }
+
+    // The consumer metadata is only needed the first time a consumer shows up, so it is read
+    // once per consumer instead of once per pending entry.
+    if (consumer_names.insert(consumer_name).second) {
+      std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
+      std::string get_consumer_value;
+      s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
+      if (!s.ok() && !s.IsNotFound()) {
+        return s;
+      }
+      if (s.IsNotFound()) {
+        // A pending entry that names an unknown consumer ends the scan without an error.
+        return rocksdb::Status::OK();
+      }
+
+      StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
+      pending_infos.consumer_infos.emplace_back(consumer_name, consumer_metadata.pending_number);
+    }
+    summary_result_count++;
+  }
+  pending_infos.last_entry_id = last_entry_id;
+  pending_infos.first_entry_id = first_entry_id;
+  pending_infos.pending_number = summary_result_count;
+  return rocksdb::Status::OK();
+}
+
 }  // namespace redis
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 510cbb66..57e7e978 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -63,6 +63,8 @@ class Stream : public SubKeyScanner {
                                std::vector<std::pair<std::string, 
StreamConsumerGroupMetadata>> &group_metadata);
   rocksdb::Status GetConsumerInfo(const Slice &stream_name, const std::string 
&group_name,
                                   std::vector<std::pair<std::string, 
StreamConsumerMetadata>> &consumer_metadata);
+  // Scans the pending-entry records of group `options.group_name` on stream `options.stream_name`.
+  // When `options.with_count` is set, matching entries are appended to `ext_results`; otherwise a
+  // summary (entry count and per-consumer pending numbers) is written to `pending_infos`.
+  // A missing stream or group yields an OK status.
+  rocksdb::Status GetPendingEntries(StreamPendingOptions &options, 
StreamGetPendingEntryResult &pending_infos,
+                                    std::vector<StreamNACK> &ext_results);
   rocksdb::Status Range(const Slice &stream_name, const StreamRangeOptions 
&options, std::vector<StreamEntry> *entries);
   rocksdb::Status RangeWithPending(const Slice &stream_name, 
StreamRangeOptions &options,
                                    std::vector<StreamEntry> *entries, 
std::string &group_name,
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 091c64bb..8bda122b 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -239,6 +239,34 @@ struct StreamAutoClaimResult {
   std::vector<std::string> deleted_ids;
 };
 
+// Parsed arguments of an XPENDING request, handed to Stream::GetPendingEntries.
+struct StreamPendingOptions {
+  // Minimum idle time in milliseconds; only honoured when `with_time` is set.
+  uint64_t idle_time = 0;
+  bool with_time = false;
+
+  // Bounds of the entry ID range to scan; the defaults cover every possible ID.
+  StreamEntryID start_id{StreamEntryID::Minimum()};
+  StreamEntryID end_id{StreamEntryID::Maximum()};
+
+  // Maximum number of entries to return; only honoured when `with_count` is set.
+  // Initialised so that the struct never carries an indeterminate value.
+  uint64_t count = 0;
+  bool with_count = false;
+  // When set, only entries whose consumer name equals `consumer` are reported.
+  bool with_consumer = false;
+
+  std::string consumer;
+  std::string stream_name;
+  std::string group_name;
+};
+
+// Summary produced by Stream::GetPendingEntries when no entry count was requested.
+struct StreamGetPendingEntryResult {
+  // Number of pending entries that matched. Initialised because GetPendingEntries can return
+  // before assigning it (for example when the stream or the group does not exist).
+  uint64_t pending_number = 0;
+  // Smallest and largest entry ID seen during the scan.
+  StreamEntryID first_entry_id;
+  StreamEntryID last_entry_id;
+  // One (consumer name, pending number) pair per distinct consumer.
+  std::vector<std::pair<std::string, int>> consumer_infos;
+};
+
+// One pending entry as reported by the extended XPENDING form: the entry ID together with its
+// pending-entry record (consumer name, last delivery time in ms, delivery count).
+struct StreamNACK {
+  StreamEntryID id;
+  StreamPelEntry pel_entry;
+};
+
 Status IncrementStreamEntryID(StreamEntryID *id);
 Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
 StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>> 
ParseNextStreamEntryIDStrategy(const std::string &input);
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index 0288fb79..8ff7a503 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1932,6 +1932,82 @@ func TestStreamOffset(t *testing.T) {
                require.Error(t, cmd.Err())
                require.Equal(t, "ERR COUNT must be > 0", cmd.Err().Error())
        })
+
+       t.Run("XPending with different kinds of commands", func(t *testing.T) {
+               streamName := "mystream"
+               groupName := "mygroup"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
+               require.NoError(t, err)
+               require.Equal(t, int64(0), r)
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "1-0",
+                       Values: []string{"field1", "data1"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
+               consumerName := "myconsumer"
+               err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Err()
+               require.NoError(t, err)
+
+               r1, err1 := rdb.XPending(ctx, streamName, groupName).Result()
+               require.NoError(t, err1)
+
+               require.Equal(t, &redis.XPending{
+                       Count:     1,
+                       Lower:     "1-0",
+                       Higher:    "1-0",
+                       Consumers: map[string]int64{"myconsumer": 1},
+               }, r1)
+
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "2-0",
+                       Values: []string{"field1", "data1"},
+               }).Err())
+
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "2-2",
+                       Values: []string{"field1", "data1"},
+               }).Err())
+
+               require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    2,
+                       NoAck:    false,
+               }).Err())
+
+               r1, err1 = rdb.XPending(ctx, streamName, groupName).Result()
+               require.NoError(t, err1)
+
+               require.Equal(t, &redis.XPending{
+                       Count:     3,
+                       Lower:     "1-0",
+                       Higher:    "2-2",
+                       Consumers: map[string]int64{"myconsumer": 3},
+               }, r1)
+
+               require.NoError(t, rdb.XAck(ctx, streamName, groupName, 
"2-0").Err())
+
+               r1, err1 = rdb.XPending(ctx, streamName, groupName).Result()
+               require.NoError(t, err1)
+
+               require.Equal(t, &redis.XPending{
+                       Count:     2,
+                       Lower:     "1-0",
+                       Higher:    "2-2",
+                       Consumers: map[string]int64{"myconsumer": 2},
+               }, r1)
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to