jihuayu commented on code in PR #2120:
URL: https://github.com/apache/kvrocks/pull/2120#discussion_r1506911118
##########
src/types/redis_stream.cc:
##########
@@ -927,6 +971,143 @@ rocksdb::Status Stream::Range(const Slice &stream_name,
const StreamRangeOptions
return range(ns_key, metadata, options, entries);
}
+rocksdb::Status Stream::RangeWithPending(const Slice &stream_name,
StreamRangeOptions &options,
+ std::vector<StreamEntry> *entries,
std::string &group_name,
+ std::string &consumer_name, bool
noack, bool latest) {
+ entries->clear();
+
+ if (options.with_count && options.count == 0) {
+ return rocksdb::Status::OK();
+ }
+
+ if (options.exclude_start && options.start.IsMaximum()) {
+ return rocksdb::Status::InvalidArgument("invalid start ID for the
interval");
+ }
+
+ if (options.exclude_end && options.end.IsMinimum()) {
+ return rocksdb::Status::InvalidArgument("invalid end ID for the interval");
+ }
+
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+
+ StreamMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok()) {
+ return s.IsNotFound() ? rocksdb::Status::OK() : s;
+ }
+
+ std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string get_group_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
+ stream_name.ToString());
+ }
+
+ std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
+ std::string get_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ int created_number = 0;
+ s = CreateConsumer(stream_name, group_name, consumer_name,
&created_number);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ LockGuard guard(storage_->GetLockManager(), ns_key);
Review Comment:
OK, this is indeed a problem. We can think about how to improve the
throughput of the stream later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]