This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 68523ac3 feat(stream): add support of XPENDING command (#2387)
68523ac3 is described below
commit 68523ac35dcb2d712dfac363c4cabe774ff927f6
Author: Hauru <[email protected]>
AuthorDate: Sun Jul 21 22:01:50 2024 +0800
feat(stream): add support of XPENDING command (#2387)
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_stream.cc | 93 ++++++++++++++++++++++++++++
src/types/redis_stream.cc | 88 ++++++++++++++++++++++++++
src/types/redis_stream.h | 2 +
src/types/redis_stream_base.h | 28 +++++++++
tests/gocase/unit/type/stream/stream_test.go | 76 +++++++++++++++++++++++
5 files changed, 287 insertions(+)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index ea448332..081933a1 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -28,6 +28,7 @@
#include "error_constants.h"
#include "event_util.h"
#include "server/server.h"
+#include "status.h"
#include "time_util.h"
#include "types/redis_stream.h"
@@ -824,6 +825,97 @@ class CommandXInfo : public Commander {
}
};
+class CommandXPending : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ CommandParser parser(args, 1);
+ stream_name_ = GET_OR_RET(parser.TakeStr());
+ group_name_ = GET_OR_RET(parser.TakeStr());
+ if (parser.EatEqICase("idle")) {
+ options_.idle_time = GET_OR_RET(parser.TakeInt<uint64_t>());
+ options_.with_time = true;
+ }
+
+ if (parser.Good()) {
+ std::string start_id, end_id;
+ start_id = GET_OR_RET(parser.TakeStr());
+ end_id = GET_OR_RET(parser.TakeStr());
+ if (start_id != "-") {
+ auto s = ParseStreamEntryID(start_id, &options_.start_id);
+ if (!s.IsOK()) {
+ return s;
+ }
+ }
+
+ if (end_id != "+") {
+ auto s = ParseStreamEntryID(end_id, &options_.end_id);
+ if (!s.IsOK()) {
+ return s;
+ }
+ }
+
+ options_.count = GET_OR_RET(parser.TakeInt<uint64_t>());
+ options_.with_count = true;
+ if (parser.Good()) {
+ options_.consumer = GET_OR_RET(parser.TakeStr());
+ options_.with_consumer = true;
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Execute(Server *srv, Connection *conn, std::string *output) override {
+ redis::Stream stream_db(srv->storage, conn->GetNamespace());
+ std::vector<std::pair<std::string, int>> pending_infos;
+ StreamGetPendingEntryResult results;
+ options_.stream_name = stream_name_;
+ options_.group_name = group_name_;
+ std::vector<StreamNACK> ext_results;
+ auto s = stream_db.GetPendingEntries(options_, results, ext_results);
+ if (!s.ok()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+ if (options_.with_count) {
+ return SendExtResults(conn, output, ext_results);
+ }
+ return SendResults(conn, output, results);
+ }
+
+ static Status SendResults(Connection *conn, std::string *output,
StreamGetPendingEntryResult &results) {
+ output->append(redis::MultiLen(4));
+ output->append(redis::Integer(results.pending_number));
+ output->append(redis::BulkString(results.first_entry_id.ToString()));
+ output->append(redis::BulkString(results.last_entry_id.ToString()));
+ output->append(redis::MultiLen(results.consumer_infos.size()));
+ for (const auto &entry : results.consumer_infos) {
+ output->append(redis::MultiLen(2));
+ output->append(redis::BulkString(entry.first));
+ output->append(redis::BulkString(std::to_string(entry.second)));
+ }
+
+ return Status::OK();
+ }
+
+ static Status SendExtResults(Connection *conn, std::string *output,
std::vector<StreamNACK> &ext_results) {
+ output->append(redis::MultiLen(ext_results.size()));
+ for (const auto &entry : ext_results) {
+ output->append(redis::MultiLen(4));
+ output->append(redis::BulkString(entry.id.ToString()));
+ output->append(redis::BulkString(entry.pel_entry.consumer_name));
+ output->append(redis::Integer(entry.pel_entry.last_delivery_time_ms));
+ output->append(redis::Integer(entry.pel_entry.last_delivery_count));
+ }
+
+ return Status::OK();
+ }
+
+ private:
+ std::string group_name_;
+ std::string stream_name_;
+ std::string consumer_name_;
+ StreamPendingOptions options_;
+};
+
class CommandXRange : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
@@ -1754,6 +1846,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack",
-4, "write no-dbsize-ch
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2,
2, 1),
MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1,
1, 1),
MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0,
0, 0),
+ MakeCmdAttr<CommandXPending>("xpending", -3,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandXRange>("xrange", -4, "read-only",
1, 1, 1),
MakeCmdAttr<CommandXRevRange>("xrevrange", -2,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandXRead>("xread", -4, "read-only", 0,
0, 0),
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 7d12c46e..4df2a2ec 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -1698,4 +1698,92 @@ rocksdb::Status Stream::SetId(const Slice &stream_name,
const StreamEntryID &las
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+rocksdb::Status Stream::GetPendingEntries(StreamPendingOptions &options,
StreamGetPendingEntryResult &pending_infos,
+ std::vector<StreamNACK>
&ext_results) {
+ const std::string &stream_name = options.stream_name;
+ const std::string &group_name = options.group_name;
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+
+ StreamMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
+ if (!s.ok()) {
+ return s.IsNotFound() ? rocksdb::Status::OK() : s;
+ }
+
+ std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string get_group_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
+ if (!s.ok()) {
+ return s.IsNotFound() ? rocksdb::Status::OK() : s;
+ }
+
+ std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata,
group_name, options.start_id);
+ std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata,
group_name, options.end_id);
+
+ rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+ LatestSnapShot ss(storage_);
+ read_options.snapshot = ss.GetSnapShot();
+ rocksdb::Slice upper_bound(end_key);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix_key);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+ std::unordered_set<std::string> consumer_names;
+ StreamEntryID first_entry_id{StreamEntryID::Maximum()};
+ StreamEntryID last_entry_id{StreamEntryID::Minimum()};
+ uint64_t ext_result_count = 0;
+ uint64_t summary_result_count = 0;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ if (options.with_count && options.count <= ext_result_count) {
+ break;
+ }
+ std::string tmp_group_name;
+ StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(),
tmp_group_name);
+
+ if (first_entry_id > entry_id) {
+ first_entry_id = entry_id;
+ }
+ if (last_entry_id < entry_id) {
+ last_entry_id = entry_id;
+ }
+ StreamPelEntry pel_entry =
decodeStreamPelEntryValue(iter->value().ToString());
+ if (options.with_time && util::GetTimeStampMS() -
pel_entry.last_delivery_time_ms < options.idle_time) {
+ continue;
+ }
+
+ const std::string &consumer_name = pel_entry.consumer_name;
+
+ if (options.with_consumer && options.consumer != consumer_name) {
+ continue;
+ }
+
+ if (options.with_count) {
+ ext_results.push_back({entry_id, pel_entry.last_delivery_time_ms,
pel_entry.last_delivery_count, consumer_name});
+ ext_result_count++;
+ continue;
+ }
+ std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
+ std::string get_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::OK();
+ }
+
+ StreamConsumerMetadata consumer_metadata =
decodeStreamConsumerMetadataValue(get_consumer_value);
+ if (consumer_names.find(consumer_name) == consumer_names.end()) {
+ consumer_names.insert(consumer_name);
+ pending_infos.consumer_infos.emplace_back(consumer_name,
consumer_metadata.pending_number);
+ }
+ summary_result_count++;
+ }
+ pending_infos.last_entry_id = last_entry_id;
+ pending_infos.first_entry_id = first_entry_id;
+ pending_infos.pending_number = summary_result_count;
+ return rocksdb::Status::OK();
+}
+
} // namespace redis
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 510cbb66..57e7e978 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -63,6 +63,8 @@ class Stream : public SubKeyScanner {
std::vector<std::pair<std::string,
StreamConsumerGroupMetadata>> &group_metadata);
rocksdb::Status GetConsumerInfo(const Slice &stream_name, const std::string
&group_name,
std::vector<std::pair<std::string,
StreamConsumerMetadata>> &consumer_metadata);
+ rocksdb::Status GetPendingEntries(StreamPendingOptions &options,
StreamGetPendingEntryResult &pending_infos,
+ std::vector<StreamNACK> &ext_results);
rocksdb::Status Range(const Slice &stream_name, const StreamRangeOptions
&options, std::vector<StreamEntry> *entries);
rocksdb::Status RangeWithPending(const Slice &stream_name,
StreamRangeOptions &options,
std::vector<StreamEntry> *entries,
std::string &group_name,
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 091c64bb..8bda122b 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -239,6 +239,34 @@ struct StreamAutoClaimResult {
std::vector<std::string> deleted_ids;
};
+struct StreamPendingOptions {
+ uint64_t idle_time = 0;
+ bool with_time = false;
+
+ StreamEntryID start_id{StreamEntryID::Minimum()};
+ StreamEntryID end_id{StreamEntryID::Maximum()};
+
+ uint64_t count;
+ bool with_count = false;
+ bool with_consumer = false;
+
+ std::string consumer;
+ std::string stream_name;
+ std::string group_name;
+};
+
+struct StreamGetPendingEntryResult {
+ uint64_t pending_number;
+ StreamEntryID first_entry_id;
+ StreamEntryID last_entry_id;
+ std::vector<std::pair<std::string, int>> consumer_infos;
+};
+
+struct StreamNACK {
+ StreamEntryID id;
+ StreamPelEntry pel_entry;
+};
+
Status IncrementStreamEntryID(StreamEntryID *id);
Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>>
ParseNextStreamEntryIDStrategy(const std::string &input);
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index 0288fb79..8ff7a503 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1932,6 +1932,82 @@ func TestStreamOffset(t *testing.T) {
require.Error(t, cmd.Err())
require.Equal(t, "ERR COUNT must be > 0", cmd.Err().Error())
})
+
+ t.Run("XPending with different kinds of commands", func(t *testing.T) {
+ streamName := "mystream"
+ groupName := "mygroup"
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
+ require.NoError(t, err)
+ require.Equal(t, int64(0), r)
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "1-0",
+ Values: []string{"field1", "data1"},
+ }).Err())
+ require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName,
"0").Err())
+ consumerName := "myconsumer"
+ err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 1,
+ NoAck: false,
+ }).Err()
+ require.NoError(t, err)
+
+ r1, err1 := rdb.XPending(ctx, streamName, groupName).Result()
+ require.NoError(t, err1)
+
+ require.Equal(t, &redis.XPending{
+ Count: 1,
+ Lower: "1-0",
+ Higher: "1-0",
+ Consumers: map[string]int64{"myconsumer": 1},
+ }, r1)
+
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "2-0",
+ Values: []string{"field1", "data1"},
+ }).Err())
+
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "2-2",
+ Values: []string{"field1", "data1"},
+ }).Err())
+
+ require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 2,
+ NoAck: false,
+ }).Err())
+
+ r1, err1 = rdb.XPending(ctx, streamName, groupName).Result()
+ require.NoError(t, err1)
+
+ require.Equal(t, &redis.XPending{
+ Count: 3,
+ Lower: "1-0",
+ Higher: "2-2",
+ Consumers: map[string]int64{"myconsumer": 3},
+ }, r1)
+
+ require.NoError(t, rdb.XAck(ctx, streamName, groupName,
"2-0").Err())
+
+ r1, err1 = rdb.XPending(ctx, streamName, groupName).Result()
+ require.NoError(t, err1)
+
+ require.Equal(t, &redis.XPending{
+ Count: 2,
+ Lower: "1-0",
+ Higher: "2-2",
+ Consumers: map[string]int64{"myconsumer": 2},
+ }, r1)
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {