Yangsx-1 commented on code in PR #2202:
URL: https://github.com/apache/kvrocks/pull/2202#discussion_r1575884298


##########
src/types/redis_stream.cc:
##########
@@ -357,6 +357,134 @@ rocksdb::Status Stream::DeletePelEntries(const Slice 
&stream_name, const std::st
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const 
std::string &group_name,
+                                        const std::string &consumer_name, 
const uint64_t min_idle_time,
+                                        const std::vector<StreamEntryID> 
&entry_ids, const StreamClaimOptions &options,
+                                        StreamClaimResult *result) {
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
+  if (!s.ok()) return s;
+
+  std::string group_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
+  std::string get_group_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, 
&get_group_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
+                                            stream_name.ToString());
+  }
+  StreamConsumerGroupMetadata group_metadata = 
decodeStreamConsumerGroupMetadataValue(get_group_value);
+
+  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, 
group_name, consumer_name);
+  std::string get_consumer_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, 
&get_consumer_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    int created_number = 0;
+    s = createConsumerWithoutLock(stream_name, group_name, consumer_name, 
&created_number);
+    if (!s.ok()) {
+      return s;
+    }
+    group_metadata.consumer_number += created_number;
+  }
+  StreamConsumerMetadata consumer_metadata;
+  if (!s.IsNotFound()) {
+    consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
+  }
+  auto now = util::GetTimeStampMS();
+  consumer_metadata.last_idle = now;
+  consumer_metadata.last_active = now;
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  for (const auto &id : entry_ids) {
+    std::string raw_value;
+    rocksdb::Status s = getEntryRawValue(ns_key, metadata, id, &raw_value);
+    if (!s.ok() && !s.IsNotFound()) {
+      return s;
+    }
+    if (s.IsNotFound()) continue;
+
+    std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, 
metadata, group_name, id);
+    std::string value;
+    s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, 
&value);
+    StreamPelEntry pel_entry;
+
+    if (!s.ok() && s.IsNotFound() && options.force) {
+      pel_entry = {0, 0, ""};
+      group_metadata.pending_number += 1;
+    }
+
+    if (s.ok()) {
+      pel_entry = decodeStreamPelEntryValue(value);
+    }
+
+    if (s.ok() || (!s.ok() && s.IsNotFound() && options.force)) {

Review Comment:
   Maybe `!s.ok()` is not necessary here, since `s.IsNotFound()` already implies it.



##########
src/types/redis_stream.cc:
##########
@@ -357,6 +357,134 @@ rocksdb::Status Stream::DeletePelEntries(const Slice 
&stream_name, const std::st
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const 
std::string &group_name,
+                                        const std::string &consumer_name, 
const uint64_t min_idle_time,
+                                        const std::vector<StreamEntryID> 
&entry_ids, const StreamClaimOptions &options,
+                                        StreamClaimResult *result) {
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
+  if (!s.ok()) return s;
+
+  std::string group_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
+  std::string get_group_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, 
&get_group_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
+                                            stream_name.ToString());
+  }
+  StreamConsumerGroupMetadata group_metadata = 
decodeStreamConsumerGroupMetadataValue(get_group_value);
+
+  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, 
group_name, consumer_name);
+  std::string get_consumer_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, 
&get_consumer_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    int created_number = 0;
+    s = createConsumerWithoutLock(stream_name, group_name, consumer_name, 
&created_number);
+    if (!s.ok()) {
+      return s;
+    }
+    group_metadata.consumer_number += created_number;
+  }
+  StreamConsumerMetadata consumer_metadata;
+  if (!s.IsNotFound()) {
+    consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
+  }
+  auto now = util::GetTimeStampMS();
+  consumer_metadata.last_idle = now;
+  consumer_metadata.last_active = now;
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  for (const auto &id : entry_ids) {
+    std::string raw_value;
+    rocksdb::Status s = getEntryRawValue(ns_key, metadata, id, &raw_value);
+    if (!s.ok() && !s.IsNotFound()) {
+      return s;
+    }
+    if (s.IsNotFound()) continue;
+
+    std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, 
metadata, group_name, id);
+    std::string value;
+    s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, 
&value);
+    StreamPelEntry pel_entry;
+
+    if (!s.ok() && s.IsNotFound() && options.force) {
+      pel_entry = {0, 0, ""};
+      group_metadata.pending_number += 1;
+    }
+
+    if (s.ok()) {
+      pel_entry = decodeStreamPelEntryValue(value);
+    }
+
+    if (s.ok() || (!s.ok() && s.IsNotFound() && options.force)) {
+      if (now - pel_entry.last_delivery_time < min_idle_time) continue;
+
+      std::vector<std::string> values;
+      if (options.just_id) {
+        result->ids.emplace_back(id.ToString());
+      } else {
+        auto rv = DecodeRawStreamEntryValue(raw_value, &values);
+        if (!rv.IsOK()) {
+          return rocksdb::Status::InvalidArgument(rv.Msg());
+        }
+        result->entries.emplace_back(id.ToString(), std::move(values));
+      }
+
+      if (pel_entry.consumer_name != "") {
+        std::string original_consumer_key =
+            internalKeyFromConsumerName(ns_key, metadata, group_name, 
pel_entry.consumer_name);
+        std::string get_original_consumer_value;
+        s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, 
original_consumer_key,
+                          &get_original_consumer_value);
+        if (!s.ok()) {
+          return s;
+        }
+        StreamConsumerMetadata original_consumer_metadata =
+            decodeStreamConsumerMetadataValue(get_original_consumer_value);
+        original_consumer_metadata.pending_number -= 1;
+        batch->Put(stream_cf_handle_, original_consumer_key,
+                   
encodeStreamConsumerMetadataValue(original_consumer_metadata));
+      }
+
+      pel_entry.consumer_name = consumer_name;
+      consumer_metadata.pending_number += 1;
+      if (options.with_time) {
+        pel_entry.last_delivery_time = options.last_delivery_time;
+      } else {
+        pel_entry.last_delivery_time = now - options.idle_time;
+      }
+
+      if (options.with_retry_count) {
+        pel_entry.last_delivery_count = options.last_delivery_count;
+      } else if (!options.just_id) {
+        pel_entry.last_delivery_count += 1;
+      }
+
+      std::string pel_value = encodeStreamPelEntryValue(pel_entry);
+      batch->Put(stream_cf_handle_, entry_key, pel_value);
+    }
+  }
+
+  if (options.with_last_id) {
+    group_metadata.last_delivered_id = options.last_delivered_id;

Review Comment:
   Only when options.last_delivered_id > group_metadata.last_delivered_id can we 
update it.



##########
src/commands/cmd_stream.cc:
##########
@@ -242,6 +242,100 @@ class CommandXDel : public Commander {
   std::vector<redis::StreamEntryID> ids_;
 };
 
+class CommandXClaim : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() < 6) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    CommandParser parser(args, 1);
+    stream_name_ = GET_OR_RET(parser.TakeStr());
+    group_name_ = GET_OR_RET(parser.TakeStr());
+    consumer_name_ = GET_OR_RET(parser.TakeStr());
+    min_idle_time_ = GET_OR_RET(parser.TakeInt<uint64_t>());

Review Comment:
   This should be int64_t, and if min_idle_time < 0, we need to reset it to 
0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to