This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 290012ec fix(stream): Fix XPENDING serialization issue (#2566)
290012ec is described below
commit 290012ec8eb1bd942675677f894eafe2e2d451f1
Author: Nathan <[email protected]>
AuthorDate: Thu Oct 3 22:26:46 2024 -0400
fix(stream): Fix XPENDING serialization issue (#2566)
Co-authored-by: Aleks Lozovyuk <[email protected]>
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_stream.cc | 30 +++++++----
tests/gocase/unit/type/stream/stream_test.go | 79 ++++++++++++++++++++++++++--
2 files changed, 97 insertions(+), 12 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index c9616db5..9a9c02a6 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -896,15 +896,27 @@ class CommandXPending : public Commander {
static Status SendResults([[maybe_unused]] Connection *conn, std::string *output,
StreamGetPendingEntryResult &results) {
- output->append(redis::MultiLen(3 + results.consumer_infos.size()));
- output->append(redis::Integer(results.pending_number));
- output->append(redis::BulkString(results.first_entry_id.ToString()));
- output->append(redis::BulkString(results.last_entry_id.ToString()));
- output->append(redis::MultiLen(results.consumer_infos.size()));
- for (const auto &entry : results.consumer_infos) {
- output->append(redis::MultiLen(2));
- output->append(redis::BulkString(entry.first));
- output->append(redis::BulkString(std::to_string(entry.second)));
+ // NOTE: In the case that our stream has no pending elements, Redis will
+ // return NilString for the first and last entry IDs, and a nil array
+ // for the consumer infos. Make this a special case to maintain consistency
+ // with Redis.
+ if (results.pending_number == 0) {
+ output->append(redis::MultiLen(4));
+ output->append(redis::Integer(0));
+ output->append(conn->NilString());
+ output->append(conn->NilString());
+ output->append(conn->NilArray());
+ } else {
+ output->append(redis::MultiLen(4));
+ output->append(redis::Integer(results.pending_number));
+ output->append(redis::BulkString(results.first_entry_id.ToString()));
+ output->append(redis::BulkString(results.last_entry_id.ToString()));
+ output->append(redis::MultiLen(results.consumer_infos.size()));
+ for (const auto &entry : results.consumer_infos) {
+ output->append(redis::MultiLen(2));
+ output->append(redis::BulkString(entry.first));
+ output->append(redis::BulkString(std::to_string(entry.second)));
+ }
}
return Status::OK();
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index d852bbcf..2bae5c54 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -2086,16 +2086,19 @@ func TestStreamOffset(t *testing.T) {
t.Run("XPending with different kinds of commands", func(t *testing.T) {
streamName := "mystream"
groupName := "mygroup"
+
require.NoError(t, rdb.Del(ctx, streamName).Err())
r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
require.NoError(t, err)
require.Equal(t, int64(0), r)
+
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "1-0",
Values: []string{"field1", "data1"},
}).Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
+
consumerName := "myconsumer"
err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
@@ -2113,7 +2116,7 @@ func TestStreamOffset(t *testing.T) {
Count: 1,
Lower: "1-0",
Higher: "1-0",
- Consumers: map[string]int64{"myconsumer": 1},
+ Consumers: map[string]int64{consumerName: 1},
}, r1)
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
@@ -2143,7 +2146,7 @@ func TestStreamOffset(t *testing.T) {
Count: 3,
Lower: "1-0",
Higher: "2-2",
- Consumers: map[string]int64{"myconsumer": 3},
+ Consumers: map[string]int64{consumerName: 3},
}, r1)
require.NoError(t, rdb.XAck(ctx, streamName, groupName, "2-0").Err())
@@ -2155,9 +2158,79 @@ func TestStreamOffset(t *testing.T) {
Count: 2,
Lower: "1-0",
Higher: "2-2",
- Consumers: map[string]int64{"myconsumer": 2},
+ Consumers: map[string]int64{consumerName: 2},
+ }, r1)
+
+ // Add a second consumer and check that XPENDING still works
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "3-0",
+ Values: []string{"field1", "data1"},
+ }).Err())
+
+ consumerName2 := "myconsumer2"
+ err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName2,
+ Streams: []string{streamName, ">"},
+ Count: 1,
+ NoAck: false,
+ }).Err()
+ require.NoError(t, err)
+
+ r1, err1 = rdb.XPending(ctx, streamName, groupName).Result()
+ require.NoError(t, err1)
+
+ require.Equal(t, &redis.XPending{
+ Count: 3,
+ Lower: "1-0",
+ Higher: "3-0",
+ Consumers: map[string]int64{consumerName: 2, consumerName2: 1},
}, r1)
})
+
+ t.Run("XPENDING on a consumer group with no pending messages", func(t *testing.T) {
+ streamName := "stream"
+ groupName := "group"
+ consumerName := "consumer"
+ messageID := "1-0"
+
+ // Remove any existing data
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
+ require.NoError(t, err)
+ require.Equal(t, int64(0), r)
+
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: messageID,
+ Values: []string{"key", "value"},
+ }).Err())
+ require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
+
+ // Have the consumer claim the message with ID [messageID]
+ err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 1,
+ NoAck: false,
+ }).Err()
+ require.NoError(t, err)
+
+ // Acknowledge the message, so no messages remain pending
+ require.NoError(t, rdb.XAck(ctx, streamName, groupName, messageID).Err())
+
+ // Check that XPENDING sets the min and max to nil, matching Redis' behavior
+ pending, err := rdb.XPending(ctx, streamName, groupName).Result()
+ require.NoError(t, err)
+ require.Equal(t, &redis.XPending{
+ Count: 0,
+ Lower: "",
+ Higher: "",
+ Consumers: map[string]int64{},
+ }, pending)
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {