This is an automated email from the ASF dual-hosted git repository. hulk pushed a commit to branch 2.10 in repository https://gitbox.apache.org/repos/asf/kvrocks.git
commit a811a1d3ea18cc7a4da358e686178abbfdb3ec90 Author: Edward Xu <[email protected]> AuthorDate: Sat Oct 5 00:13:35 2024 +0800 fix(stream): fix `XINFO` group metadata key overlapping (#2576) --- src/types/redis_stream.cc | 11 +++++++- tests/gocase/unit/type/stream/stream_test.go | 38 ++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 1c64e9c6..009a98c0 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -1340,7 +1340,8 @@ rocksdb::Status Stream::GetGroupInfo(engine::Context &ctx, const Slice &stream_n rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return s; - std::string subkey_type_delimiter = std::to_string(UINT64_MAX); + std::string subkey_type_delimiter; + PutFixed64(&subkey_type_delimiter, UINT64_MAX); PutFixed8(&subkey_type_delimiter, (uint8_t)StreamSubkeyType::StreamConsumerGroupMetadata); std::string next_version_prefix_key = InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); @@ -1355,6 +1356,10 @@ rocksdb::Status Stream::GetGroupInfo(engine::Context &ctx, const Slice &stream_n auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + if (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamConsumerGroupMetadata) { + continue; + } + std::string group_name = groupNameFromInternalKey(iter->key()); StreamConsumerGroupMetadata cg_metadata = decodeStreamConsumerGroupMetadataValue(iter->value().ToString()); CheckLagValid(metadata, cg_metadata); @@ -1390,6 +1395,10 @@ rocksdb::Status Stream::GetConsumerInfo( auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + if (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamConsumerMetadata) { + continue; + } + std::string consumer_name = consumerNameFromInternalKey(iter->key()); StreamConsumerMetadata c_metadata = decodeStreamConsumerMetadataValue(iter->value().ToString()); std::pair<std::string, StreamConsumerMetadata> tmp_item(consumer_name, c_metadata); diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 2bae5c54..429902ae 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2231,6 +2231,44 @@ func TestStreamOffset(t *testing.T) { Consumers: map[string]int64{}, }, pending) }) + + t.Run("XINFO GROUPS, issue #2568", func(t *testing.T) { + streamName := "mystream-2568" + groupName := "mygroup-2568" + consumerName := "consumer-2568" + require.NoError(t, rdb.Del(ctx, streamName).Err()) + + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()) + + { + r, err := rdb.XInfoGroups(ctx, streamName).Result() + require.NoError(t, err) + require.Equal(t, []redis.XInfoGroup{{ + Name: groupName, + Consumers: 0, + Pending: 0, + LastDeliveredID: "0-0", + }}, r) + } + { + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, "0"}, + }).Result() + require.NoError(t, err) + } + { + r, err := rdb.XInfoGroups(ctx, streamName).Result() + require.NoError(t, err) + require.Equal(t, []redis.XInfoGroup{{ + Name: groupName, + Consumers: 1, + Pending: 0, + LastDeliveredID: "0-0", + }}, r) + } + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {
