git-hulk commented on code in PR #1870:
URL: https://github.com/apache/kvrocks/pull/1870#discussion_r1382410095
##########
src/types/redis_stream.cc:
##########
@@ -737,6 +774,73 @@ rocksdb::Status Stream::GetStreamInfo(const rocksdb::Slice
&stream_name, bool fu
return rocksdb::Status::OK();
}
+rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name,
+ std::vector<std::pair<std::string,
StreamConsumerGroupMetadata>> &group_metadata) {
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ StreamMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok()) return s;
+
+ std::string next_version_prefix_key =
+ InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+ std::string prefix_key = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+
+ rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+ LatestSnapShot ss(storage_);
+ read_options.snapshot = ss.GetSnapShot();
+ rocksdb::Slice upper_bound(next_version_prefix_key);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix_key);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ if (identifySubkeyType(iter->key()) ==
StreamSubkeyType::StreamConsumerGroupMetadata) {
+ std::string group_name = groupNameFromInternalKey(iter->key());
+ StreamConsumerGroupMetadata cg_metadata =
decodeStreamConsumerGroupMetadataValue(iter->value().ToString());
+ std::pair<std::string, StreamConsumerGroupMetadata> tmp_item(group_name,
cg_metadata);
+ group_metadata.push_back(tmp_item);
+ }
+ }
+ return rocksdb::Status::OK();
+}
+
+rocksdb::Status Stream::GetConsumerInfo(
+ const Slice &stream_name, const std::string &group_name,
+ std::vector<std::pair<std::string, StreamConsumerMetadata>>
&consumer_metadata) {
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+ LockGuard guard(storage_->GetLockManager(), ns_key);
Review Comment:
ditto
##########
src/types/redis_stream.cc:
##########
@@ -737,6 +774,73 @@ rocksdb::Status Stream::GetStreamInfo(const rocksdb::Slice
&stream_name, bool fu
return rocksdb::Status::OK();
}
+rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name,
+ std::vector<std::pair<std::string,
StreamConsumerGroupMetadata>> &group_metadata) {
+ std::string ns_key = AppendNamespacePrefix(stream_name);
Review Comment:
I think we don't need to lock the key? because it's good to read the
snapshot only.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]