This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 5333c346c fix(stream): should return an empty string instead of nil in XREAD/XRANGE (#2897)
5333c346c is described below

commit 5333c346c360fe59228711ecef57aa3fb96bddb0
Author: hulk <[email protected]>
AuthorDate: Fri Apr 25 07:33:39 2025 +0800

    fix(stream): should return an empty string instead of nil in XREAD/XRANGE (#2897)
    
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_stream.cc                   | 24 ++++++++++++------------
 tests/gocase/unit/type/stream/stream_test.go | 20 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 12 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index fcc821cf1..d67cf778f 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -51,12 +51,12 @@ CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args, uint3
 }
 }  // namespace
 
-void AddStreamEntriesToResponse(const Connection *conn, std::string *output, const std::vector<StreamEntry> &entries) {
+void AddStreamEntriesToResponse(std::string *output, const std::vector<StreamEntry> &entries) {
   output->append(redis::MultiLen(entries.size()));
   for (const auto &entry : entries) {
     output->append(redis::MultiLen(2));
     output->append(redis::BulkString(entry.key));
-    output->append(conn->MultiBulkString(entry.values));
+    output->append(redis::ArrayOfBulkStrings(entry.values));
   }
 }
 
@@ -359,7 +359,7 @@ class CommandXClaim : public Commander {
     }
 
     if (!stream_claim_options_.just_id) {
-      AddStreamEntriesToResponse(conn, output, result.entries);
+      AddStreamEntriesToResponse(output, result.entries);
     } else {
       output->append(redis::MultiLen(result.ids.size()));
       for (const auto &id : result.ids) {
@@ -454,7 +454,7 @@ class CommandAutoClaim : public Commander {
         output->append(redis::BulkString(item.key));
       }
     } else {
-      AddStreamEntriesToResponse(conn, output, result.entries);
+      AddStreamEntriesToResponse(output, result.entries);
     }
 
     output->append(redis::MultiLen(result.deleted_ids.size()));
@@ -766,7 +766,7 @@ class CommandXInfo : public Commander {
       }
     } else {
       output->append(redis::BulkString("entries"));
-      AddStreamEntriesToResponse(conn, output, info.entries);
+      AddStreamEntriesToResponse(output, info.entries);
     }
 
     return Status::OK();
@@ -1024,7 +1024,7 @@ class CommandXRange : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    AddStreamEntriesToResponse(conn, output, result);
+    AddStreamEntriesToResponse(output, result);
 
     return Status::OK();
   }
@@ -1112,7 +1112,7 @@ class CommandXRevRange : public Commander {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    AddStreamEntriesToResponse(conn, output, result);
+    AddStreamEntriesToResponse(output, result);
 
     return Status::OK();
   }
@@ -1255,17 +1255,17 @@ class CommandXRead : public Commander,
       return Status::OK();
     }
 
-    return SendResults(conn, output, results);
+    return SendResults(output, results);
   }
 
-  static Status SendResults(Connection *conn, std::string *output, const std::vector<StreamReadResult> &results) {
+  static Status SendResults(std::string *output, const std::vector<StreamReadResult> &results) {
     output->append(redis::MultiLen(results.size()));
 
     for (const auto &result : results) {
       output->append(redis::MultiLen(2));
       output->append(redis::BulkString(result.name));
 
-      AddStreamEntriesToResponse(conn, output, result.entries);
+      AddStreamEntriesToResponse(output, result.entries);
     }
 
     return Status::OK();
@@ -1366,7 +1366,7 @@ class CommandXRead : public Commander,
       output.append(redis::MultiLen(2));
       output.append(redis::BulkString(result.name));
 
-      AddStreamEntriesToResponse(conn_, &output, result.entries);
+      AddStreamEntriesToResponse(&output, result.entries);
     }
 
     conn_->Reply(output);
@@ -1676,7 +1676,7 @@ class CommandXReadGroup : public Commander,
     for (const auto &result : results) {
       output.append(redis::MultiLen(2));
       output.append(redis::BulkString(result.name));
-      AddStreamEntriesToResponse(conn_, &output, result.entries);
+      AddStreamEntriesToResponse(&output, result.entries);
     }
 
     conn_->Reply(output);
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index 429902ae6..6e8d45dba 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -266,6 +266,26 @@ var streamTests = func(t *testing.T, configs util.KvrocksServerConfigs) {
                require.Zero(t, rdb.Exists(ctx, "otherstream").Val())
        })
 
+       t.Run("XADD with empty string, for issue #2830", func(t *testing.T) {
+               streamName := "empty-string-stream"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       Values: []string{"a", "b", "c", "", "", "d"},
+               }).Err())
+
+               messages, err := rdb.XRange(ctx, streamName, "-", "+").Result()
+               require.NoError(t, err)
+               require.Len(t, messages, 1)
+               require.EqualValues(t, map[string]interface{}{"a": "b", "c": "", "": "d"}, messages[0].Values)
+
+               result, err := rdb.XRead(ctx, &redis.XReadArgs{Streams: []string{streamName, "0"}, Count: 100}).Result()
+               require.NoError(t, err)
+               require.Len(t, result, 1)
+               require.Len(t, result[0].Messages, 1)
+               require.EqualValues(t, map[string]interface{}{"a": "b", "c": "", "": "d"}, result[0].Messages[0].Values)
+       })
+
        t.Run("XRANGE COUNT works as expected", func(t *testing.T) {
                require.Len(t, rdb.XRangeN(ctx, "mystream", "-", "+", 10).Val(), 10)
        })

Reply via email to