Beihao-Zhou commented on code in PR #2202:
URL: https://github.com/apache/kvrocks/pull/2202#discussion_r1597668646
##########
src/types/redis_stream.cc:
##########
@@ -357,6 +357,134 @@ rocksdb::Status Stream::DeletePelEntries(const Slice
&stream_name, const std::st
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const
std::string &group_name,
+ const std::string &consumer_name,
const uint64_t min_idle_time,
+ const std::vector<StreamEntryID>
&entry_ids, const StreamClaimOptions &options,
+ StreamClaimResult *result) {
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ StreamMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
+ if (!s.ok()) return s;
+
+ std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string get_group_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
+ stream_name.ToString());
+ }
+ StreamConsumerGroupMetadata group_metadata =
decodeStreamConsumerGroupMetadataValue(get_group_value);
+
+ std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
+ std::string get_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ int created_number = 0;
+ s = createConsumerWithoutLock(stream_name, group_name, consumer_name,
&created_number);
+ if (!s.ok()) {
+ return s;
+ }
+ group_metadata.consumer_number += created_number;
+ }
+ StreamConsumerMetadata consumer_metadata;
+ if (!s.IsNotFound()) {
+ consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
+ }
+ auto now = util::GetTimeStampMS();
+ consumer_metadata.last_idle = now;
+ consumer_metadata.last_active = now;
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisStream);
+ batch->PutLogData(log_data.Encode());
+
+ for (const auto &id : entry_ids) {
+ std::string raw_value;
+ rocksdb::Status s = getEntryRawValue(ns_key, metadata, id, &raw_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) continue;
+
+ std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key,
metadata, group_name, id);
+ std::string value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key,
&value);
+ StreamPelEntry pel_entry;
+
+ if (!s.ok() && s.IsNotFound() && options.force) {
+ pel_entry = {0, 0, ""};
+ group_metadata.pending_number += 1;
+ }
+
+ if (s.ok()) {
+ pel_entry = decodeStreamPelEntryValue(value);
+ }
+
+ if (s.ok() || (s.IsNotFound() && options.force)) {
+ if (now - pel_entry.last_delivery_time < min_idle_time) continue;
+
+ std::vector<std::string> values;
+ if (options.just_id) {
+ result->ids.emplace_back(id.ToString());
+ } else {
+ auto rv = DecodeRawStreamEntryValue(raw_value, &values);
+ if (!rv.IsOK()) {
+ return rocksdb::Status::InvalidArgument(rv.Msg());
+ }
+ result->entries.emplace_back(id.ToString(), std::move(values));
+ }
+
+ if (pel_entry.consumer_name != "") {
+ std::string original_consumer_key =
+ internalKeyFromConsumerName(ns_key, metadata, group_name,
pel_entry.consumer_name);
+ std::string get_original_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_,
original_consumer_key,
+ &get_original_consumer_value);
+ if (!s.ok()) {
+ return s;
+ }
+ StreamConsumerMetadata original_consumer_metadata =
+ decodeStreamConsumerMetadataValue(get_original_consumer_value);
+ original_consumer_metadata.pending_number -= 1;
+ batch->Put(stream_cf_handle_, original_consumer_key,
+
encodeStreamConsumerMetadataValue(original_consumer_metadata));
+ }
+
+ pel_entry.consumer_name = consumer_name;
+ consumer_metadata.pending_number += 1;
+ if (options.with_time) {
+ pel_entry.last_delivery_time = options.last_delivery_time;
+ } else {
+ pel_entry.last_delivery_time = now - options.idle_time;
Review Comment:
Thanks for pointing it out! The boundary check for `last_delivery_time` has
now been added. As for `idle_time`, as commented below, it defaults to 0 in
struct `StreamClaimOptions` when the client doesn't pass a value, so I'm
wondering whether we still need to check it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]