This is an automated email from the ASF dual-hosted git repository.
torwig pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 99bab29bc refactor: adding stream elements to response (#2836)
99bab29bc is described below
commit 99bab29bc7eb5d7993a5e2130b67aece40ddc7c2
Author: Yaroslav <[email protected]>
AuthorDate: Wed Mar 19 11:59:02 2025 +0200
refactor: adding stream elements to response (#2836)
---
src/commands/cmd_stream.cc | 79 ++++++++++++++--------------------------------
1 file changed, 24 insertions(+), 55 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 89650bda8..fcc821cf1 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -33,6 +33,7 @@
#include "types/redis_stream.h"
namespace redis {
+
namespace {
// for XRead and XReadGroup stream range parse.
CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args, uint32_t start_offset) {
@@ -50,6 +51,15 @@ CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args, uint3
}
} // namespace
+void AddStreamEntriesToResponse(const Connection *conn, std::string *output, const std::vector<StreamEntry> &entries) {
+ output->append(redis::MultiLen(entries.size()));
+ for (const auto &entry : entries) {
+ output->append(redis::MultiLen(2));
+ output->append(redis::BulkString(entry.key));
+ output->append(conn->MultiBulkString(entry.values));
+ }
+}
+
class CommandXAck : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
@@ -349,13 +359,7 @@ class CommandXClaim : public Commander {
}
if (!stream_claim_options_.just_id) {
- output->append(redis::MultiLen(result.entries.size()));
-
- for (const auto &e : result.entries) {
- output->append(redis::MultiLen(2));
- output->append(redis::BulkString(e.key));
- output->append(conn->MultiBulkString(e.values));
- }
+ AddStreamEntriesToResponse(conn, output, result.entries);
} else {
output->append(redis::MultiLen(result.ids.size()));
for (const auto &id : result.ids) {
@@ -444,18 +448,13 @@ class CommandAutoClaim : public Commander {
std::string *output) const {
output->append(redis::MultiLen(3));
output->append(redis::BulkString(result.next_claim_id));
- output->append(redis::MultiLen(result.entries.size()));
- for (const auto &item : result.entries) {
- if (options_.just_id) {
- output->append(redis::BulkString(item.key));
- } else {
- output->append(redis::MultiLen(2));
+ if (options_.just_id) {
+ output->append(redis::MultiLen(result.entries.size()));
+ for (const auto &item : result.entries) {
output->append(redis::BulkString(item.key));
- output->append(redis::MultiLen(item.values.size()));
- for (const auto &value_item : item.values) {
- output->append(redis::BulkString(value_item));
- }
}
+ } else {
+ AddStreamEntriesToResponse(conn, output, result.entries);
}
output->append(redis::MultiLen(result.deleted_ids.size()));
@@ -767,12 +766,7 @@ class CommandXInfo : public Commander {
}
} else {
output->append(redis::BulkString("entries"));
- output->append(redis::MultiLen(info.entries.size()));
- for (const auto &e : info.entries) {
- output->append(redis::MultiLen(2));
- output->append(redis::BulkString(e.key));
- output->append(conn->MultiBulkString(e.values));
- }
+ AddStreamEntriesToResponse(conn, output, info.entries);
}
return Status::OK();
@@ -1030,13 +1024,7 @@ class CommandXRange : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- output->append(redis::MultiLen(result.size()));
-
- for (const auto &e : result) {
- output->append(redis::MultiLen(2));
- output->append(redis::BulkString(e.key));
- output->append(conn->MultiBulkString(e.values));
- }
+ AddStreamEntriesToResponse(conn, output, result);
return Status::OK();
}
@@ -1124,13 +1112,7 @@ class CommandXRevRange : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- output->append(redis::MultiLen(result.size()));
-
- for (const auto &e : result) {
- output->append(redis::MultiLen(2));
- output->append(redis::BulkString(e.key));
- output->append(conn->MultiBulkString(e.values));
- }
+ AddStreamEntriesToResponse(conn, output, result);
return Status::OK();
}
@@ -1282,12 +1264,8 @@ class CommandXRead : public Commander,
for (const auto &result : results) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(result.name));
- output->append(redis::MultiLen(result.entries.size()));
- for (const auto &entry : result.entries) {
- output->append(redis::MultiLen(2));
- output->append(redis::BulkString(entry.key));
- output->append(conn->MultiBulkString(entry.values));
- }
+
+ AddStreamEntriesToResponse(conn, output, result.entries);
}
return Status::OK();
@@ -1387,12 +1365,8 @@ class CommandXRead : public Commander,
for (const auto &result : results) {
output.append(redis::MultiLen(2));
output.append(redis::BulkString(result.name));
- output.append(redis::MultiLen(result.entries.size()));
- for (const auto &entry : result.entries) {
- output.append(redis::MultiLen(2));
- output.append(redis::BulkString(entry.key));
- output.append(conn_->MultiBulkString(entry.values));
- }
+
+ AddStreamEntriesToResponse(conn_, &output, result.entries);
}
conn_->Reply(output);
@@ -1702,12 +1676,7 @@ class CommandXReadGroup : public Commander,
for (const auto &result : results) {
output.append(redis::MultiLen(2));
output.append(redis::BulkString(result.name));
- output.append(redis::MultiLen(result.entries.size()));
- for (const auto &entry : result.entries) {
- output.append(redis::MultiLen(2));
- output.append(redis::BulkString(entry.key));
- output.append(conn_->MultiBulkString(entry.values));
- }
+ AddStreamEntriesToResponse(conn_, &output, result.entries);
}
conn_->Reply(output);