Yangsx-1 commented on code in PR #1704:
URL: https://github.com/apache/kvrocks/pull/1704#discussion_r1310566961


##########
src/types/redis_stream.cc:
##########
@@ -160,6 +163,142 @@ rocksdb::Status Stream::Add(const Slice &stream_name, 
const StreamAddOptions &op
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::string Stream::internalKeyFromGroupName(const std::string &ns_key, const 
StreamMetadata &metadata,
+                                             const std::string &group_name) 
const {
+  std::string sub_key;
+  PutFixed64(&sub_key, group_name.size());
+  sub_key += group_name;
+  sub_key += "METADATA";
+  std::string entry_key = InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+  return entry_key;
+}
+
+std::string Stream::groupNameFromInternalKey(const rocksdb::Slice &key) const {
+  InternalKey ikey(key, storage_->IsSlotIdEncoded());
+  Slice group_name_metadata = ikey.GetSubKey();
+  uint64_t len = 0;
+  GetFixed64(&group_name_metadata, &len);
+  std::string group_name = group_name_metadata.ToString().substr(0, len);
+  return group_name;
+}
+
+std::string Stream::encodeStreamConsumerGroupMetadataValue(const 
StreamConsumerGroupMetadata &consumer_group_metadata) {
+  std::string dst;
+  PutFixed64(&dst, consumer_group_metadata.consumer_number);
+  PutFixed64(&dst, consumer_group_metadata.pending_number);
+  PutFixed64(&dst, consumer_group_metadata.last_delivered_id.ms);
+  PutFixed64(&dst, consumer_group_metadata.last_delivered_id.seq);
+  PutFixed64(&dst, 
static_cast<uint64_t>(consumer_group_metadata.entries_read));
+  PutFixed64(&dst, consumer_group_metadata.lag);
+  return dst;
+}
+
+StreamConsumerGroupMetadata 
Stream::decodeStreamConsumerGroupMetadataValue(const std::string &value) {
+  StreamConsumerGroupMetadata consumer_group_metadata;
+  rocksdb::Slice input(value);
+  GetFixed64(&input, &consumer_group_metadata.consumer_number);
+  GetFixed64(&input, &consumer_group_metadata.pending_number);
+  GetFixed64(&input, &consumer_group_metadata.last_delivered_id.ms);
+  GetFixed64(&input, &consumer_group_metadata.last_delivered_id.seq);
+  uint64_t entries_read = 0;
+  GetFixed64(&input, &entries_read);
+  consumer_group_metadata.entries_read = static_cast<int64_t>(entries_read);
+  GetFixed64(&input, &consumer_group_metadata.lag);
+  return consumer_group_metadata;
+}
+
+rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const 
StreamXGroupCreateOptions &options,
+                                    const std::string &group_name) {
+  if (std::isdigit(group_name[0])) {
+    return rocksdb::Status::InvalidArgument("group name cannot start with 
number");
+  }
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  StreamMetadata metadata;
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+
+  if (s.IsNotFound() && !options.mkstream) {
+    return 
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
+  }
+
+  StreamConsumerGroupMetadata consumer_group_metadata;
+  if (options.last_id == "$") {
+    consumer_group_metadata.last_delivered_id = metadata.last_entry_id;
+  } else {
+    auto s = ParseStreamEntryID(options.last_id, 
&consumer_group_metadata.last_delivered_id);
+    if (!s.IsOK()) {
+      return rocksdb::Status::InvalidArgument(s.Msg());
+    }
+  }
+  consumer_group_metadata.entries_read = options.entries_read;
+  std::string entry_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
+  std::string entry_value = 
encodeStreamConsumerGroupMetadataValue(consumer_group_metadata);
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+  batch->Put(stream_cf_handle_, entry_key, entry_value);

Review Comment:
   Good catch! I think it's necessary and i'll add it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to