This is an automated email from the ASF dual-hosted git repository.

torwig pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 99bab29bc refactor: adding stream elements to response (#2836)
99bab29bc is described below

commit 99bab29bc7eb5d7993a5e2130b67aece40ddc7c2
Author: Yaroslav <[email protected]>
AuthorDate: Wed Mar 19 11:59:02 2025 +0200

    refactor: adding stream elements to response (#2836)
---
 src/commands/cmd_stream.cc | 79 ++++++++++++++--------------------------------
 1 file changed, 24 insertions(+), 55 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 89650bda8..fcc821cf1 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -33,6 +33,7 @@
 #include "types/redis_stream.h"
 
 namespace redis {
+
 namespace {
 // for XRead and XReadGroup stream range parse.
 CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args, uint32_t start_offset) {
@@ -50,6 +51,15 @@ CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args, uint3
 }
 }  // namespace
 
+void AddStreamEntriesToResponse(const Connection *conn, std::string *output, const std::vector<StreamEntry> &entries) {
+  output->append(redis::MultiLen(entries.size()));
+  for (const auto &entry : entries) {
+    output->append(redis::MultiLen(2));
+    output->append(redis::BulkString(entry.key));
+    output->append(conn->MultiBulkString(entry.values));
+  }
+}
+
 class CommandXAck : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
@@ -349,13 +359,7 @@ class CommandXClaim : public Commander {
     }
 
     if (!stream_claim_options_.just_id) {
-      output->append(redis::MultiLen(result.entries.size()));
-
-      for (const auto &e : result.entries) {
-        output->append(redis::MultiLen(2));
-        output->append(redis::BulkString(e.key));
-        output->append(conn->MultiBulkString(e.values));
-      }
+      AddStreamEntriesToResponse(conn, output, result.entries);
     } else {
       output->append(redis::MultiLen(result.ids.size()));
       for (const auto &id : result.ids) {
@@ -444,18 +448,13 @@ class CommandAutoClaim : public Commander {
                      std::string *output) const {
     output->append(redis::MultiLen(3));
     output->append(redis::BulkString(result.next_claim_id));
-    output->append(redis::MultiLen(result.entries.size()));
-    for (const auto &item : result.entries) {
-      if (options_.just_id) {
-        output->append(redis::BulkString(item.key));
-      } else {
-        output->append(redis::MultiLen(2));
+    if (options_.just_id) {
+      output->append(redis::MultiLen(result.entries.size()));
+      for (const auto &item : result.entries) {
         output->append(redis::BulkString(item.key));
-        output->append(redis::MultiLen(item.values.size()));
-        for (const auto &value_item : item.values) {
-          output->append(redis::BulkString(value_item));
-        }
       }
+    } else {
+      AddStreamEntriesToResponse(conn, output, result.entries);
     }
 
     output->append(redis::MultiLen(result.deleted_ids.size()));
@@ -767,12 +766,7 @@ class CommandXInfo : public Commander {
       }
     } else {
       output->append(redis::BulkString("entries"));
-      output->append(redis::MultiLen(info.entries.size()));
-      for (const auto &e : info.entries) {
-        output->append(redis::MultiLen(2));
-        output->append(redis::BulkString(e.key));
-        output->append(conn->MultiBulkString(e.values));
-      }
+      AddStreamEntriesToResponse(conn, output, info.entries);
     }
 
     return Status::OK();
@@ -1030,13 +1024,7 @@ class CommandXRange : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    output->append(redis::MultiLen(result.size()));
-
-    for (const auto &e : result) {
-      output->append(redis::MultiLen(2));
-      output->append(redis::BulkString(e.key));
-      output->append(conn->MultiBulkString(e.values));
-    }
+    AddStreamEntriesToResponse(conn, output, result);
 
     return Status::OK();
   }
@@ -1124,13 +1112,7 @@ class CommandXRevRange : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    output->append(redis::MultiLen(result.size()));
-
-    for (const auto &e : result) {
-      output->append(redis::MultiLen(2));
-      output->append(redis::BulkString(e.key));
-      output->append(conn->MultiBulkString(e.values));
-    }
+    AddStreamEntriesToResponse(conn, output, result);
 
     return Status::OK();
   }
@@ -1282,12 +1264,8 @@ class CommandXRead : public Commander,
     for (const auto &result : results) {
       output->append(redis::MultiLen(2));
       output->append(redis::BulkString(result.name));
-      output->append(redis::MultiLen(result.entries.size()));
-      for (const auto &entry : result.entries) {
-        output->append(redis::MultiLen(2));
-        output->append(redis::BulkString(entry.key));
-        output->append(conn->MultiBulkString(entry.values));
-      }
+
+      AddStreamEntriesToResponse(conn, output, result.entries);
     }
 
     return Status::OK();
@@ -1387,12 +1365,8 @@ class CommandXRead : public Commander,
     for (const auto &result : results) {
       output.append(redis::MultiLen(2));
       output.append(redis::BulkString(result.name));
-      output.append(redis::MultiLen(result.entries.size()));
-      for (const auto &entry : result.entries) {
-        output.append(redis::MultiLen(2));
-        output.append(redis::BulkString(entry.key));
-        output.append(conn_->MultiBulkString(entry.values));
-      }
+
+      AddStreamEntriesToResponse(conn_, &output, result.entries);
     }
 
     conn_->Reply(output);
@@ -1702,12 +1676,7 @@ class CommandXReadGroup : public Commander,
     for (const auto &result : results) {
       output.append(redis::MultiLen(2));
       output.append(redis::BulkString(result.name));
-      output.append(redis::MultiLen(result.entries.size()));
-      for (const auto &entry : result.entries) {
-        output.append(redis::MultiLen(2));
-        output.append(redis::BulkString(entry.key));
-        output.append(conn_->MultiBulkString(entry.values));
-      }
+      AddStreamEntriesToResponse(conn_, &output, result.entries);
     }
 
     conn_->Reply(output);

Reply via email to