Yangsx-1 commented on code in PR #2202:
URL: https://github.com/apache/kvrocks/pull/2202#discussion_r1593311060
##########
src/types/redis_stream.cc:
##########
@@ -357,6 +357,134 @@ rocksdb::Status Stream::DeletePelEntries(const Slice
&stream_name, const std::st
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const
std::string &group_name,
+ const std::string &consumer_name,
const uint64_t min_idle_time,
+ const std::vector<StreamEntryID>
&entry_ids, const StreamClaimOptions &options,
+ StreamClaimResult *result) {
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ StreamMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
+ if (!s.ok()) return s;
+
+ std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string get_group_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
+ stream_name.ToString());
+ }
+ StreamConsumerGroupMetadata group_metadata =
decodeStreamConsumerGroupMetadataValue(get_group_value);
+
+ std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
+ std::string get_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ int created_number = 0;
+ s = createConsumerWithoutLock(stream_name, group_name, consumer_name,
&created_number);
+ if (!s.ok()) {
+ return s;
+ }
+ group_metadata.consumer_number += created_number;
+ }
+ StreamConsumerMetadata consumer_metadata;
+ if (!s.IsNotFound()) {
+ consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
+ }
+ auto now = util::GetTimeStampMS();
+ consumer_metadata.last_idle = now;
+ consumer_metadata.last_active = now;
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisStream);
+ batch->PutLogData(log_data.Encode());
+
+ for (const auto &id : entry_ids) {
+ std::string raw_value;
+ rocksdb::Status s = getEntryRawValue(ns_key, metadata, id, &raw_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) continue;
+
+ std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key,
metadata, group_name, id);
+ std::string value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key,
&value);
+ StreamPelEntry pel_entry;
+
+ if (!s.ok() && s.IsNotFound() && options.force) {
+ pel_entry = {0, 0, ""};
+ group_metadata.pending_number += 1;
+ }
+
+ if (s.ok()) {
+ pel_entry = decodeStreamPelEntryValue(value);
+ }
+
+ if (s.ok() || (s.IsNotFound() && options.force)) {
+ if (now - pel_entry.last_delivery_time < min_idle_time) continue;
+
+ std::vector<std::string> values;
+ if (options.just_id) {
+ result->ids.emplace_back(id.ToString());
+ } else {
+ auto rv = DecodeRawStreamEntryValue(raw_value, &values);
+ if (!rv.IsOK()) {
+ return rocksdb::Status::InvalidArgument(rv.Msg());
+ }
+ result->entries.emplace_back(id.ToString(), std::move(values));
+ }
+
+ if (pel_entry.consumer_name != "") {
+ std::string original_consumer_key =
+ internalKeyFromConsumerName(ns_key, metadata, group_name,
pel_entry.consumer_name);
+ std::string get_original_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_,
original_consumer_key,
+ &get_original_consumer_value);
+ if (!s.ok()) {
+ return s;
+ }
+ StreamConsumerMetadata original_consumer_metadata =
+ decodeStreamConsumerMetadataValue(get_original_consumer_value);
+ original_consumer_metadata.pending_number -= 1;
+ batch->Put(stream_cf_handle_, original_consumer_key,
+
encodeStreamConsumerMetadataValue(original_consumer_metadata));
+ }
+
+ pel_entry.consumer_name = consumer_name;
+ consumer_metadata.pending_number += 1;
+ if (options.with_time) {
+ pel_entry.last_delivery_time = options.last_delivery_time;
+ } else {
+ pel_entry.last_delivery_time = now - options.idle_time;
Review Comment:
`If no IDLE/TIME option was passed, we want the last delivery time to be
now, so that the idle time of the message will be zero.`
I think we should check this little proplem. And if the
options.last_delivery_time is larger than now, we should modify it equal to now:
`if (deliverytime < 0 || deliverytime > now) deliverytime = now`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]