This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new a2a3826a refactor(stream): change the encoding of stream consumer
group (#2384)
a2a3826a is described below
commit a2a3826a3b78e36089de1cc862d4f88e99034a82
Author: Hauru <[email protected]>
AuthorDate: Wed Jul 17 12:23:16 2024 +0800
refactor(stream): change the encoding of stream consumer group (#2384)
---
src/commands/cmd_stream.cc | 8 +-
src/types/redis_stream.cc | 222 +++++++++++++--------------
src/types/redis_stream_base.cc | 10 +-
src/types/redis_stream_base.h | 4 +-
tests/cppunit/types/stream_test.cc | 10 +-
tests/gocase/unit/type/stream/stream_test.go | 6 +-
6 files changed, 128 insertions(+), 132 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index f227aecf..ea448332 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -1107,7 +1107,7 @@ class CommandXRead : public Commander,
redis::StreamRangeOptions options;
options.reverse = false;
options.start = ids_[i];
- options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
+ options.end = StreamEntryID::Maximum();
options.with_count = with_count_;
options.count = count_;
options.exclude_start = true;
@@ -1218,7 +1218,7 @@ class CommandXRead : public Commander,
redis::StreamRangeOptions options;
options.reverse = false;
options.start = ids_[i];
- options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
+ options.end = StreamEntryID::Maximum();
options.with_count = with_count_;
options.count = count_;
options.exclude_start = true;
@@ -1405,7 +1405,7 @@ class CommandXReadGroup : public Commander,
redis::StreamRangeOptions options;
options.reverse = false;
options.start = ids_[i];
- options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
+ options.end = StreamEntryID::Maximum();
options.with_count = with_count_;
options.count = count_;
options.exclude_start = true;
@@ -1510,7 +1510,7 @@ class CommandXReadGroup : public Commander,
redis::StreamRangeOptions options;
options.reverse = false;
options.start = ids_[i];
- options.end = StreamEntryID{UINT64_MAX, UINT64_MAX};
+ options.end = StreamEntryID::Maximum();
options.with_count = with_count_;
options.count = count_;
options.exclude_start = true;
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index a3f6c330..7d12c46e 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -31,7 +31,6 @@
namespace redis {
-std::string_view consumerGroupMetadataDelimiter = "METADATA";
const char *errSetEntryIdSmallerThanLastGenerated =
"The ID specified in XSETID is smaller than the target stream top item";
const char *errEntriesAddedSmallerThanStreamSize =
@@ -169,9 +168,10 @@ rocksdb::Status Stream::Add(const Slice &stream_name,
const StreamAddOptions &op
std::string Stream::internalKeyFromGroupName(const std::string &ns_key, const
StreamMetadata &metadata,
const std::string &group_name)
const {
std::string sub_key;
+ PutFixed64(&sub_key, UINT64_MAX);
+ PutFixed8(&sub_key, (uint8_t)StreamSubkeyType::StreamConsumerGroupMetadata);
PutFixed64(&sub_key, group_name.size());
sub_key += group_name;
- sub_key += consumerGroupMetadataDelimiter;
std::string entry_key = InternalKey(ns_key, sub_key, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
return entry_key;
}
@@ -179,6 +179,10 @@ std::string Stream::internalKeyFromGroupName(const
std::string &ns_key, const St
std::string Stream::groupNameFromInternalKey(rocksdb::Slice key) const {
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice group_name_metadata = ikey.GetSubKey();
+ uint64_t entry_delimiter = 0;
+ GetFixed64(&group_name_metadata, &entry_delimiter);
+ uint8_t type_delimiter = 0;
+ GetFixed8(&group_name_metadata, &type_delimiter);
uint64_t len = 0;
GetFixed64(&group_name_metadata, &len);
std::string group_name;
@@ -214,11 +218,12 @@ StreamConsumerGroupMetadata
Stream::decodeStreamConsumerGroupMetadataValue(const
std::string Stream::internalKeyFromConsumerName(const std::string &ns_key,
const StreamMetadata &metadata,
const std::string &group_name,
const std::string &consumer_name) const {
std::string sub_key;
+ PutFixed64(&sub_key, UINT64_MAX);
+ PutFixed8(&sub_key, (uint8_t)StreamSubkeyType::StreamConsumerMetadata);
PutFixed64(&sub_key, group_name.size());
sub_key += group_name;
PutFixed64(&sub_key, consumer_name.size());
sub_key += consumer_name;
- sub_key += consumerGroupMetadataDelimiter;
std::string entry_key = InternalKey(ns_key, sub_key, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
return entry_key;
}
@@ -226,6 +231,10 @@ std::string Stream::internalKeyFromConsumerName(const
std::string &ns_key, const
std::string Stream::consumerNameFromInternalKey(rocksdb::Slice key) const {
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice subkey = ikey.GetSubKey();
+ uint64_t entry_delimiter = 0;
+ GetFixed64(&subkey, &entry_delimiter);
+ uint8_t type_delimiter = 0;
+ GetFixed8(&subkey, &type_delimiter);
uint64_t group_name_len = 0;
GetFixed64(&subkey, &group_name_len);
subkey.remove_prefix(group_name_len);
@@ -254,6 +263,8 @@ StreamConsumerMetadata
Stream::decodeStreamConsumerMetadataValue(const std::stri
std::string Stream::internalPelKeyFromGroupAndEntryId(const std::string
&ns_key, const StreamMetadata &metadata,
const std::string
&group_name, const StreamEntryID &id) {
std::string sub_key;
+ PutFixed64(&sub_key, UINT64_MAX);
+ PutFixed8(&sub_key, (uint8_t)StreamSubkeyType::StreamPelEntry);
PutFixed64(&sub_key, group_name.size());
sub_key += group_name;
PutFixed64(&sub_key, id.ms);
@@ -265,6 +276,10 @@ std::string
Stream::internalPelKeyFromGroupAndEntryId(const std::string &ns_key,
StreamEntryID Stream::groupAndEntryIdFromPelInternalKey(rocksdb::Slice key,
std::string &group_name) {
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice subkey = ikey.GetSubKey();
+ uint64_t entry_delimiter = 0;
+ GetFixed64(&subkey, &entry_delimiter);
+ uint8_t type_delimiter = 0;
+ GetFixed8(&subkey, &type_delimiter);
uint64_t group_name_len = 0;
GetFixed64(&subkey, &group_name_len);
group_name = subkey.ToString().substr(0, group_name_len);
@@ -298,21 +313,14 @@ StreamPelEntry Stream::decodeStreamPelEntryValue(const
std::string &value) {
StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) const {
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice subkey = ikey.GetSubKey();
- const size_t entry_id_size = sizeof(StreamEntryID);
- if (subkey.size() <= entry_id_size) {
+ uint64_t entry_delimiter = 0;
+ GetFixed64(&subkey, &entry_delimiter);
+ if (entry_delimiter != UINT64_MAX) {
return StreamSubkeyType::StreamEntry;
}
- uint64_t group_name_len = 0;
- GetFixed64(&subkey, &group_name_len);
- std::string without_group_name = subkey.ToString().substr(group_name_len);
- const size_t metadata_delimiter_size = consumerGroupMetadataDelimiter.size();
- if (without_group_name.size() <= metadata_delimiter_size) {
- return StreamSubkeyType::StreamConsumerGroupMetadata;
- }
- if (without_group_name.size() <= entry_id_size) {
- return StreamSubkeyType::StreamPelEntry;
- }
- return StreamSubkeyType::StreamConsumerMetadata;
+ uint8_t type_delimiter = 0;
+ GetFixed8(&subkey, &type_delimiter);
+ return (StreamSubkeyType)type_delimiter;
}
rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const
std::string &group_name,
@@ -565,59 +573,54 @@ rocksdb::Status Stream::AutoClaim(const Slice
&stream_name, const std::string &g
auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
uint64_t total_claimed_count = 0;
for (iter->SeekToFirst(); iter->Valid() && count > 0 && attempts > 0;
iter->Next()) {
- if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
- std::string tmp_group_name;
- StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(),
tmp_group_name);
- if (tmp_group_name != group_name) {
- continue;
- }
+ std::string tmp_group_name;
+ StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(),
tmp_group_name);
- if (options.exclude_start && entry_id == options.start_id) {
- continue;
- }
+ if (options.exclude_start && entry_id == options.start_id) {
+ continue;
+ }
- attempts--;
+ attempts--;
- StreamPelEntry penl_entry =
decodeStreamPelEntryValue(iter->value().ToString());
- if ((now_ms - penl_entry.last_delivery_time_ms) <
options.min_idle_time_ms) {
+ StreamPelEntry penl_entry =
decodeStreamPelEntryValue(iter->value().ToString());
+ if ((now_ms - penl_entry.last_delivery_time_ms) <
options.min_idle_time_ms) {
+ continue;
+ }
+
+ auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
+ std::string entry_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key,
&entry_value);
+ if (!s.ok()) {
+ if (s.IsNotFound()) {
+ deleted_entries.push_back(entry_id);
+ batch->Delete(stream_cf_handle_, iter->key());
+ --count;
continue;
}
+ return s;
+ }
- auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
- std::string entry_value;
- s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key,
&entry_value);
- if (!s.ok()) {
- if (s.IsNotFound()) {
- deleted_entries.push_back(entry_id);
- batch->Delete(stream_cf_handle_, iter->key());
- --count;
- continue;
- }
- return s;
+ StreamEntry entry(entry_id.ToString(), {});
+ if (!options.just_id) {
+ auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
+ if (!rv_status.OK()) {
+ return rocksdb::Status::InvalidArgument(rv_status.Msg());
}
+ }
- StreamEntry entry(entry_id.ToString(), {});
- if (!options.just_id) {
- auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
- if (!rv_status.OK()) {
- return rocksdb::Status::InvalidArgument(rv_status.Msg());
- }
- }
+ pending_entries.emplace_back(std::move(entry));
+ --count;
- pending_entries.emplace_back(std::move(entry));
- --count;
-
- if (penl_entry.consumer_name != consumer_name) {
- ++total_claimed_count;
- claimed_consumer_entity_count[penl_entry.consumer_name] += 1;
- penl_entry.consumer_name = consumer_name;
- penl_entry.last_delivery_time_ms = now_ms;
- // Increment the delivery attempts counter unless JUSTID option
provided
- if (!options.just_id) {
- penl_entry.last_delivery_count += 1;
- }
- batch->Put(stream_cf_handle_, iter->key(),
encodeStreamPelEntryValue(penl_entry));
+ if (penl_entry.consumer_name != consumer_name) {
+ ++total_claimed_count;
+ claimed_consumer_entity_count[penl_entry.consumer_name] += 1;
+ penl_entry.consumer_name = consumer_name;
+ penl_entry.last_delivery_time_ms = now_ms;
+ // Increment the delivery attempts counter unless JUSTID option provided
+ if (!options.just_id) {
+ penl_entry.last_delivery_count += 1;
}
+ batch->Put(stream_cf_handle_, iter->key(),
encodeStreamPelEntryValue(penl_entry));
}
}
@@ -642,10 +645,8 @@ rocksdb::Status Stream::AutoClaim(const Slice
&stream_name, const std::string &g
bool has_next_entry = false;
for (; iter->Valid(); iter->Next()) {
- if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
- has_next_entry = true;
- break;
- }
+ has_next_entry = true;
+ break;
}
if (auto s = iter->status(); !s.ok()) {
@@ -875,14 +876,9 @@ rocksdb::Status Stream::DestroyConsumer(const Slice
&stream_name, const std::str
auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
- std::string tmp_group_name;
- FMT_MAYBE_UNUSED StreamEntryID entry_id =
groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
- if (tmp_group_name != group_name) continue;
- StreamPelEntry pel_entry =
decodeStreamPelEntryValue(iter->value().ToString());
- if (pel_entry.consumer_name == consumer_name) {
- batch->Delete(stream_cf_handle_, iter->key());
- }
+ StreamPelEntry pel_entry =
decodeStreamPelEntryValue(iter->value().ToString());
+ if (pel_entry.consumer_name == consumer_name) {
+ batch->Delete(stream_cf_handle_, iter->key());
}
}
@@ -1256,7 +1252,7 @@ rocksdb::Status Stream::GetStreamInfo(const
rocksdb::Slice &stream_name, bool fu
}
static bool StreamRangeHasTombstones(const StreamMetadata &metadata,
StreamEntryID start_id) {
- StreamEntryID end_id = StreamEntryID{UINT64_MAX, UINT64_MAX};
+ StreamEntryID end_id = StreamEntryID::Maximum();
if (metadata.size == 0 || metadata.max_deleted_entry_id == StreamEntryID{0,
0}) {
return false;
}
@@ -1316,9 +1312,12 @@ rocksdb::Status Stream::GetGroupInfo(const Slice
&stream_name,
rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
+ std::string subkey_type_delimiter = std::to_string(UINT64_MAX);
+ PutFixed8(&subkey_type_delimiter,
(uint8_t)StreamSubkeyType::StreamConsumerGroupMetadata);
std::string next_version_prefix_key =
- InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
- std::string prefix_key = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+ std::string prefix_key =
+ InternalKey(ns_key, subkey_type_delimiter, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
LatestSnapShot ss(storage_);
@@ -1330,13 +1329,11 @@ rocksdb::Status Stream::GetGroupInfo(const Slice
&stream_name,
auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- if (identifySubkeyType(iter->key()) ==
StreamSubkeyType::StreamConsumerGroupMetadata) {
- std::string group_name = groupNameFromInternalKey(iter->key());
- StreamConsumerGroupMetadata cg_metadata =
decodeStreamConsumerGroupMetadataValue(iter->value().ToString());
- CheckLagValid(metadata, cg_metadata);
- std::pair<std::string, StreamConsumerGroupMetadata> tmp_item(group_name,
cg_metadata);
- group_metadata.push_back(tmp_item);
- }
+ std::string group_name = groupNameFromInternalKey(iter->key());
+ StreamConsumerGroupMetadata cg_metadata =
decodeStreamConsumerGroupMetadataValue(iter->value().ToString());
+ CheckLagValid(metadata, cg_metadata);
+ std::pair<std::string, StreamConsumerGroupMetadata> tmp_item(group_name,
cg_metadata);
+ group_metadata.push_back(tmp_item);
}
return iter->status();
}
@@ -1349,9 +1346,15 @@ rocksdb::Status Stream::GetConsumerInfo(
rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
+ std::string subkey_type_delimiter;
+ PutFixed64(&subkey_type_delimiter, UINT64_MAX);
+ PutFixed8(&subkey_type_delimiter,
(uint8_t)StreamSubkeyType::StreamConsumerMetadata);
+ PutFixed64(&subkey_type_delimiter, group_name.size());
+ subkey_type_delimiter += group_name;
std::string next_version_prefix_key =
- InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
- std::string prefix_key = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+ std::string prefix_key =
+ InternalKey(ns_key, subkey_type_delimiter, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
LatestSnapShot ss(storage_);
@@ -1363,14 +1366,10 @@ rocksdb::Status Stream::GetConsumerInfo(
auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- if (identifySubkeyType(iter->key()) ==
StreamSubkeyType::StreamConsumerMetadata) {
- std::string cur_group_name = groupNameFromInternalKey(iter->key());
- if (cur_group_name != group_name) continue;
- std::string consumer_name = consumerNameFromInternalKey(iter->key());
- StreamConsumerMetadata c_metadata =
decodeStreamConsumerMetadataValue(iter->value().ToString());
- std::pair<std::string, StreamConsumerMetadata> tmp_item(consumer_name,
c_metadata);
- consumer_metadata.push_back(tmp_item);
- }
+ std::string consumer_name = consumerNameFromInternalKey(iter->key());
+ StreamConsumerMetadata c_metadata =
decodeStreamConsumerMetadataValue(iter->value().ToString());
+ std::pair<std::string, StreamConsumerMetadata> tmp_item(consumer_name,
c_metadata);
+ consumer_metadata.push_back(tmp_item);
}
return iter->status();
}
@@ -1506,29 +1505,26 @@ rocksdb::Status Stream::RangeWithPending(const Slice
&stream_name, StreamRangeOp
auto iter = util::UniqueIterator(storage_, read_options,
stream_cf_handle_);
uint64_t count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry)
{
- std::string tmp_group_name;
- StreamEntryID entry_id =
groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
- if (tmp_group_name != group_name) continue;
- StreamPelEntry pel_entry =
decodeStreamPelEntryValue(iter->value().ToString());
- if (pel_entry.consumer_name != consumer_name) continue;
- std::string raw_value;
- rocksdb::Status st = getEntryRawValue(ns_key, metadata, entry_id,
&raw_value);
- if (!st.ok() && !st.IsNotFound()) {
- return st;
- }
- std::vector<std::string> values;
- auto rv = DecodeRawStreamEntryValue(raw_value, &values);
- if (!rv.IsOK()) {
- return rocksdb::Status::InvalidArgument(rv.Msg());
- }
- entries->emplace_back(entry_id.ToString(), std::move(values));
- pel_entry.last_delivery_count += 1;
- pel_entry.last_delivery_time_ms = now_ms;
- batch->Put(stream_cf_handle_, iter->key(),
encodeStreamPelEntryValue(pel_entry));
- ++count;
- if (count >= options.count) break;
+ std::string tmp_group_name;
+ StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(),
tmp_group_name);
+ StreamPelEntry pel_entry =
decodeStreamPelEntryValue(iter->value().ToString());
+ if (pel_entry.consumer_name != consumer_name) continue;
+ std::string raw_value;
+ rocksdb::Status st = getEntryRawValue(ns_key, metadata, entry_id,
&raw_value);
+ if (!st.ok() && !st.IsNotFound()) {
+ return st;
+ }
+ std::vector<std::string> values;
+ auto rv = DecodeRawStreamEntryValue(raw_value, &values);
+ if (!rv.IsOK()) {
+ return rocksdb::Status::InvalidArgument(rv.Msg());
}
+ entries->emplace_back(entry_id.ToString(), std::move(values));
+ pel_entry.last_delivery_count += 1;
+ pel_entry.last_delivery_time_ms = now_ms;
+ batch->Put(stream_cf_handle_, iter->key(),
encodeStreamPelEntryValue(pel_entry));
+ ++count;
+ if (count >= options.count) break;
}
if (auto s = iter->status(); !s.ok()) {
diff --git a/src/types/redis_stream_base.cc b/src/types/redis_stream_base.cc
index 87141307..1e1e720c 100644
--- a/src/types/redis_stream_base.cc
+++ b/src/types/redis_stream_base.cc
@@ -37,8 +37,8 @@ const char *errEntryIdOutOfRange = "The ID specified in XADD
must be greater tha
const char *errStreamExhaustedEntryID = "The stream has exhausted the last
possible ID, unable to add more items";
Status IncrementStreamEntryID(StreamEntryID *id) {
- if (id->seq == UINT64_MAX) {
- if (id->ms == UINT64_MAX) {
+ if (id->seq == StreamEntryID::Maximum().seq) {
+ if (id->ms == StreamEntryID::Maximum().ms) {
// special case where 'id' is the last possible entry ID
id->ms = 0;
id->seq = 0;
@@ -146,7 +146,7 @@ Status ParseRangeEnd(const std::string &input,
StreamEntryID *id) {
}
id->ms = *parse_input;
- id->seq = UINT64_MAX;
+ id->seq = StreamEntryID::Maximum().seq;
}
return Status::OK();
@@ -179,7 +179,7 @@ Status DecodeRawStreamEntryValue(const std::string &value,
std::vector<std::stri
}
Status FullySpecifiedEntryID::GenerateID(const StreamEntryID &last_id,
StreamEntryID *next_id) {
- if (last_id.ms == UINT64_MAX && last_id.seq == UINT64_MAX) {
+ if (last_id.ms == StreamEntryID::Maximum().ms && last_id.seq ==
StreamEntryID::Maximum().seq) {
return {Status::RedisExecErr, errStreamExhaustedEntryID};
}
@@ -216,7 +216,7 @@ Status
SpecificTimestampWithAnySequenceNumber::GenerateID(const StreamEntryID &l
}
if (ms_ == last_id.ms) {
- if (last_id.seq == UINT64_MAX) {
+ if (last_id.seq == StreamEntryID::Maximum().seq) {
return {Status::RedisExecErr, errSequenceNumberOverflow};
}
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 82c8f945..091c64bb 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -44,7 +44,7 @@ struct StreamEntryID {
seq = 0;
}
- bool IsMaximum() const { return ms == UINT64_MAX && seq == UINT64_MAX; }
+ bool IsMaximum() const { return ms == UINT64_MAX - 1 && seq == UINT64_MAX; }
bool IsMinimum() const { return ms == 0 && seq == 0; }
bool operator<(const StreamEntryID &rhs) const {
@@ -64,7 +64,7 @@ struct StreamEntryID {
std::string ToString() const { return fmt::format("{}-{}", ms, seq); }
static StreamEntryID Minimum() { return StreamEntryID{0, 0}; }
- static StreamEntryID Maximum() { return StreamEntryID{UINT64_MAX,
UINT64_MAX}; }
+ static StreamEntryID Maximum() { return StreamEntryID{UINT64_MAX - 1,
UINT64_MAX}; }
};
class NextStreamEntryIDGenerationStrategy {
diff --git a/tests/cppunit/types/stream_test.cc
b/tests/cppunit/types/stream_test.cc
index 60d812f5..f522b827 100644
--- a/tests/cppunit/types/stream_test.cc
+++ b/tests/cppunit/types/stream_test.cc
@@ -114,7 +114,7 @@ TEST_F(RedisStreamTest, NextIDFullySpecified) {
}
{
- last_id = redis::StreamEntryID(UINT64_MAX, UINT64_MAX);
+ last_id = redis::StreamEntryID::Maximum();
auto strategy = ParseNextStreamEntryIDStrategy("11111111-22222");
auto s = (*strategy)->GenerateID(last_id, &next_id);
EXPECT_FALSE(s.IsOK());
@@ -146,7 +146,7 @@ TEST_F(RedisStreamTest, NextIDAutoGenerated) {
}
{
- last_id = redis::StreamEntryID(UINT64_MAX, UINT64_MAX);
+ last_id = redis::StreamEntryID::Maximum();
auto strategy = ParseNextStreamEntryIDStrategy("*");
auto s = (*strategy)->GenerateID(last_id, &next_id);
EXPECT_FALSE(s.IsOK());
@@ -223,7 +223,7 @@ TEST_F(RedisStreamTest,
NextIDCurrentTimestampWithSpecificSeqNumber) {
}
{
- last_id = redis::StreamEntryID(UINT64_MAX, 0);
+ last_id = redis::StreamEntryID(UINT64_MAX - 1, 0);
auto strategy = ParseNextStreamEntryIDStrategy("*-123456");
auto s = (*strategy)->GenerateID(last_id, &next_id);
EXPECT_FALSE(s.IsOK());
@@ -239,7 +239,7 @@ TEST_F(RedisStreamTest,
NextIDCurrentTimestampWithSpecificSeqNumber) {
}
{
- last_id = redis::StreamEntryID(UINT64_MAX, UINT64_MAX);
+ last_id = redis::StreamEntryID::Maximum();
auto strategy = ParseNextStreamEntryIDStrategy("*-123456");
auto s = (*strategy)->GenerateID(last_id, &next_id);
EXPECT_FALSE(s.IsOK());
@@ -364,7 +364,7 @@ TEST_F(RedisStreamTest,
AddEntryWithExistingMsAnySeqNoAndExistingSeqNoIsAlreadyM
}
TEST_F(RedisStreamTest, AddEntryAndExistingMsAndSeqNoAreAlreadyMax) {
- uint64_t ms = UINT64_MAX;
+ uint64_t ms = UINT64_MAX - 1;
uint64_t seq = UINT64_MAX;
redis::StreamAddOptions options;
options.next_id_strategy =
*ParseNextStreamEntryIDStrategy(fmt::format("{}-{}", ms, seq));
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index e4048eaf..0288fb79 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -101,7 +101,7 @@ var streamTests = func(t *testing.T, enabledRESP3 string) {
t.Run("XADD IDs correctly report an error when overflowing", func(t
*testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
- require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "18446744073709551615-18446744073709551615", Values:
[]string{"a", "b"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "18446744073709551614-18446744073709551615", Values:
[]string{"a", "b"}}).Err())
require.ErrorContains(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "*", Values: []string{"c", "d"}}).Err(), "ERR")
})
@@ -286,7 +286,7 @@ var streamTests = func(t *testing.T, enabledRESP3 string) {
})
t.Run("XRANGE exclusive ranges", func(t *testing.T) {
- ids := []string{"0-1", "0-18446744073709551615", "1-0", "42-0",
"42-42", "18446744073709551615-18446744073709551614",
"18446744073709551615-18446744073709551615"}
+ ids := []string{"0-1", "0-18446744073709551615", "1-0", "42-0",
"42-42", "18446744073709551614-18446744073709551614",
"18446744073709551614-18446744073709551615"}
total := len(ids)
require.NoError(t, rdb.Do(ctx, "MULTI").Err())
// DEL returns "QUEUED" here, so we use Do to avoid ParseInt.
@@ -306,7 +306,7 @@ var streamTests = func(t *testing.T, enabledRESP3 string) {
require.Len(t, rdb.XRange(ctx, "vipstream", "(1-0",
"(42-42").Val(), 1)
require.ErrorContains(t, rdb.XRange(ctx, "vipstream", "(-",
"+").Err(), "ERR")
require.ErrorContains(t, rdb.XRange(ctx, "vipstream", "-",
"(+").Err(), "ERR")
- require.ErrorContains(t, rdb.XRange(ctx, "vipstream",
"(18446744073709551615-18446744073709551615", "+").Err(), "ERR")
+ require.ErrorContains(t, rdb.XRange(ctx, "vipstream",
"(18446744073709551614-18446744073709551615", "+").Err(), "ERR")
require.ErrorContains(t, rdb.XRange(ctx, "vipstream", "-",
"(0-0").Err(), "ERR")
})