This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 290012ec fix(stream): Fix XPENDING serialization issue (#2566)
290012ec is described below

commit 290012ec8eb1bd942675677f894eafe2e2d451f1
Author: Nathan <[email protected]>
AuthorDate: Thu Oct 3 22:26:46 2024 -0400

    fix(stream): Fix XPENDING serialization issue (#2566)
    
    Co-authored-by: Aleks Lozovyuk <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_stream.cc                   | 30 +++++++----
 tests/gocase/unit/type/stream/stream_test.go | 79 ++++++++++++++++++++++++++--
 2 files changed, 97 insertions(+), 12 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index c9616db5..9a9c02a6 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -896,15 +896,27 @@ class CommandXPending : public Commander {
 
   static Status SendResults([[maybe_unused]] Connection *conn, std::string 
*output,
                             StreamGetPendingEntryResult &results) {
-    output->append(redis::MultiLen(3 + results.consumer_infos.size()));
-    output->append(redis::Integer(results.pending_number));
-    output->append(redis::BulkString(results.first_entry_id.ToString()));
-    output->append(redis::BulkString(results.last_entry_id.ToString()));
-    output->append(redis::MultiLen(results.consumer_infos.size()));
-    for (const auto &entry : results.consumer_infos) {
-      output->append(redis::MultiLen(2));
-      output->append(redis::BulkString(entry.first));
-      output->append(redis::BulkString(std::to_string(entry.second)));
+    // NOTE: In the case that our stream has no pending elements, Redis will
+    // return NilString for the first and last entry IDs, and a nil array
+    // for the consumer infos. Make this a special case to maintain consistency
+    // with Redis.
+    if (results.pending_number == 0) {
+      output->append(redis::MultiLen(4));
+      output->append(redis::Integer(0));
+      output->append(conn->NilString());
+      output->append(conn->NilString());
+      output->append(conn->NilArray());
+    } else {
+      output->append(redis::MultiLen(4));
+      output->append(redis::Integer(results.pending_number));
+      output->append(redis::BulkString(results.first_entry_id.ToString()));
+      output->append(redis::BulkString(results.last_entry_id.ToString()));
+      output->append(redis::MultiLen(results.consumer_infos.size()));
+      for (const auto &entry : results.consumer_infos) {
+        output->append(redis::MultiLen(2));
+        output->append(redis::BulkString(entry.first));
+        output->append(redis::BulkString(std::to_string(entry.second)));
+      }
     }
 
     return Status::OK();
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index d852bbcf..2bae5c54 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -2086,16 +2086,19 @@ func TestStreamOffset(t *testing.T) {
        t.Run("XPending with different kinds of commands", func(t *testing.T) {
                streamName := "mystream"
                groupName := "mygroup"
+
                require.NoError(t, rdb.Del(ctx, streamName).Err())
                r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
                require.NoError(t, err)
                require.Equal(t, int64(0), r)
+
                require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
                        Stream: streamName,
                        ID:     "1-0",
                        Values: []string{"field1", "data1"},
                }).Err())
                require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
+
                consumerName := "myconsumer"
                err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
                        Group:    groupName,
@@ -2113,7 +2116,7 @@ func TestStreamOffset(t *testing.T) {
                        Count:     1,
                        Lower:     "1-0",
                        Higher:    "1-0",
-                       Consumers: map[string]int64{"myconsumer": 1},
+                       Consumers: map[string]int64{consumerName: 1},
                }, r1)
 
                require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
@@ -2143,7 +2146,7 @@ func TestStreamOffset(t *testing.T) {
                        Count:     3,
                        Lower:     "1-0",
                        Higher:    "2-2",
-                       Consumers: map[string]int64{"myconsumer": 3},
+                       Consumers: map[string]int64{consumerName: 3},
                }, r1)
 
                require.NoError(t, rdb.XAck(ctx, streamName, groupName, 
"2-0").Err())
@@ -2155,9 +2158,79 @@ func TestStreamOffset(t *testing.T) {
                        Count:     2,
                        Lower:     "1-0",
                        Higher:    "2-2",
-                       Consumers: map[string]int64{"myconsumer": 2},
+                       Consumers: map[string]int64{consumerName: 2},
+               }, r1)
+
+               // Add a second consumer and check that XPENDING still works
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "3-0",
+                       Values: []string{"field1", "data1"},
+               }).Err())
+
+               consumerName2 := "myconsumer2"
+               err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName2,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Err()
+               require.NoError(t, err)
+
+               r1, err1 = rdb.XPending(ctx, streamName, groupName).Result()
+               require.NoError(t, err1)
+
+               require.Equal(t, &redis.XPending{
+                       Count:     3,
+                       Lower:     "1-0",
+                       Higher:    "3-0",
+                       Consumers: map[string]int64{consumerName: 2, 
consumerName2: 1},
                }, r1)
        })
+
+       t.Run("XPENDING on a consumer group with no pending messages", func(t 
*testing.T) {
+               streamName := "stream"
+               groupName := "group"
+               consumerName := "consumer"
+               messageID := "1-0"
+
+               // Remove any existing data
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
+               require.NoError(t, err)
+               require.Equal(t, int64(0), r)
+
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     messageID,
+                       Values: []string{"key", "value"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, 
"0").Err())
+
+               // Have the consumer claim the message with ID [messageID]
+               err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Err()
+               require.NoError(t, err)
+
+               // Acknowledge the message, so no messages remain pending
+               require.NoError(t, rdb.XAck(ctx, streamName, groupName, 
messageID).Err())
+
+               // Check that XPENDING sets the min and max to nil, matching 
Redis' behavior
+               pending, err := rdb.XPending(ctx, streamName, 
groupName).Result()
+               require.NoError(t, err)
+               require.Equal(t, &redis.XPending{
+                       Count:     0,
+                       Lower:     "",
+                       Higher:    "",
+                       Consumers: map[string]int64{},
+               }, pending)
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to