Yangsx-1 commented on code in PR #1704:
URL: https://github.com/apache/kvrocks/pull/1704#discussion_r1310535934
##########
src/types/redis_stream.cc:
##########
@@ -160,6 +163,142 @@ rocksdb::Status Stream::Add(const Slice &stream_name,
const StreamAddOptions &op
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+std::string Stream::internalKeyFromGroupName(const std::string &ns_key, const
StreamMetadata &metadata,
+ const std::string &group_name)
const {
+ std::string sub_key;
+ PutFixed64(&sub_key, group_name.size());
+ sub_key += group_name;
+ sub_key += "METADATA";
+ std::string entry_key = InternalKey(ns_key, sub_key, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ return entry_key;
+}
+
+std::string Stream::groupNameFromInternalKey(const rocksdb::Slice &key) const {
+ InternalKey ikey(key, storage_->IsSlotIdEncoded());
+ Slice group_name_metadata = ikey.GetSubKey();
+ uint64_t len = 0;
+ GetFixed64(&group_name_metadata, &len);
+ std::string group_name = group_name_metadata.ToString().substr(0, len);
+ return group_name;
+}
+
+std::string Stream::encodeStreamConsumerGroupMetadataValue(const
StreamConsumerGroupMetadata &consumer_group_metadata) {
+ std::string dst;
+ PutFixed64(&dst, consumer_group_metadata.consumer_number);
+ PutFixed64(&dst, consumer_group_metadata.pending_number);
+ PutFixed64(&dst, consumer_group_metadata.last_delivered_id.ms);
+ PutFixed64(&dst, consumer_group_metadata.last_delivered_id.seq);
+ PutFixed64(&dst,
static_cast<uint64_t>(consumer_group_metadata.entries_read));
+ PutFixed64(&dst, consumer_group_metadata.lag);
+ return dst;
+}
+
+StreamConsumerGroupMetadata
Stream::decodeStreamConsumerGroupMetadataValue(const std::string &value) {
+ StreamConsumerGroupMetadata consumer_group_metadata;
+ rocksdb::Slice input(value);
+ GetFixed64(&input, &consumer_group_metadata.consumer_number);
+ GetFixed64(&input, &consumer_group_metadata.pending_number);
+ GetFixed64(&input, &consumer_group_metadata.last_delivered_id.ms);
+ GetFixed64(&input, &consumer_group_metadata.last_delivered_id.seq);
+ uint64_t entries_read = 0;
+ GetFixed64(&input, &entries_read);
+ consumer_group_metadata.entries_read = static_cast<int64_t>(entries_read);
+ GetFixed64(&input, &consumer_group_metadata.lag);
+ return consumer_group_metadata;
+}
+
+rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const
StreamXGroupCreateOptions &options,
+ const std::string &group_name) {
+ if (std::isdigit(group_name[0])) {
+ return rocksdb::Status::InvalidArgument("group name cannot start with
number");
+ }
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ StreamMetadata metadata;
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+
+ if (s.IsNotFound() && !options.mkstream) {
+ return
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+ }
+
+ StreamConsumerGroupMetadata consumer_group_metadata;
+ if (options.last_id == "$") {
+ consumer_group_metadata.last_delivered_id = metadata.last_entry_id;
+ } else {
+ auto s = ParseStreamEntryID(options.last_id,
&consumer_group_metadata.last_delivered_id);
+ if (!s.IsOK()) {
+ return rocksdb::Status::InvalidArgument(s.Msg());
+ }
+ }
+ consumer_group_metadata.entries_read = options.entries_read;
+ std::string entry_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string entry_value =
encodeStreamConsumerGroupMetadataValue(consumer_group_metadata);
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisStream);
+ batch->PutLogData(log_data.Encode());
+ batch->Put(stream_cf_handle_, entry_key, entry_value);
+ metadata.group_number += 1;
+ std::string metadata_bytes;
+ metadata.Encode(&metadata_bytes);
+ batch->Put(metadata_cf_handle_, ns_key, metadata_bytes);
+ return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
+rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const
std::string &group_name, uint64_t *delete_cnt) {
+ *delete_cnt = 0;
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ StreamMetadata metadata;
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+
+ if (s.IsNotFound()) {
+ return
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisStream);
+ batch->PutLogData(log_data.Encode());
+
+ std::string sub_key_prefix;
+ PutFixed64(&sub_key_prefix, group_name.size());
+ sub_key_prefix += group_name;
+ std::string next_version_prefix_key =
+ InternalKey(ns_key, sub_key_prefix, metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+ std::string prefix_key = InternalKey(ns_key, sub_key_prefix,
metadata.version, storage_->IsSlotIdEncoded()).Encode();
+
+ rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+ LatestSnapShot ss(storage_);
+ read_options.snapshot = ss.GetSnapShot();
+ rocksdb::Slice upper_bound(next_version_prefix_key);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix_key);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
Review Comment:
@torwig In L271-L284, i used the subkey prefix to scan the snapshot. The
prefix include the consumer groups name size and its name, so the scan result
only includes the subkey with the prefix and there is no chance to delete
something doesn't belong to the group.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]