Yangsx-1 commented on code in PR #1704:
URL: https://github.com/apache/kvrocks/pull/1704#discussion_r1310521010
##########
src/types/redis_stream.cc:
##########
@@ -160,6 +163,142 @@ rocksdb::Status Stream::Add(const Slice &stream_name,
const StreamAddOptions &op
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+std::string Stream::internalKeyFromGroupName(const std::string &ns_key, const
StreamMetadata &metadata,
+ const std::string &group_name)
const {
+ std::string sub_key;
+ PutFixed64(&sub_key, group_name.size());
+ sub_key += group_name;
+ sub_key += "METADATA";
+ std::string entry_key = InternalKey(ns_key, sub_key, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ return entry_key;
+}
+
+std::string Stream::groupNameFromInternalKey(const rocksdb::Slice &key) const {
+ InternalKey ikey(key, storage_->IsSlotIdEncoded());
+ Slice group_name_metadata = ikey.GetSubKey();
+ uint64_t len = 0;
+ GetFixed64(&group_name_metadata, &len);
+ std::string group_name = group_name_metadata.ToString().substr(0, len);
+ return group_name;
+}
+
+std::string Stream::encodeStreamConsumerGroupMetadataValue(const
StreamConsumerGroupMetadata &consumer_group_metadata) {
+ std::string dst;
+ PutFixed64(&dst, consumer_group_metadata.consumer_number);
+ PutFixed64(&dst, consumer_group_metadata.pending_number);
+ PutFixed64(&dst, consumer_group_metadata.last_delivered_id.ms);
+ PutFixed64(&dst, consumer_group_metadata.last_delivered_id.seq);
+ PutFixed64(&dst,
static_cast<uint64_t>(consumer_group_metadata.entries_read));
+ PutFixed64(&dst, consumer_group_metadata.lag);
+ return dst;
+}
+
+StreamConsumerGroupMetadata
Stream::decodeStreamConsumerGroupMetadataValue(const std::string &value) {
+ StreamConsumerGroupMetadata consumer_group_metadata;
+ rocksdb::Slice input(value);
+ GetFixed64(&input, &consumer_group_metadata.consumer_number);
+ GetFixed64(&input, &consumer_group_metadata.pending_number);
+ GetFixed64(&input, &consumer_group_metadata.last_delivered_id.ms);
+ GetFixed64(&input, &consumer_group_metadata.last_delivered_id.seq);
+ uint64_t entries_read = 0;
+ GetFixed64(&input, &entries_read);
+ consumer_group_metadata.entries_read = static_cast<int64_t>(entries_read);
+ GetFixed64(&input, &consumer_group_metadata.lag);
+ return consumer_group_metadata;
+}
+
+rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const
StreamXGroupCreateOptions &options,
+ const std::string &group_name) {
+ if (std::isdigit(group_name[0])) {
+ return rocksdb::Status::InvalidArgument("group name cannot start with
number");
+ }
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ StreamMetadata metadata;
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+
+ if (s.IsNotFound() && !options.mkstream) {
+ return
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+ }
+
+ StreamConsumerGroupMetadata consumer_group_metadata;
+ if (options.last_id == "$") {
+ consumer_group_metadata.last_delivered_id = metadata.last_entry_id;
+ } else {
+ auto s = ParseStreamEntryID(options.last_id,
&consumer_group_metadata.last_delivered_id);
+ if (!s.IsOK()) {
+ return rocksdb::Status::InvalidArgument(s.Msg());
+ }
+ }
+ consumer_group_metadata.entries_read = options.entries_read;
+ std::string entry_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string entry_value =
encodeStreamConsumerGroupMetadataValue(consumer_group_metadata);
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisStream);
+ batch->PutLogData(log_data.Encode());
+ batch->Put(stream_cf_handle_, entry_key, entry_value);
+ metadata.group_number += 1;
+ std::string metadata_bytes;
+ metadata.Encode(&metadata_bytes);
+ batch->Put(metadata_cf_handle_, ns_key, metadata_bytes);
+ return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
+rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const
std::string &group_name, uint64_t *delete_cnt) {
+ *delete_cnt = 0;
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ StreamMetadata metadata;
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+
+ if (s.IsNotFound()) {
+ return
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisStream);
+ batch->PutLogData(log_data.Encode());
+
+ std::string sub_key_prefix;
+ PutFixed64(&sub_key_prefix, group_name.size());
+ sub_key_prefix += group_name;
+ std::string next_version_prefix_key =
+ InternalKey(ns_key, sub_key_prefix, metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+ std::string prefix_key = InternalKey(ns_key, sub_key_prefix,
metadata.version, storage_->IsSlotIdEncoded()).Encode();
+
+ rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+ LatestSnapShot ss(storage_);
+ read_options.snapshot = ss.GetSnapShot();
+ rocksdb::Slice upper_bound(next_version_prefix_key);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix_key);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
Review Comment:
I think it's OK, because something has been deleted whenever `delete_cnt != 0`.
When `delete_cnt == 1`, the only thing that can have been deleted is the
consumer group metadata, by design. So the `-= 1` in L293 only means that one
consumer group was deleted and the counter in the stream metadata changes accordingly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]