This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 3b8dcf6e fix(stream): fix `XINFO` group metadata key overlapping (#2576)
3b8dcf6e is described below
commit 3b8dcf6e9eb53153c98213b173e9f20c34f2b163
Author: Edward Xu <[email protected]>
AuthorDate: Sat Oct 5 00:13:35 2024 +0800
fix(stream): fix `XINFO` group metadata key overlapping (#2576)
---
src/types/redis_stream.cc | 11 +++++++-
tests/gocase/unit/type/stream/stream_test.go | 38 ++++++++++++++++++++++++++++
2 files changed, 48 insertions(+), 1 deletion(-)
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 1c64e9c6..009a98c0 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -1340,7 +1340,8 @@ rocksdb::Status Stream::GetGroupInfo(engine::Context &ctx, const Slice &stream_n
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
- std::string subkey_type_delimiter = std::to_string(UINT64_MAX);
+ std::string subkey_type_delimiter;
+ PutFixed64(&subkey_type_delimiter, UINT64_MAX);
PutFixed8(&subkey_type_delimiter,
(uint8_t)StreamSubkeyType::StreamConsumerGroupMetadata);
std::string next_version_prefix_key =
InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
@@ -1355,6 +1356,10 @@ rocksdb::Status Stream::GetGroupInfo(engine::Context &ctx, const Slice &stream_n
auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ if (identifySubkeyType(iter->key()) !=
StreamSubkeyType::StreamConsumerGroupMetadata) {
+ continue;
+ }
+
std::string group_name = groupNameFromInternalKey(iter->key());
StreamConsumerGroupMetadata cg_metadata =
decodeStreamConsumerGroupMetadataValue(iter->value().ToString());
CheckLagValid(metadata, cg_metadata);
@@ -1390,6 +1395,10 @@ rocksdb::Status Stream::GetConsumerInfo(
auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ if (identifySubkeyType(iter->key()) !=
StreamSubkeyType::StreamConsumerMetadata) {
+ continue;
+ }
+
std::string consumer_name = consumerNameFromInternalKey(iter->key());
StreamConsumerMetadata c_metadata =
decodeStreamConsumerMetadataValue(iter->value().ToString());
std::pair<std::string, StreamConsumerMetadata> tmp_item(consumer_name,
c_metadata);
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index 2bae5c54..429902ae 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -2231,6 +2231,44 @@ func TestStreamOffset(t *testing.T) {
Consumers: map[string]int64{},
}, pending)
})
+
+ t.Run("XINFO GROUPS, issue #2568", func(t *testing.T) {
+ streamName := "mystream-2568"
+ groupName := "mygroup-2568"
+ consumerName := "consumer-2568"
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+
+ require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName,
groupName, "0").Err())
+
+ {
+ r, err := rdb.XInfoGroups(ctx, streamName).Result()
+ require.NoError(t, err)
+ require.Equal(t, []redis.XInfoGroup{{
+ Name: groupName,
+ Consumers: 0,
+ Pending: 0,
+ LastDeliveredID: "0-0",
+ }}, r)
+ }
+ {
+ _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, "0"},
+ }).Result()
+ require.NoError(t, err)
+ }
+ {
+ r, err := rdb.XInfoGroups(ctx, streamName).Result()
+ require.NoError(t, err)
+ require.Equal(t, []redis.XInfoGroup{{
+ Name: groupName,
+ Consumers: 1,
+ Pending: 0,
+ LastDeliveredID: "0-0",
+ }}, r)
+ }
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {