Yangsx-1 commented on code in PR #1704:
URL: https://github.com/apache/kvrocks/pull/1704#discussion_r1310535934


##########
src/types/redis_stream.cc:
##########
@@ -160,6 +163,142 @@ rocksdb::Status Stream::Add(const Slice &stream_name, 
const StreamAddOptions &op
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::string Stream::internalKeyFromGroupName(const std::string &ns_key, const 
StreamMetadata &metadata,
+                                             const std::string &group_name) 
const {
+  std::string sub_key;
+  PutFixed64(&sub_key, group_name.size());
+  sub_key += group_name;
+  sub_key += "METADATA";
+  std::string entry_key = InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+  return entry_key;
+}
+
+std::string Stream::groupNameFromInternalKey(const rocksdb::Slice &key) const {
+  InternalKey ikey(key, storage_->IsSlotIdEncoded());
+  Slice group_name_metadata = ikey.GetSubKey();
+  uint64_t len = 0;
+  GetFixed64(&group_name_metadata, &len);
+  std::string group_name = group_name_metadata.ToString().substr(0, len);
+  return group_name;
+}
+
+std::string Stream::encodeStreamConsumerGroupMetadataValue(const 
StreamConsumerGroupMetadata &consumer_group_metadata) {
+  std::string dst;
+  PutFixed64(&dst, consumer_group_metadata.consumer_number);
+  PutFixed64(&dst, consumer_group_metadata.pending_number);
+  PutFixed64(&dst, consumer_group_metadata.last_delivered_id.ms);
+  PutFixed64(&dst, consumer_group_metadata.last_delivered_id.seq);
+  PutFixed64(&dst, 
static_cast<uint64_t>(consumer_group_metadata.entries_read));
+  PutFixed64(&dst, consumer_group_metadata.lag);
+  return dst;
+}
+
+StreamConsumerGroupMetadata 
Stream::decodeStreamConsumerGroupMetadataValue(const std::string &value) {
+  StreamConsumerGroupMetadata consumer_group_metadata;
+  rocksdb::Slice input(value);
+  GetFixed64(&input, &consumer_group_metadata.consumer_number);
+  GetFixed64(&input, &consumer_group_metadata.pending_number);
+  GetFixed64(&input, &consumer_group_metadata.last_delivered_id.ms);
+  GetFixed64(&input, &consumer_group_metadata.last_delivered_id.seq);
+  uint64_t entries_read = 0;
+  GetFixed64(&input, &entries_read);
+  consumer_group_metadata.entries_read = static_cast<int64_t>(entries_read);
+  GetFixed64(&input, &consumer_group_metadata.lag);
+  return consumer_group_metadata;
+}
+
+rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const 
StreamXGroupCreateOptions &options,
+                                    const std::string &group_name) {
+  if (std::isdigit(group_name[0])) {
+    return rocksdb::Status::InvalidArgument("group name cannot start with 
number");
+  }
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata;
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+
+  if (s.IsNotFound() && !options.mkstream) {
+    return 
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+  }
+
+  StreamConsumerGroupMetadata consumer_group_metadata;
+  if (options.last_id == "$") {
+    consumer_group_metadata.last_delivered_id = metadata.last_entry_id;
+  } else {
+    auto s = ParseStreamEntryID(options.last_id, 
&consumer_group_metadata.last_delivered_id);
+    if (!s.IsOK()) {
+      return rocksdb::Status::InvalidArgument(s.Msg());
+    }
+  }
+  consumer_group_metadata.entries_read = options.entries_read;
+  std::string entry_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
+  std::string entry_value = 
encodeStreamConsumerGroupMetadataValue(consumer_group_metadata);
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+  batch->Put(stream_cf_handle_, entry_key, entry_value);
+  metadata.group_number += 1;
+  std::string metadata_bytes;
+  metadata.Encode(&metadata_bytes);
+  batch->Put(metadata_cf_handle_, ns_key, metadata_bytes);
+  return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
+rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const 
std::string &group_name, uint64_t *delete_cnt) {
+  *delete_cnt = 0;
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata;
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+
+  if (s.IsNotFound()) {
+    return 
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  std::string sub_key_prefix;
+  PutFixed64(&sub_key_prefix, group_name.size());
+  sub_key_prefix += group_name;
+  std::string next_version_prefix_key =
+      InternalKey(ns_key, sub_key_prefix, metadata.version + 1, 
storage_->IsSlotIdEncoded()).Encode();
+  std::string prefix_key = InternalKey(ns_key, sub_key_prefix, 
metadata.version, storage_->IsSlotIdEncoded()).Encode();
+
+  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+  LatestSnapShot ss(storage_);
+  read_options.snapshot = ss.GetSnapShot();
+  rocksdb::Slice upper_bound(next_version_prefix_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix_key);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);

Review Comment:
   @torwig  In L271-L284, i used the subkey prefix to scan the snapshot. The 
prefix include the consumer groups name size and its name, so the scan result 
only includes the subkey with the prefix and there is no chance to delete 
something doesn't belong to the group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to