This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 8a19bd50 Check if the type is an entry while iterating the stream subkeys (#2112)
8a19bd50 is described below

commit 8a19bd50b0127f1d4882459d85eb6ea03a0d45c6
Author: 纪华裕 <[email protected]>
AuthorDate: Fri Feb 23 11:58:51 2024 +0800

    Check if the type is an entry while iterating the stream subkeys (#2112)
---
 src/types/redis_stream.cc                    |  9 +++++++--
 src/types/redis_stream.h                     |  2 +-
 tests/gocase/unit/type/stream/stream_test.go | 15 +++++++++++++++
 3 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 48fe7928..b03dc378 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -250,7 +250,7 @@ StreamConsumerMetadata Stream::decodeStreamConsumerMetadataValue(const std::stri
   return consumer_metadata;
 }
 
-StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) {
+StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) const {
   InternalKey ikey(key, storage_->IsSlotIdEncoded());
   Slice subkey = ikey.GetSubKey();
   const size_t entry_id_size = sizeof(StreamEntryID);
@@ -618,7 +618,9 @@ rocksdb::Status Stream::Len(const Slice &stream_name, const StreamLenOptions &op
   }
 
   for (; iter->Valid(); options.to_first ? iter->Prev() : iter->Next()) {
-    *size += 1;
+    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamEntry) {
+      *size += 1;
+    }
   }
 
   return rocksdb::Status::OK();
@@ -674,6 +676,9 @@ rocksdb::Status Stream::range(const std::string &ns_key, const StreamMetadata &m
 
   for (; iter->Valid() && (options.reverse ? iter->key().ToString() >= end_key 
: iter->key().ToString() <= end_key);
        options.reverse ? iter->Prev() : iter->Next()) {
+    if (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamEntry) {
+      continue;
+    }
     if (options.exclude_start && iter->key().ToString() == start_key) {
       continue;
     }
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 8ae5a14d..8f6367a7 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -82,7 +82,7 @@ class Stream : public SubKeyScanner {
   std::string consumerNameFromInternalKey(rocksdb::Slice key) const;
   static std::string encodeStreamConsumerMetadataValue(const 
StreamConsumerMetadata &consumer_metadata);
   static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const 
std::string &value);
-  StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key);
+  StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key) const;
 };
 
 }  // namespace redis
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index 7dee10b6..7bbce02b 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -985,6 +985,21 @@ func TestStreamOffset(t *testing.T) {
                r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val()
                require.Equal(t, consumer3, r1[0].Name)
        })
+
+       t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue 
#2109", func(t *testing.T) {
+               streamName := "test-stream"
+               group := "group"
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     "*",
+                       Values: []string{"data1", "b"},
+               }).Err())
+               require.NoError(t, rdb.XGroupCreate(ctx, streamName, group, 
"0").Err())
+               require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, 
group, "consumer").Err())
+               require.NoError(t, rdb.XRead(ctx, &redis.XReadArgs{
+                       Streams: []string{streamName, "0"},
+               }).Err())
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to