This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new a2a3826a refactor(stream): change the encoding of stream consumer 
group (#2384)
a2a3826a is described below

commit a2a3826a3b78e36089de1cc862d4f88e99034a82
Author: Hauru <[email protected]>
AuthorDate: Wed Jul 17 12:23:16 2024 +0800

    refactor(stream): change the encoding of stream consumer group (#2384)
---
 src/commands/cmd_stream.cc                   |   8 +-
 src/types/redis_stream.cc                    | 222 +++++++++++++--------------
 src/types/redis_stream_base.cc               |  10 +-
 src/types/redis_stream_base.h                |   4 +-
 tests/cppunit/types/stream_test.cc           |  10 +-
 tests/gocase/unit/type/stream/stream_test.go |   6 +-
 6 files changed, 128 insertions(+), 132 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index f227aecf..ea448332 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -1107,7 +1107,7 @@ class CommandXRead : public Commander,
       redis::StreamRangeOptions options;
       options.reverse = false;
       options.start = ids_[i];
-      options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
+      options.end = StreamEntryID::Maximum();
       options.with_count = with_count_;
       options.count = count_;
       options.exclude_start = true;
@@ -1218,7 +1218,7 @@ class CommandXRead : public Commander,
       redis::StreamRangeOptions options;
       options.reverse = false;
       options.start = ids_[i];
-      options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
+      options.end = StreamEntryID::Maximum();
       options.with_count = with_count_;
       options.count = count_;
       options.exclude_start = true;
@@ -1405,7 +1405,7 @@ class CommandXReadGroup : public Commander,
       redis::StreamRangeOptions options;
       options.reverse = false;
       options.start = ids_[i];
-      options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
+      options.end = StreamEntryID::Maximum();
       options.with_count = with_count_;
       options.count = count_;
       options.exclude_start = true;
@@ -1510,7 +1510,7 @@ class CommandXReadGroup : public Commander,
       redis::StreamRangeOptions options;
       options.reverse = false;
       options.start = ids_[i];
-      options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
+      options.end = StreamEntryID::Maximum();
       options.with_count = with_count_;
       options.count = count_;
       options.exclude_start = true;
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index a3f6c330..7d12c46e 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -31,7 +31,6 @@
 
 namespace redis {
 
-std::string_view consumerGroupMetadataDelimiter = "METADATA";
 const char *errSetEntryIdSmallerThanLastGenerated =
     "The ID specified in XSETID is smaller than the target stream top item";
 const char *errEntriesAddedSmallerThanStreamSize =
@@ -169,9 +168,10 @@ rocksdb::Status Stream::Add(const Slice &stream_name, 
const StreamAddOptions &op
 std::string Stream::internalKeyFromGroupName(const std::string &ns_key, const 
StreamMetadata &metadata,
                                              const std::string &group_name) 
const {
   std::string sub_key;
+  PutFixed64(&sub_key, UINT64_MAX);
+  PutFixed8(&sub_key, (uint8_t)StreamSubkeyType::StreamConsumerGroupMetadata);
   PutFixed64(&sub_key, group_name.size());
   sub_key += group_name;
-  sub_key += consumerGroupMetadataDelimiter;
   std::string entry_key = InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
   return entry_key;
 }
@@ -179,6 +179,10 @@ std::string Stream::internalKeyFromGroupName(const 
std::string &ns_key, const St
 std::string Stream::groupNameFromInternalKey(rocksdb::Slice key) const {
   InternalKey ikey(key, storage_->IsSlotIdEncoded());
   Slice group_name_metadata = ikey.GetSubKey();
+  uint64_t entry_delimiter = 0;
+  GetFixed64(&group_name_metadata, &entry_delimiter);
+  uint8_t type_delimiter = 0;
+  GetFixed8(&group_name_metadata, &type_delimiter);
   uint64_t len = 0;
   GetFixed64(&group_name_metadata, &len);
   std::string group_name;
@@ -214,11 +218,12 @@ StreamConsumerGroupMetadata 
Stream::decodeStreamConsumerGroupMetadataValue(const
 std::string Stream::internalKeyFromConsumerName(const std::string &ns_key, 
const StreamMetadata &metadata,
                                                 const std::string &group_name, 
const std::string &consumer_name) const {
   std::string sub_key;
+  PutFixed64(&sub_key, UINT64_MAX);
+  PutFixed8(&sub_key, (uint8_t)StreamSubkeyType::StreamConsumerMetadata);
   PutFixed64(&sub_key, group_name.size());
   sub_key += group_name;
   PutFixed64(&sub_key, consumer_name.size());
   sub_key += consumer_name;
-  sub_key += consumerGroupMetadataDelimiter;
   std::string entry_key = InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
   return entry_key;
 }
@@ -226,6 +231,10 @@ std::string Stream::internalKeyFromConsumerName(const 
std::string &ns_key, const
 std::string Stream::consumerNameFromInternalKey(rocksdb::Slice key) const {
   InternalKey ikey(key, storage_->IsSlotIdEncoded());
   Slice subkey = ikey.GetSubKey();
+  uint64_t entry_delimiter = 0;
+  GetFixed64(&subkey, &entry_delimiter);
+  uint8_t type_delimiter = 0;
+  GetFixed8(&subkey, &type_delimiter);
   uint64_t group_name_len = 0;
   GetFixed64(&subkey, &group_name_len);
   subkey.remove_prefix(group_name_len);
@@ -254,6 +263,8 @@ StreamConsumerMetadata 
Stream::decodeStreamConsumerMetadataValue(const std::stri
 std::string Stream::internalPelKeyFromGroupAndEntryId(const std::string 
&ns_key, const StreamMetadata &metadata,
                                                       const std::string 
&group_name, const StreamEntryID &id) {
   std::string sub_key;
+  PutFixed64(&sub_key, UINT64_MAX);
+  PutFixed8(&sub_key, (uint8_t)StreamSubkeyType::StreamPelEntry);
   PutFixed64(&sub_key, group_name.size());
   sub_key += group_name;
   PutFixed64(&sub_key, id.ms);
@@ -265,6 +276,10 @@ std::string 
Stream::internalPelKeyFromGroupAndEntryId(const std::string &ns_key,
 StreamEntryID Stream::groupAndEntryIdFromPelInternalKey(rocksdb::Slice key, 
std::string &group_name) {
   InternalKey ikey(key, storage_->IsSlotIdEncoded());
   Slice subkey = ikey.GetSubKey();
+  uint64_t entry_delimiter = 0;
+  GetFixed64(&subkey, &entry_delimiter);
+  uint8_t type_delimiter = 0;
+  GetFixed8(&subkey, &type_delimiter);
   uint64_t group_name_len = 0;
   GetFixed64(&subkey, &group_name_len);
   group_name = subkey.ToString().substr(0, group_name_len);
@@ -298,21 +313,14 @@ StreamPelEntry Stream::decodeStreamPelEntryValue(const 
std::string &value) {
 StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) const {
   InternalKey ikey(key, storage_->IsSlotIdEncoded());
   Slice subkey = ikey.GetSubKey();
-  const size_t entry_id_size = sizeof(StreamEntryID);
-  if (subkey.size() <= entry_id_size) {
+  uint64_t entry_delimiter = 0;
+  GetFixed64(&subkey, &entry_delimiter);
+  if (entry_delimiter != UINT64_MAX) {
     return StreamSubkeyType::StreamEntry;
   }
-  uint64_t group_name_len = 0;
-  GetFixed64(&subkey, &group_name_len);
-  std::string without_group_name = subkey.ToString().substr(group_name_len);
-  const size_t metadata_delimiter_size = consumerGroupMetadataDelimiter.size();
-  if (without_group_name.size() <= metadata_delimiter_size) {
-    return StreamSubkeyType::StreamConsumerGroupMetadata;
-  }
-  if (without_group_name.size() <= entry_id_size) {
-    return StreamSubkeyType::StreamPelEntry;
-  }
-  return StreamSubkeyType::StreamConsumerMetadata;
+  uint8_t type_delimiter = 0;
+  GetFixed8(&subkey, &type_delimiter);
+  return (StreamSubkeyType)type_delimiter;
 }
 
 rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const 
std::string &group_name,
@@ -565,59 +573,54 @@ rocksdb::Status Stream::AutoClaim(const Slice 
&stream_name, const std::string &g
   auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
   uint64_t total_claimed_count = 0;
   for (iter->SeekToFirst(); iter->Valid() && count > 0 && attempts > 0; 
iter->Next()) {
-    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
-      std::string tmp_group_name;
-      StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), 
tmp_group_name);
-      if (tmp_group_name != group_name) {
-        continue;
-      }
+    std::string tmp_group_name;
+    StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), 
tmp_group_name);
 
-      if (options.exclude_start && entry_id == options.start_id) {
-        continue;
-      }
+    if (options.exclude_start && entry_id == options.start_id) {
+      continue;
+    }
 
-      attempts--;
+    attempts--;
 
-      StreamPelEntry penl_entry = 
decodeStreamPelEntryValue(iter->value().ToString());
-      if ((now_ms - penl_entry.last_delivery_time_ms) < 
options.min_idle_time_ms) {
+    StreamPelEntry penl_entry = 
decodeStreamPelEntryValue(iter->value().ToString());
+    if ((now_ms - penl_entry.last_delivery_time_ms) < 
options.min_idle_time_ms) {
+      continue;
+    }
+
+    auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
+    std::string entry_value;
+    s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, 
&entry_value);
+    if (!s.ok()) {
+      if (s.IsNotFound()) {
+        deleted_entries.push_back(entry_id);
+        batch->Delete(stream_cf_handle_, iter->key());
+        --count;
         continue;
       }
+      return s;
+    }
 
-      auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
-      std::string entry_value;
-      s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, 
&entry_value);
-      if (!s.ok()) {
-        if (s.IsNotFound()) {
-          deleted_entries.push_back(entry_id);
-          batch->Delete(stream_cf_handle_, iter->key());
-          --count;
-          continue;
-        }
-        return s;
+    StreamEntry entry(entry_id.ToString(), {});
+    if (!options.just_id) {
+      auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
+      if (!rv_status.OK()) {
+        return rocksdb::Status::InvalidArgument(rv_status.Msg());
       }
+    }
 
-      StreamEntry entry(entry_id.ToString(), {});
-      if (!options.just_id) {
-        auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
-        if (!rv_status.OK()) {
-          return rocksdb::Status::InvalidArgument(rv_status.Msg());
-        }
-      }
+    pending_entries.emplace_back(std::move(entry));
+    --count;
 
-      pending_entries.emplace_back(std::move(entry));
-      --count;
-
-      if (penl_entry.consumer_name != consumer_name) {
-        ++total_claimed_count;
-        claimed_consumer_entity_count[penl_entry.consumer_name] += 1;
-        penl_entry.consumer_name = consumer_name;
-        penl_entry.last_delivery_time_ms = now_ms;
-        // Increment the delivery attempts counter unless JUSTID option 
provided
-        if (!options.just_id) {
-          penl_entry.last_delivery_count += 1;
-        }
-        batch->Put(stream_cf_handle_, iter->key(), 
encodeStreamPelEntryValue(penl_entry));
+    if (penl_entry.consumer_name != consumer_name) {
+      ++total_claimed_count;
+      claimed_consumer_entity_count[penl_entry.consumer_name] += 1;
+      penl_entry.consumer_name = consumer_name;
+      penl_entry.last_delivery_time_ms = now_ms;
+      // Increment the delivery attempts counter unless JUSTID option provided
+      if (!options.just_id) {
+        penl_entry.last_delivery_count += 1;
       }
+      batch->Put(stream_cf_handle_, iter->key(), 
encodeStreamPelEntryValue(penl_entry));
     }
   }
 
@@ -642,10 +645,8 @@ rocksdb::Status Stream::AutoClaim(const Slice 
&stream_name, const std::string &g
 
   bool has_next_entry = false;
   for (; iter->Valid(); iter->Next()) {
-    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
-      has_next_entry = true;
-      break;
-    }
+    has_next_entry = true;
+    break;
   }
 
   if (auto s = iter->status(); !s.ok()) {
@@ -875,14 +876,9 @@ rocksdb::Status Stream::DestroyConsumer(const Slice 
&stream_name, const std::str
 
   auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
-      std::string tmp_group_name;
-      FMT_MAYBE_UNUSED StreamEntryID entry_id = 
groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
-      if (tmp_group_name != group_name) continue;
-      StreamPelEntry pel_entry = 
decodeStreamPelEntryValue(iter->value().ToString());
-      if (pel_entry.consumer_name == consumer_name) {
-        batch->Delete(stream_cf_handle_, iter->key());
-      }
+    StreamPelEntry pel_entry = 
decodeStreamPelEntryValue(iter->value().ToString());
+    if (pel_entry.consumer_name == consumer_name) {
+      batch->Delete(stream_cf_handle_, iter->key());
     }
   }
 
@@ -1256,7 +1252,7 @@ rocksdb::Status Stream::GetStreamInfo(const 
rocksdb::Slice &stream_name, bool fu
 }
 
 static bool StreamRangeHasTombstones(const StreamMetadata &metadata, 
StreamEntryID start_id) {
-  StreamEntryID end_id = StreamEntryID{UINT64_MAX, UINT64_MAX};
+  StreamEntryID end_id = StreamEntryID::Maximum();
   if (metadata.size == 0 || metadata.max_deleted_entry_id == StreamEntryID{0, 
0}) {
     return false;
   }
@@ -1316,9 +1312,12 @@ rocksdb::Status Stream::GetGroupInfo(const Slice 
&stream_name,
   rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
   if (!s.ok()) return s;
 
+  std::string subkey_type_delimiter = std::to_string(UINT64_MAX);
+  PutFixed8(&subkey_type_delimiter, 
(uint8_t)StreamSubkeyType::StreamConsumerGroupMetadata);
   std::string next_version_prefix_key =
-      InternalKey(ns_key, "", metadata.version + 1, 
storage_->IsSlotIdEncoded()).Encode();
-  std::string prefix_key = InternalKey(ns_key, "", metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+      InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1, 
storage_->IsSlotIdEncoded()).Encode();
+  std::string prefix_key =
+      InternalKey(ns_key, subkey_type_delimiter, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
 
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   LatestSnapShot ss(storage_);
@@ -1330,13 +1329,11 @@ rocksdb::Status Stream::GetGroupInfo(const Slice 
&stream_name,
 
   auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-    if (identifySubkeyType(iter->key()) == 
StreamSubkeyType::StreamConsumerGroupMetadata) {
-      std::string group_name = groupNameFromInternalKey(iter->key());
-      StreamConsumerGroupMetadata cg_metadata = 
decodeStreamConsumerGroupMetadataValue(iter->value().ToString());
-      CheckLagValid(metadata, cg_metadata);
-      std::pair<std::string, StreamConsumerGroupMetadata> tmp_item(group_name, 
cg_metadata);
-      group_metadata.push_back(tmp_item);
-    }
+    std::string group_name = groupNameFromInternalKey(iter->key());
+    StreamConsumerGroupMetadata cg_metadata = 
decodeStreamConsumerGroupMetadataValue(iter->value().ToString());
+    CheckLagValid(metadata, cg_metadata);
+    std::pair<std::string, StreamConsumerGroupMetadata> tmp_item(group_name, 
cg_metadata);
+    group_metadata.push_back(tmp_item);
   }
   return iter->status();
 }
@@ -1349,9 +1346,15 @@ rocksdb::Status Stream::GetConsumerInfo(
   rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
   if (!s.ok()) return s;
 
+  std::string subkey_type_delimiter;
+  PutFixed64(&subkey_type_delimiter, UINT64_MAX);
+  PutFixed8(&subkey_type_delimiter, 
(uint8_t)StreamSubkeyType::StreamConsumerMetadata);
+  PutFixed64(&subkey_type_delimiter, group_name.size());
+  subkey_type_delimiter += group_name;
   std::string next_version_prefix_key =
-      InternalKey(ns_key, "", metadata.version + 1, 
storage_->IsSlotIdEncoded()).Encode();
-  std::string prefix_key = InternalKey(ns_key, "", metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+      InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1, 
storage_->IsSlotIdEncoded()).Encode();
+  std::string prefix_key =
+      InternalKey(ns_key, subkey_type_delimiter, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
 
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   LatestSnapShot ss(storage_);
@@ -1363,14 +1366,10 @@ rocksdb::Status Stream::GetConsumerInfo(
 
   auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-    if (identifySubkeyType(iter->key()) == 
StreamSubkeyType::StreamConsumerMetadata) {
-      std::string cur_group_name = groupNameFromInternalKey(iter->key());
-      if (cur_group_name != group_name) continue;
-      std::string consumer_name = consumerNameFromInternalKey(iter->key());
-      StreamConsumerMetadata c_metadata = 
decodeStreamConsumerMetadataValue(iter->value().ToString());
-      std::pair<std::string, StreamConsumerMetadata> tmp_item(consumer_name, 
c_metadata);
-      consumer_metadata.push_back(tmp_item);
-    }
+    std::string consumer_name = consumerNameFromInternalKey(iter->key());
+    StreamConsumerMetadata c_metadata = 
decodeStreamConsumerMetadataValue(iter->value().ToString());
+    std::pair<std::string, StreamConsumerMetadata> tmp_item(consumer_name, 
c_metadata);
+    consumer_metadata.push_back(tmp_item);
   }
   return iter->status();
 }
@@ -1506,29 +1505,26 @@ rocksdb::Status Stream::RangeWithPending(const Slice 
&stream_name, StreamRangeOp
     auto iter = util::UniqueIterator(storage_, read_options, 
stream_cf_handle_);
     uint64_t count = 0;
     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-      if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) 
{
-        std::string tmp_group_name;
-        StreamEntryID entry_id = 
groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
-        if (tmp_group_name != group_name) continue;
-        StreamPelEntry pel_entry = 
decodeStreamPelEntryValue(iter->value().ToString());
-        if (pel_entry.consumer_name != consumer_name) continue;
-        std::string raw_value;
-        rocksdb::Status st = getEntryRawValue(ns_key, metadata, entry_id, 
&raw_value);
-        if (!st.ok() && !st.IsNotFound()) {
-          return st;
-        }
-        std::vector<std::string> values;
-        auto rv = DecodeRawStreamEntryValue(raw_value, &values);
-        if (!rv.IsOK()) {
-          return rocksdb::Status::InvalidArgument(rv.Msg());
-        }
-        entries->emplace_back(entry_id.ToString(), std::move(values));
-        pel_entry.last_delivery_count += 1;
-        pel_entry.last_delivery_time_ms = now_ms;
-        batch->Put(stream_cf_handle_, iter->key(), 
encodeStreamPelEntryValue(pel_entry));
-        ++count;
-        if (count >= options.count) break;
+      std::string tmp_group_name;
+      StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), 
tmp_group_name);
+      StreamPelEntry pel_entry = 
decodeStreamPelEntryValue(iter->value().ToString());
+      if (pel_entry.consumer_name != consumer_name) continue;
+      std::string raw_value;
+      rocksdb::Status st = getEntryRawValue(ns_key, metadata, entry_id, 
&raw_value);
+      if (!st.ok() && !st.IsNotFound()) {
+        return st;
+      }
+      std::vector<std::string> values;
+      auto rv = DecodeRawStreamEntryValue(raw_value, &values);
+      if (!rv.IsOK()) {
+        return rocksdb::Status::InvalidArgument(rv.Msg());
       }
+      entries->emplace_back(entry_id.ToString(), std::move(values));
+      pel_entry.last_delivery_count += 1;
+      pel_entry.last_delivery_time_ms = now_ms;
+      batch->Put(stream_cf_handle_, iter->key(), 
encodeStreamPelEntryValue(pel_entry));
+      ++count;
+      if (count >= options.count) break;
     }
 
     if (auto s = iter->status(); !s.ok()) {
diff --git a/src/types/redis_stream_base.cc b/src/types/redis_stream_base.cc
index 87141307..1e1e720c 100644
--- a/src/types/redis_stream_base.cc
+++ b/src/types/redis_stream_base.cc
@@ -37,8 +37,8 @@ const char *errEntryIdOutOfRange = "The ID specified in XADD 
must be greater tha
 const char *errStreamExhaustedEntryID = "The stream has exhausted the last 
possible ID, unable to add more items";
 
 Status IncrementStreamEntryID(StreamEntryID *id) {
-  if (id->seq == UINT64_MAX) {
-    if (id->ms == UINT64_MAX) {
+  if (id->seq == StreamEntryID::Maximum().seq) {
+    if (id->ms == StreamEntryID::Maximum().ms) {
       // special case where 'id' is the last possible entry ID
       id->ms = 0;
       id->seq = 0;
@@ -146,7 +146,7 @@ Status ParseRangeEnd(const std::string &input, 
StreamEntryID *id) {
     }
 
     id->ms = *parse_input;
-    id->seq = UINT64_MAX;
+    id->seq = StreamEntryID::Maximum().seq;
   }
 
   return Status::OK();
@@ -179,7 +179,7 @@ Status DecodeRawStreamEntryValue(const std::string &value, 
std::vector<std::stri
 }
 
 Status FullySpecifiedEntryID::GenerateID(const StreamEntryID &last_id, 
StreamEntryID *next_id) {
-  if (last_id.ms == UINT64_MAX && last_id.seq == UINT64_MAX) {
+  if (last_id.ms == StreamEntryID::Maximum().ms && last_id.seq == 
StreamEntryID::Maximum().seq) {
     return {Status::RedisExecErr, errStreamExhaustedEntryID};
   }
 
@@ -216,7 +216,7 @@ Status 
SpecificTimestampWithAnySequenceNumber::GenerateID(const StreamEntryID &l
   }
 
   if (ms_ == last_id.ms) {
-    if (last_id.seq == UINT64_MAX) {
+    if (last_id.seq == StreamEntryID::Maximum().seq) {
       return {Status::RedisExecErr, errSequenceNumberOverflow};
     }
 
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 82c8f945..091c64bb 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -44,7 +44,7 @@ struct StreamEntryID {
     seq = 0;
   }
 
-  bool IsMaximum() const { return ms == UINT64_MAX && seq == UINT64_MAX; }
+  bool IsMaximum() const { return ms == UINT64_MAX - 1 && seq == UINT64_MAX; }
   bool IsMinimum() const { return ms == 0 && seq == 0; }
 
   bool operator<(const StreamEntryID &rhs) const {
@@ -64,7 +64,7 @@ struct StreamEntryID {
   std::string ToString() const { return fmt::format("{}-{}", ms, seq); }
 
   static StreamEntryID Minimum() { return StreamEntryID{0, 0}; }
-  static StreamEntryID Maximum() { return StreamEntryID{UINT64_MAX, 
UINT64_MAX}; }
+  static StreamEntryID Maximum() { return StreamEntryID{UINT64_MAX - 1, 
UINT64_MAX}; }
 };
 
 class NextStreamEntryIDGenerationStrategy {
diff --git a/tests/cppunit/types/stream_test.cc 
b/tests/cppunit/types/stream_test.cc
index 60d812f5..f522b827 100644
--- a/tests/cppunit/types/stream_test.cc
+++ b/tests/cppunit/types/stream_test.cc
@@ -114,7 +114,7 @@ TEST_F(RedisStreamTest, NextIDFullySpecified) {
   }
 
   {
-    last_id = redis::StreamEntryID(UINT64_MAX, UINT64_MAX);
+    last_id = redis::StreamEntryID::Maximum();
     auto strategy = ParseNextStreamEntryIDStrategy("11111111-22222");
     auto s = (*strategy)->GenerateID(last_id, &next_id);
     EXPECT_FALSE(s.IsOK());
@@ -146,7 +146,7 @@ TEST_F(RedisStreamTest, NextIDAutoGenerated) {
   }
 
   {
-    last_id = redis::StreamEntryID(UINT64_MAX, UINT64_MAX);
+    last_id = redis::StreamEntryID::Maximum();
     auto strategy = ParseNextStreamEntryIDStrategy("*");
     auto s = (*strategy)->GenerateID(last_id, &next_id);
     EXPECT_FALSE(s.IsOK());
@@ -223,7 +223,7 @@ TEST_F(RedisStreamTest, 
NextIDCurrentTimestampWithSpecificSeqNumber) {
   }
 
   {
-    last_id = redis::StreamEntryID(UINT64_MAX, 0);
+    last_id = redis::StreamEntryID(UINT64_MAX - 1, 0);
     auto strategy = ParseNextStreamEntryIDStrategy("*-123456");
     auto s = (*strategy)->GenerateID(last_id, &next_id);
     EXPECT_FALSE(s.IsOK());
@@ -239,7 +239,7 @@ TEST_F(RedisStreamTest, 
NextIDCurrentTimestampWithSpecificSeqNumber) {
   }
 
   {
-    last_id = redis::StreamEntryID(UINT64_MAX, UINT64_MAX);
+    last_id = redis::StreamEntryID::Maximum();
     auto strategy = ParseNextStreamEntryIDStrategy("*-123456");
     auto s = (*strategy)->GenerateID(last_id, &next_id);
     EXPECT_FALSE(s.IsOK());
@@ -364,7 +364,7 @@ TEST_F(RedisStreamTest, 
AddEntryWithExistingMsAnySeqNoAndExistingSeqNoIsAlreadyM
 }
 
 TEST_F(RedisStreamTest, AddEntryAndExistingMsAndSeqNoAreAlreadyMax) {
-  uint64_t ms = UINT64_MAX;
+  uint64_t ms = UINT64_MAX - 1;
   uint64_t seq = UINT64_MAX;
   redis::StreamAddOptions options;
   options.next_id_strategy = 
*ParseNextStreamEntryIDStrategy(fmt::format("{}-{}", ms, seq));
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index e4048eaf..0288fb79 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -101,7 +101,7 @@ var streamTests = func(t *testing.T, enabledRESP3 string) {
 
        t.Run("XADD IDs correctly report an error when overflowing", func(t 
*testing.T) {
                require.NoError(t, rdb.Del(ctx, "mystream").Err())
-               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: 
"mystream", ID: "18446744073709551615-18446744073709551615", Values: 
[]string{"a", "b"}}).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: 
"mystream", ID: "18446744073709551614-18446744073709551615", Values: 
[]string{"a", "b"}}).Err())
                require.ErrorContains(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: 
"mystream", ID: "*", Values: []string{"c", "d"}}).Err(), "ERR")
        })
 
@@ -286,7 +286,7 @@ var streamTests = func(t *testing.T, enabledRESP3 string) {
        })
 
        t.Run("XRANGE exclusive ranges", func(t *testing.T) {
-               ids := []string{"0-1", "0-18446744073709551615", "1-0", "42-0", 
"42-42", "18446744073709551615-18446744073709551614", 
"18446744073709551615-18446744073709551615"}
+               ids := []string{"0-1", "0-18446744073709551615", "1-0", "42-0", 
"42-42", "18446744073709551614-18446744073709551614", 
"18446744073709551614-18446744073709551615"}
                total := len(ids)
                require.NoError(t, rdb.Do(ctx, "MULTI").Err())
                // DEL returns "QUEUED" here, so we use Do to avoid ParseInt.
@@ -306,7 +306,7 @@ var streamTests = func(t *testing.T, enabledRESP3 string) {
                require.Len(t, rdb.XRange(ctx, "vipstream", "(1-0", 
"(42-42").Val(), 1)
                require.ErrorContains(t, rdb.XRange(ctx, "vipstream", "(-", 
"+").Err(), "ERR")
                require.ErrorContains(t, rdb.XRange(ctx, "vipstream", "-", 
"(+").Err(), "ERR")
-               require.ErrorContains(t, rdb.XRange(ctx, "vipstream", 
"(18446744073709551615-18446744073709551615", "+").Err(), "ERR")
+               require.ErrorContains(t, rdb.XRange(ctx, "vipstream", 
"(18446744073709551614-18446744073709551615", "+").Err(), "ERR")
                require.ErrorContains(t, rdb.XRange(ctx, "vipstream", "-", 
"(0-0").Err(), "ERR")
        })
 

Reply via email to