This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 8a19bd50 Check if the type is an entry while iterating the stream subkeys (#2112)
8a19bd50 is described below
commit 8a19bd50b0127f1d4882459d85eb6ea03a0d45c6
Author: 纪华裕 <[email protected]>
AuthorDate: Fri Feb 23 11:58:51 2024 +0800
Check if the type is an entry while iterating the stream subkeys (#2112)
---
src/types/redis_stream.cc | 9 +++++++--
src/types/redis_stream.h | 2 +-
tests/gocase/unit/type/stream/stream_test.go | 15 +++++++++++++++
3 files changed, 23 insertions(+), 3 deletions(-)
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 48fe7928..b03dc378 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -250,7 +250,7 @@ StreamConsumerMetadata Stream::decodeStreamConsumerMetadataValue(const std::stri
return consumer_metadata;
}
-StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) {
+StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) const {
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice subkey = ikey.GetSubKey();
const size_t entry_id_size = sizeof(StreamEntryID);
@@ -618,7 +618,9 @@ rocksdb::Status Stream::Len(const Slice &stream_name, const StreamLenOptions &op
}
for (; iter->Valid(); options.to_first ? iter->Prev() : iter->Next()) {
- *size += 1;
+ if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamEntry) {
+ *size += 1;
+ }
}
return rocksdb::Status::OK();
@@ -674,6 +676,9 @@ rocksdb::Status Stream::range(const std::string &ns_key, const StreamMetadata &m
for (; iter->Valid() && (options.reverse ? iter->key().ToString() >= end_key
: iter->key().ToString() <= end_key);
options.reverse ? iter->Prev() : iter->Next()) {
+ if (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamEntry) {
+ continue;
+ }
if (options.exclude_start && iter->key().ToString() == start_key) {
continue;
}
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 8ae5a14d..8f6367a7 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -82,7 +82,7 @@ class Stream : public SubKeyScanner {
std::string consumerNameFromInternalKey(rocksdb::Slice key) const;
static std::string encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata);
static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const std::string &value);
- StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key);
+ StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key) const;
};
} // namespace redis
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index 7dee10b6..7bbce02b 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -985,6 +985,21 @@ func TestStreamOffset(t *testing.T) {
r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val()
require.Equal(t, consumer3, r1[0].Name)
})
+
+ t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) {
+ streamName := "test-stream"
+ group := "group"
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: "*",
+ Values: []string{"data1", "b"},
+ }).Err())
+ require.NoError(t, rdb.XGroupCreate(ctx, streamName, group, "0").Err())
+ require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group, "consumer").Err())
+ require.NoError(t, rdb.XRead(ctx, &redis.XReadArgs{
+ Streams: []string{streamName, "0"},
+ }).Err())
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {