This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 5333c346c fix(stream): should return an empty string instead of nil in
XREAD/XRANGE (#2897)
5333c346c is described below
commit 5333c346c360fe59228711ecef57aa3fb96bddb0
Author: hulk <[email protected]>
AuthorDate: Fri Apr 25 07:33:39 2025 +0800
fix(stream): should return an empty string instead of nil in XREAD/XRANGE
(#2897)
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_stream.cc | 24 ++++++++++++------------
tests/gocase/unit/type/stream/stream_test.go | 20 ++++++++++++++++++++
2 files changed, 32 insertions(+), 12 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index fcc821cf1..d67cf778f 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -51,12 +51,12 @@ CommandKeyRange ParseStreamReadRange(const
std::vector<std::string> &args, uint3
}
} // namespace
-void AddStreamEntriesToResponse(const Connection *conn, std::string *output,
const std::vector<StreamEntry> &entries) {
+void AddStreamEntriesToResponse(std::string *output, const
std::vector<StreamEntry> &entries) {
output->append(redis::MultiLen(entries.size()));
for (const auto &entry : entries) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(entry.key));
- output->append(conn->MultiBulkString(entry.values));
+ output->append(redis::ArrayOfBulkStrings(entry.values));
}
}
@@ -359,7 +359,7 @@ class CommandXClaim : public Commander {
}
if (!stream_claim_options_.just_id) {
- AddStreamEntriesToResponse(conn, output, result.entries);
+ AddStreamEntriesToResponse(output, result.entries);
} else {
output->append(redis::MultiLen(result.ids.size()));
for (const auto &id : result.ids) {
@@ -454,7 +454,7 @@ class CommandAutoClaim : public Commander {
output->append(redis::BulkString(item.key));
}
} else {
- AddStreamEntriesToResponse(conn, output, result.entries);
+ AddStreamEntriesToResponse(output, result.entries);
}
output->append(redis::MultiLen(result.deleted_ids.size()));
@@ -766,7 +766,7 @@ class CommandXInfo : public Commander {
}
} else {
output->append(redis::BulkString("entries"));
- AddStreamEntriesToResponse(conn, output, info.entries);
+ AddStreamEntriesToResponse(output, info.entries);
}
return Status::OK();
@@ -1024,7 +1024,7 @@ class CommandXRange : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- AddStreamEntriesToResponse(conn, output, result);
+ AddStreamEntriesToResponse(output, result);
return Status::OK();
}
@@ -1112,7 +1112,7 @@ class CommandXRevRange : public Commander {
return {Status::RedisExecErr, s.ToString()};
}
- AddStreamEntriesToResponse(conn, output, result);
+ AddStreamEntriesToResponse(output, result);
return Status::OK();
}
@@ -1255,17 +1255,17 @@ class CommandXRead : public Commander,
return Status::OK();
}
- return SendResults(conn, output, results);
+ return SendResults(output, results);
}
- static Status SendResults(Connection *conn, std::string *output, const
std::vector<StreamReadResult> &results) {
+ static Status SendResults(std::string *output, const
std::vector<StreamReadResult> &results) {
output->append(redis::MultiLen(results.size()));
for (const auto &result : results) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(result.name));
- AddStreamEntriesToResponse(conn, output, result.entries);
+ AddStreamEntriesToResponse(output, result.entries);
}
return Status::OK();
@@ -1366,7 +1366,7 @@ class CommandXRead : public Commander,
output.append(redis::MultiLen(2));
output.append(redis::BulkString(result.name));
- AddStreamEntriesToResponse(conn_, &output, result.entries);
+ AddStreamEntriesToResponse(&output, result.entries);
}
conn_->Reply(output);
@@ -1676,7 +1676,7 @@ class CommandXReadGroup : public Commander,
for (const auto &result : results) {
output.append(redis::MultiLen(2));
output.append(redis::BulkString(result.name));
- AddStreamEntriesToResponse(conn_, &output, result.entries);
+ AddStreamEntriesToResponse(&output, result.entries);
}
conn_->Reply(output);
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index 429902ae6..6e8d45dba 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -266,6 +266,26 @@ var streamTests = func(t *testing.T, configs
util.KvrocksServerConfigs) {
require.Zero(t, rdb.Exists(ctx, "otherstream").Val())
})
+ t.Run("XADD with empty string, for issue #2830", func(t *testing.T) {
+ streamName := "empty-string-stream"
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ Values: []string{"a", "b", "c", "", "", "d"},
+ }).Err())
+
+ messages, err := rdb.XRange(ctx, streamName, "-", "+").Result()
+ require.NoError(t, err)
+ require.Len(t, messages, 1)
+ require.EqualValues(t, map[string]interface{}{"a": "b", "c":
"", "": "d"}, messages[0].Values)
+
+ result, err := rdb.XRead(ctx, &redis.XReadArgs{Streams:
[]string{streamName, "0"}, Count: 100}).Result()
+ require.NoError(t, err)
+ require.Len(t, result, 1)
+ require.Len(t, result[0].Messages, 1)
+ require.EqualValues(t, map[string]interface{}{"a": "b", "c":
"", "": "d"}, result[0].Messages[0].Values)
+ })
+
t.Run("XRANGE COUNT works as expected", func(t *testing.T) {
require.Len(t, rdb.XRangeN(ctx, "mystream", "-", "+",
10).Val(), 10)
})