jihuayu commented on code in PR #2190:
URL: https://github.com/apache/kvrocks/pull/2190#discussion_r1545868533
##########
src/types/redis_stream.cc:
##########
@@ -1393,4 +1414,91 @@ rocksdb::Status Stream::SetId(const Slice &stream_name,
const StreamEntryID &las
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+rocksdb::Status Stream::GetPendingEntries(StreamPendingOptions &options,
StreamGetPendingEntryResult &pending_infos,
+
std::vector<StreamGetExtPendingEntryResult> &ext_results) {
+ const std::string &stream_name = options.stream_name;
+ const std::string &group_name = options.group_name;
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+
+ LockGuard guard(storage_->GetLockManager(), ns_key);
Review Comment:
Can we use the snapshot instead of the lock? @mapleFU
https://github.com/apache/kvrocks/pull/2174
##########
src/types/redis_stream.cc:
##########
@@ -23,11 +23,14 @@
#include <rocksdb/status.h>
#include <memory>
+#include <unordered_set>
Review Comment:
Do we need this included?
##########
src/types/redis_stream.cc:
##########
@@ -343,8 +346,26 @@ rocksdb::Status Stream::DeletePelEntries(const Slice
&stream_name, const std::st
std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key,
metadata, group_name, id);
std::string value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key,
&value);
+
if (s.ok()) {
*acknowledged += 1;
+ StreamPelEntry pel_entry = decodeStreamPelEntryValue(value);
+
+ const std::string &consumer_name = pel_entry.consumer_name;
+ std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
+ std::string get_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_,
consumer_key, &get_consumer_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::OK();
+ }
+
+ StreamConsumerMetadata consumer_metadata =
decodeStreamConsumerMetadataValue(get_consumer_value);
+ consumer_metadata.pending_number -= 1;
Review Comment:
Has this patch about `add support of XPENDING`?
If not can you move the change to a new PR? @LiuYuHui
##########
src/types/redis_stream.h:
##########
@@ -66,6 +66,8 @@ class Stream : public SubKeyScanner {
rocksdb::Status GetLastGeneratedID(const Slice &stream_name, StreamEntryID
*id);
rocksdb::Status SetId(const Slice &stream_name, const StreamEntryID
&last_generated_id,
std::optional<uint64_t> entries_added,
std::optional<StreamEntryID> max_deleted_id);
+ rocksdb::Status GetPendingEntries(StreamPendingOptions &options,
StreamGetPendingEntryResult &pending_infos,
+
std::vector<StreamGetExtPendingEntryResult> &ext_results);
Review Comment:
`options` here can be const.
The signature of this function is different from what we are accustomed to.
1. We are accustomed to using pointers instead of references to represent
return values.
2. In general, we usually have only one return value.
3. We usually place the key name(here is stream_name) outside instead of
inside the options.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]