This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch 2.10
in repository https://gitbox.apache.org/repos/asf/kvrocks.git

commit a811a1d3ea18cc7a4da358e686178abbfdb3ec90
Author: Edward Xu <[email protected]>
AuthorDate: Sat Oct 5 00:13:35 2024 +0800

    fix(stream): fix `XINFO` group metadata key overlapping (#2576)
---
 src/types/redis_stream.cc                    | 11 +++++++-
 tests/gocase/unit/type/stream/stream_test.go | 38 ++++++++++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 1c64e9c6..009a98c0 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -1340,7 +1340,8 @@ rocksdb::Status Stream::GetGroupInfo(engine::Context 
&ctx, const Slice &stream_n
   rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
   if (!s.ok()) return s;
 
-  std::string subkey_type_delimiter = std::to_string(UINT64_MAX);
+  std::string subkey_type_delimiter;
+  PutFixed64(&subkey_type_delimiter, UINT64_MAX);
   PutFixed8(&subkey_type_delimiter, 
(uint8_t)StreamSubkeyType::StreamConsumerGroupMetadata);
   std::string next_version_prefix_key =
       InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1, 
storage_->IsSlotIdEncoded()).Encode();
@@ -1355,6 +1356,10 @@ rocksdb::Status Stream::GetGroupInfo(engine::Context 
&ctx, const Slice &stream_n
 
   auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    if (identifySubkeyType(iter->key()) != 
StreamSubkeyType::StreamConsumerGroupMetadata) {
+      continue;
+    }
+
     std::string group_name = groupNameFromInternalKey(iter->key());
     StreamConsumerGroupMetadata cg_metadata = 
decodeStreamConsumerGroupMetadataValue(iter->value().ToString());
     CheckLagValid(metadata, cg_metadata);
@@ -1390,6 +1395,10 @@ rocksdb::Status Stream::GetConsumerInfo(
 
   auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    if (identifySubkeyType(iter->key()) != 
StreamSubkeyType::StreamConsumerMetadata) {
+      continue;
+    }
+
     std::string consumer_name = consumerNameFromInternalKey(iter->key());
     StreamConsumerMetadata c_metadata = 
decodeStreamConsumerMetadataValue(iter->value().ToString());
     std::pair<std::string, StreamConsumerMetadata> tmp_item(consumer_name, 
c_metadata);
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index 2bae5c54..429902ae 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -2231,6 +2231,44 @@ func TestStreamOffset(t *testing.T) {
                        Consumers: map[string]int64{},
                }, pending)
        })
+
+       t.Run("XINFO GROUPS, issue #2568", func(t *testing.T) {
+               streamName := "mystream-2568"
+               groupName := "mygroup-2568"
+               consumerName := "consumer-2568"
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+
+               require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, 
groupName, "0").Err())
+
+               {
+                       r, err := rdb.XInfoGroups(ctx, streamName).Result()
+                       require.NoError(t, err)
+                       require.Equal(t, []redis.XInfoGroup{{
+                               Name:            groupName,
+                               Consumers:       0,
+                               Pending:         0,
+                               LastDeliveredID: "0-0",
+                       }}, r)
+               }
+               {
+                       _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                               Group:    groupName,
+                               Consumer: consumerName,
+                               Streams:  []string{streamName, "0"},
+                       }).Result()
+                       require.NoError(t, err)
+               }
+               {
+                       r, err := rdb.XInfoGroups(ctx, streamName).Result()
+                       require.NoError(t, err)
+                       require.Equal(t, []redis.XInfoGroup{{
+                               Name:            groupName,
+                               Consumers:       1,
+                               Pending:         0,
+                               LastDeliveredID: "0-0",
+                       }}, r)
+               }
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to