This is an automated email from the ASF dual-hosted git repository.
edwardxu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new e51d2cc02 feat(tdigest): add the support of TDIGEST.BYRANK and
TDIGEST.BYREVRANK command (#3296)
e51d2cc02 is described below
commit e51d2cc02c6d701bb4d8befa290c89d914e8dcbf
Author: Hao Dong <[email protected]>
AuthorDate: Fri Dec 19 11:40:17 2025 +0800
feat(tdigest): add the support of TDIGEST.BYRANK and TDIGEST.BYREVRANK
command (#3296)
Co-authored-by: Twice <[email protected]>
Co-authored-by: Edward Xu <[email protected]>
Co-authored-by: donghao526 <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
src/commands/cmd_tdigest.cc | 77 +++++++-
src/commands/error_constants.h | 1 +
src/types/redis_tdigest.cc | 48 ++++-
src/types/redis_tdigest.h | 8 +-
src/types/tdigest.h | 62 +++++-
tests/cppunit/types/tdigest_test.cc | 97 ++++++++--
tests/gocase/unit/type/tdigest/tdigest_test.go | 258 +++++++++++++++++++++++++
7 files changed, 522 insertions(+), 29 deletions(-)
diff --git a/src/commands/cmd_tdigest.cc b/src/commands/cmd_tdigest.cc
index a5479cb12..f839a3f7f 100644
--- a/src/commands/cmd_tdigest.cc
+++ b/src/commands/cmd_tdigest.cc
@@ -206,9 +206,9 @@ class TDigestRankCommand : public Commander {
if (const auto s =
[&]() {
if constexpr (Reverse) {
- return tdigest.RevRank(ctx, key_name_, unique_inputs_, result);
+ return tdigest.RevRank(ctx, key_name_, unique_inputs_,
&result);
} else {
- return tdigest.Rank(ctx, key_name_, unique_inputs_, result);
+ return tdigest.Rank(ctx, key_name_, unique_inputs_, &result);
}
}();
!s.ok()) {
@@ -238,6 +238,77 @@ class CommandTDigestRevRank : public
TDigestRankCommand<true> {};
class CommandTDigestRank : public TDigestRankCommand<false> {};
+/// Shared implementation of TDIGEST.BYRANK (Reverse = false) and
+/// TDIGEST.BYREVRANK (Reverse = true).
+///
+/// Arguments: key rank [rank ...]. The reply is an array holding one value per
+/// rank argument, in the order the arguments were given.
+template <bool Reverse>
+class TDigestByRankCommand : public Commander {
+ public:
+  /// Validates every rank argument and remembers, for each one, which entry of
+  /// the deduplicated rank list it refers to.
+  ///
+  /// Ranks are deduplicated by their parsed integer value rather than by their
+  /// textual spelling, so that two spellings of the same rank can never put the
+  /// same integer into `unique_inputs_` twice.
+  ///
+  /// @return RedisParseErr if an argument is not an integer, InvalidArgument if
+  ///         a rank is negative, OK otherwise.
+  Status Parse(const std::vector<std::string> &args) override {
+    key_name_ = args[1];
+
+    unique_inputs_.clear();
+    input_slots_.clear();
+    input_slots_.reserve(args.size() - 2);
+
+    // rank value -> index of that rank inside unique_inputs_
+    std::map<int, size_t> slot_of_rank;
+    for (auto arg = args.begin() + 2; arg != args.end(); ++arg) {
+      auto value = ParseInt<int>(*arg);
+      if (!value) {
+        return {Status::RedisParseErr, errValueNotInteger};
+      }
+      if (*value < 0) {
+        return {Status::InvalidArgument, errInvalidRankValue};
+      }
+      const auto [pos, inserted] = slot_of_rank.try_emplace(*value, unique_inputs_.size());
+      if (inserted) {
+        unique_inputs_.push_back(*value);
+      }
+      input_slots_.push_back(pos->second);
+    }
+    return Status::OK();
+  }
+
+  /// Queries the digest once for the distinct ranks and expands the answers
+  /// back to one reply element per original argument.
+  ///
+  /// Under RESP3 each element is encoded with redis::Double; otherwise it is a
+  /// bulk string produced by fmt::format("{}", value).
+  ///
+  /// @return RedisExecErr (errKeyNotFound) when the key does not exist,
+  ///         RedisExecErr with the storage message on any other failure.
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+    TDigest tdigest(srv->storage, conn->GetNamespace());
+    std::vector<double> result;
+    result.reserve(unique_inputs_.size());
+
+    if (const auto s =
+            [&]() {
+              if constexpr (Reverse) {
+                return tdigest.ByRevRank(ctx, key_name_, unique_inputs_, &result);
+              } else {
+                return tdigest.ByRank(ctx, key_name_, unique_inputs_, &result);
+              }
+            }();
+        !s.ok()) {
+      if (s.IsNotFound()) {
+        return {Status::RedisExecErr, errKeyNotFound};
+      }
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    std::vector<std::string> values;
+    values.reserve(input_slots_.size());
+    const bool is_resp3 = conn->GetProtocolVersion() == RESP::v3;
+    for (const size_t slot : input_slots_) {
+      const double value = result[slot];
+      if (is_resp3) {
+        values.push_back(redis::Double(redis::RESP::v3, value));
+      } else {
+        values.push_back(redis::BulkString(fmt::format("{}", value)));
+      }
+    }
+    *output = redis::Array(values);
+    return Status::OK();
+  }
+
+ private:
+  std::string key_name_;
+  // Distinct requested ranks, in first-seen order.
+  std::vector<int> unique_inputs_;
+  // For every rank argument (in argument order): its index in unique_inputs_.
+  std::vector<size_t> input_slots_;
+};
+
+class CommandTDigestByRevRank : public TDigestByRankCommand<true> {};
+
+class CommandTDigestByRank : public TDigestByRankCommand<false> {};
+
class CommandTDigestMinMax : public Commander {
public:
explicit CommandTDigestMinMax(bool is_min) : is_min_(is_min) {}
@@ -433,6 +504,8 @@ REDIS_REGISTER_COMMANDS(TDigest,
MakeCmdAttr<CommandTDigestCreate>("tdigest.crea
MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestRevRank>("tdigest.revrank",
-3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestRank>("tdigest.rank", -3,
"read-only", 1, 1, 1),
+
MakeCmdAttr<CommandTDigestByRevRank>("tdigest.byrevrank", -3, "read-only", 1,
1, 1),
+ MakeCmdAttr<CommandTDigestByRank>("tdigest.byrank",
-3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestQuantile>("tdigest.quantile", -3, "read-only", 1, 1,
1),
MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2,
"write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMerge>("tdigest.merge", -4,
"write", GetMergeKeyRange));
diff --git a/src/commands/error_constants.h b/src/commands/error_constants.h
index 71235b6cc..bde89a6d7 100644
--- a/src/commands/error_constants.h
+++ b/src/commands/error_constants.h
@@ -53,4 +53,5 @@ inline constexpr const char *errKeyAlreadyExists = "key
already exists";
inline constexpr const char *errParsingNumkeys = "error parsing numkeys";
inline constexpr const char *errNumkeysMustBePositive = "numkeys need to be a
positive integer";
inline constexpr const char *errWrongKeyword = "wrong keyword";
+inline constexpr const char *errInvalidRankValue = "rank needs to be
non-negative";
} // namespace redis
diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index d9a7d425a..6b06a0c07 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -262,7 +262,7 @@ rocksdb::Status TDigest::prepareRankData(engine::Context&
ctx, const Slice& dige
}
rocksdb::Status TDigest::Rank(engine::Context& ctx, const Slice& digest_name,
const std::vector<double>& inputs,
- std::vector<int>& result) {
+ std::vector<int>* result) {
TDigestMetadata metadata;
std::vector<Centroid> centroids;
if (auto status = prepareRankData(ctx, digest_name, metadata, centroids);
!status.ok()) {
@@ -270,7 +270,7 @@ rocksdb::Status TDigest::Rank(engine::Context& ctx, const
Slice& digest_name, co
}
if (metadata.total_observations == 0) {
- result.resize(inputs.size(), -2);
+ result->resize(inputs.size(), -2);
return rocksdb::Status::OK();
}
@@ -282,7 +282,7 @@ rocksdb::Status TDigest::Rank(engine::Context& ctx, const
Slice& digest_name, co
}
rocksdb::Status TDigest::RevRank(engine::Context& ctx, const Slice&
digest_name, const std::vector<double>& inputs,
- std::vector<int>& result) {
+ std::vector<int>* result) {
TDigestMetadata metadata;
std::vector<Centroid> centroids;
if (auto status = prepareRankData(ctx, digest_name, metadata, centroids);
!status.ok()) {
@@ -290,7 +290,7 @@ rocksdb::Status TDigest::RevRank(engine::Context& ctx,
const Slice& digest_name,
}
if (metadata.total_observations == 0) {
- result.resize(inputs.size(), -2);
+ result->resize(inputs.size(), -2);
return rocksdb::Status::OK();
}
@@ -301,6 +301,46 @@ rocksdb::Status TDigest::RevRank(engine::Context& ctx,
const Slice& digest_name,
return rocksdb::Status::OK();
}
+// Resolves each reverse rank in `inputs` to a value of the digest and stores it at the
+// same position in `*result`. Any previous content of `*result` is discarded.
+//
+// - Errors from prepareRankData (e.g. a missing key) are returned unchanged.
+// - For a digest without observations every entry is NaN.
+// - A failure reported by TDigestByRank<true> is surfaced as InvalidArgument.
+rocksdb::Status TDigest::ByRevRank(engine::Context& ctx, const Slice& digest_name, const std::vector<int>& inputs,
+                                   std::vector<double>* result) {
+  TDigestMetadata metadata;
+  std::vector<Centroid> centroids;
+  if (auto status = prepareRankData(ctx, digest_name, metadata, centroids); !status.ok()) {
+    return status;
+  }
+
+  if (metadata.total_observations == 0) {
+    // assign() rather than resize(): resize() would keep whatever values the caller's
+    // vector already held instead of overwriting them with NaN.
+    result->assign(inputs.size(), std::numeric_limits<double>::quiet_NaN());
+    return rocksdb::Status::OK();
+  }
+
+  auto dummy_centroids = DummyCentroids<true>(metadata, centroids);
+  if (auto status = TDigestByRank<true>(dummy_centroids, inputs, result); !status) {
+    return rocksdb::Status::InvalidArgument(status.Msg());
+  }
+  return rocksdb::Status::OK();
+}
+
+// Resolves each rank in `inputs` to a value of the digest and stores it at the same
+// position in `*result`. Any previous content of `*result` is discarded.
+//
+// - Errors from prepareRankData (e.g. a missing key) are returned unchanged.
+// - For a digest without observations every entry is NaN.
+// - A failure reported by TDigestByRank<false> is surfaced as InvalidArgument.
+rocksdb::Status TDigest::ByRank(engine::Context& ctx, const Slice& digest_name, const std::vector<int>& inputs,
+                                std::vector<double>* result) {
+  TDigestMetadata metadata;
+  std::vector<Centroid> centroids;
+  if (auto status = prepareRankData(ctx, digest_name, metadata, centroids); !status.ok()) {
+    return status;
+  }
+
+  if (metadata.total_observations == 0) {
+    // assign() rather than resize(): resize() would keep whatever values the caller's
+    // vector already held instead of overwriting them with NaN.
+    result->assign(inputs.size(), std::numeric_limits<double>::quiet_NaN());
+    return rocksdb::Status::OK();
+  }
+
+  auto dummy_centroids = DummyCentroids<false>(metadata, centroids);
+  if (auto status = TDigestByRank<false>(dummy_centroids, inputs, result); !status) {
+    return rocksdb::Status::InvalidArgument(status.Msg());
+  }
+  return rocksdb::Status::OK();
+}
+
rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice&
digest_name, const std::vector<double>& qs,
TDigestQuantitleResult* result) {
auto ns_key = AppendNamespacePrefix(digest_name);
diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h
index 4844f009f..236ec9eb4 100644
--- a/src/types/redis_tdigest.h
+++ b/src/types/redis_tdigest.h
@@ -78,9 +78,13 @@ class TDigest : public SubKeyScanner {
rocksdb::Status Merge(engine::Context& ctx, const Slice& dest_digest, const
std::vector<std::string>& source_digests,
const TDigestMergeOptions& options);
rocksdb::Status Rank(engine::Context& ctx, const Slice& digest_name, const
std::vector<double>& inputs,
- std::vector<int>& result);
+ std::vector<int>* result);
rocksdb::Status RevRank(engine::Context& ctx, const Slice& digest_name,
const std::vector<double>& inputs,
- std::vector<int>& result);
+ std::vector<int>* result);
+ rocksdb::Status ByRevRank(engine::Context& ctx, const Slice& digest_name,
const std::vector<int>& inputs,
+ std::vector<double>* result);
+ rocksdb::Status ByRank(engine::Context& ctx, const Slice& digest_name, const
std::vector<int>& inputs,
+ std::vector<double>* result);
rocksdb::Status GetMetaData(engine::Context& context, const Slice&
digest_name, TDigestMetadata* metadata);
private:
diff --git a/src/types/tdigest.h b/src/types/tdigest.h
index e4caf914f..b9c4a6fcb 100644
--- a/src/types/tdigest.h
+++ b/src/types/tdigest.h
@@ -172,14 +172,56 @@ struct DoubleComparator {
};
template <bool Reverse, typename TD>
-inline Status TDigestRank(TD&& td, const std::vector<double>& inputs,
std::vector<int>& result) {
+inline Status TDigestByRank(TD&& td, const std::vector<int>& inputs, std::vector<double>* result) {
+  // For every rank in `inputs`, writes to the same position of `*result` the mean of the
+  // first centroid (in td's iteration order) whose running weight total exceeds that rank.
+  // Ranks at or beyond td.TotalWeight() map to +infinity, or -infinity when Reverse is set.
+  // Returns InvalidArgument if any position could not be resolved.
+  result->assign(inputs.size(), std::numeric_limits<double>::quiet_NaN());
+
+  // Requested ranks in ascending order, each paired with its position in `inputs`.
+  // A multimap is required here: with a plain map a repeated rank would overwrite the
+  // earlier position, leaving that result slot as NaN and failing the final check.
+  std::multimap<int, size_t> rank_to_index;
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    rank_to_index.emplace(inputs[i], i);
+  }
+
+  auto it = rank_to_index.begin();
+  auto is_end = [&it, &rank_to_index]() -> bool { return it == rank_to_index.end(); };
+  auto iter = td.Begin();
+  double cumulative_weight = 0;
+  // Single merged pass: centroids and sorted ranks are both consumed front to back.
+  while (iter->Valid() && !is_end()) {
+    auto centroid = GET_OR_RET(iter->GetCentroid());
+    cumulative_weight += centroid.weight;
+    while (!is_end() && it->first < static_cast<int>(cumulative_weight)) {
+      (*result)[it->second] = centroid.mean;
+      ++it;
+    }
+    iter->Next();
+  }
+
+  // Whatever is left was not covered by any centroid, i.e. the rank is out of range.
+  while (!is_end() && it->first >= static_cast<int>(td.TotalWeight())) {
+    if constexpr (Reverse) {
+      (*result)[it->second] = -std::numeric_limits<double>::infinity();
+    } else {
+      (*result)[it->second] = std::numeric_limits<double>::infinity();
+    }
+    ++it;
+  }
+
+  // check if all results are valid
+  for (auto r : *result) {
+    if (std::isnan(r)) {
+      return Status{Status::InvalidArgument, "invalid result when getting byrank or byrevrank"};
+    }
+  }
+  return Status::OK();
+}
+
+template <bool Reverse, typename TD>
+inline Status TDigestRank(TD&& td, const std::vector<double>& inputs,
std::vector<int>* result) {
std::map<double, size_t, DoubleComparator> value_to_index;
for (size_t i = 0; i < inputs.size(); ++i) {
value_to_index[inputs[i]] = i;
}
- result.clear();
- result.resize(inputs.size(), -2);
+ result->clear();
+ result->resize(inputs.size(), -2);
using MapType = decltype(value_to_index);
using IterType = std::conditional_t<Reverse, typename
MapType::reverse_iterator, typename MapType::iterator>;
@@ -201,12 +243,12 @@ inline Status TDigestRank(TD&& td, const
std::vector<double>& inputs, std::vecto
// handle inputs larger than maximum in reverse order or smaller than
minimum in forward order
if constexpr (Reverse) {
while (!is_end() && it->first > td.Max()) {
- result[it->second] = -1;
+ (*result)[it->second] = -1;
++it;
}
} else {
while (!is_end() && it->first < td.Min()) {
- result[it->second] = -1;
+ (*result)[it->second] = -1;
++it;
}
}
@@ -233,7 +275,7 @@ inline Status TDigestRank(TD&& td, const
std::vector<double>& inputs, std::vecto
cumulative_weight += next_centroid.weight;
}
- result[it->second] = static_cast<int>(current_mean_cumulative_weight);
+ (*result)[it->second] = static_cast<int>(current_mean_cumulative_weight);
++it;
iter->Next();
} else if constexpr (Reverse) {
@@ -241,7 +283,7 @@ inline Status TDigestRank(TD&& td, const
std::vector<double>& inputs, std::vecto
cumulative_weight += centroid.weight;
iter->Next();
} else {
- result[it->second] = static_cast<int>(cumulative_weight);
+ (*result)[it->second] = static_cast<int>(cumulative_weight);
++it;
}
} else {
@@ -249,18 +291,18 @@ inline Status TDigestRank(TD&& td, const
std::vector<double>& inputs, std::vecto
cumulative_weight += centroid.weight;
iter->Next();
} else {
- result[it->second] = static_cast<int>(cumulative_weight);
+ (*result)[it->second] = static_cast<int>(cumulative_weight);
++it;
}
}
}
while (!is_end()) {
- result[it->second] = static_cast<int>(td.TotalWeight());
+ (*result)[it->second] = static_cast<int>(td.TotalWeight());
++it;
}
- for (auto r : result) {
+ for (auto r : *result) {
if (r <= -2) {
return Status{Status::InvalidArgument, "invalid result when computing
rank or revrank"};
}
diff --git a/tests/cppunit/types/tdigest_test.cc
b/tests/cppunit/types/tdigest_test.cc
index a6ae7b295..ed12df136 100644
--- a/tests/cppunit/types/tdigest_test.cc
+++ b/tests/cppunit/types/tdigest_test.cc
@@ -312,7 +312,7 @@ TEST_F(RedisTDigestTest,
RevRank_and_Rank_on_the_set_containing_different_elemen
std::vector<int> result;
result.reserve(input.size());
const std::vector<double> value = {0, 10, 20, 30, 40, 50, 60, 70};
- status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
+ status = tdigest_->RevRank(*ctx_, test_digest_name, value, &result);
const auto expect_result_revrank = std::vector<double>{6, 5, 4, 3, 2, 1, 0,
-1};
for (size_t i = 0; i < result.size(); i++) {
@@ -323,7 +323,7 @@ TEST_F(RedisTDigestTest,
RevRank_and_Rank_on_the_set_containing_different_elemen
result.clear();
result.reserve(input.size());
- status = tdigest_->Rank(*ctx_, test_digest_name, value, result);
+ status = tdigest_->Rank(*ctx_, test_digest_name, value, &result);
const auto expect_result_rank = std::vector<double>{-1, 0, 1, 2, 3, 4, 5, 6};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
@@ -345,7 +345,7 @@ TEST_F(RedisTDigestTest,
RevRank_and_Rank_on_the_set_containing_several_identica
std::vector<int> result;
const std::vector<double> value = {10, 20};
result.reserve(value.size());
- status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
+ status = tdigest_->RevRank(*ctx_, test_digest_name, value, &result);
const auto expect_result_revrank = std::vector<double>{3, 1};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
@@ -355,7 +355,7 @@ TEST_F(RedisTDigestTest,
RevRank_and_Rank_on_the_set_containing_several_identica
result.clear();
result.reserve(value.size());
- status = tdigest_->Rank(*ctx_, test_digest_name, value, result);
+ status = tdigest_->Rank(*ctx_, test_digest_name, value, &result);
const auto expect_result_rank = std::vector<double>{1, 4};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
@@ -368,7 +368,7 @@ TEST_F(RedisTDigestTest,
RevRank_and_Rank_on_the_set_containing_several_identica
result.clear();
result.reserve(value.size());
- status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
+ status = tdigest_->RevRank(*ctx_, test_digest_name, value, &result);
const auto expect_result_new_revrank = std::vector<double>{4, 1};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
@@ -378,7 +378,7 @@ TEST_F(RedisTDigestTest,
RevRank_and_Rank_on_the_set_containing_several_identica
result.clear();
result.reserve(value.size());
- status = tdigest_->Rank(*ctx_, test_digest_name, value, result);
+ status = tdigest_->Rank(*ctx_, test_digest_name, value, &result);
const auto expect_result_new_rank = std::vector<double>{2, 5};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
@@ -397,7 +397,7 @@ TEST_F(RedisTDigestTest, RevRank_and_Rank_on_empty_tdigest)
{
std::vector<int> result;
result.reserve(2);
const std::vector<double> value = {10, 20};
- status = tdigest_->RevRank(*ctx_, test_digest_name, value, result);
+ status = tdigest_->RevRank(*ctx_, test_digest_name, value, &result);
const auto expect_result_revrank = std::vector<double>{-2, -2};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
@@ -407,7 +407,7 @@ TEST_F(RedisTDigestTest, RevRank_and_Rank_on_empty_tdigest)
{
result.clear();
result.reserve(2);
- status = tdigest_->Rank(*ctx_, test_digest_name, value, result);
+ status = tdigest_->Rank(*ctx_, test_digest_name, value, &result);
const auto expect_result_rank = std::vector<double>{-2, -2};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
@@ -430,7 +430,7 @@ TEST_F(RedisTDigestTest,
RevRank_and_Rank_on_different_or_same_and_unordered_inp
std::vector<int> result;
const std::vector<double> value = {50, 36, 4, 99, 8.8};
result.reserve(value.size());
- status = tdigest_->Rank(*ctx_, test_digest_name, value, result);
+ status = tdigest_->Rank(*ctx_, test_digest_name, value, &result);
const auto expect_result_rank = std::vector<double>{13, 11, 1, 16, 4};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
@@ -441,11 +441,86 @@ TEST_F(RedisTDigestTest,
RevRank_and_Rank_on_different_or_same_and_unordered_inp
const std::vector<double> value_new = {50, 36, 4, 99, 8.8, 12};
result.clear();
result.reserve(value_new.size());
- status = tdigest_->RevRank(*ctx_, test_digest_name, value_new, result);
+ status = tdigest_->RevRank(*ctx_, test_digest_name, value_new, &result);
const auto expect_result_revrank = std::vector<double>{4, 7, 16, 1, 14, 12};
for (size_t i = 0; i < result.size(); i++) {
auto got = result[i];
EXPECT_EQ(got, expect_result_revrank[i]);
}
ASSERT_TRUE(status.ok()) << status.ToString();
-}
\ No newline at end of file
+}
+
+// Exercises TDigest::ByRank / TDigest::ByRevRank on an empty digest, on a small
+// populated digest, and on out-of-range ranks.
+TEST_F(RedisTDigestTest, ByRank_And_ByRevRank) {
+  std::string test_digest_name = "test_digest_byrank_and_byrevrank" + std::to_string(util::GetTimeStampMS());
+  bool exists = false;
+  auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(status.ok());
+
+  // Test 1: Empty TDigest should return NaN for every requested rank.
+  // The size is asserted first so the NaN loop cannot pass on an empty result.
+  std::vector<double> result;
+  const std::vector<int> value = {1, 2};
+  status = tdigest_->ByRank(*ctx_, test_digest_name, value, &result);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+  ASSERT_EQ(result.size(), value.size());
+  for (size_t i = 0; i < result.size(); i++) {
+    EXPECT_TRUE(std::isnan(result[i])) << "Expected NaN at index " << i << ", got " << result[i];
+  }
+
+  result.clear();
+  status = tdigest_->ByRevRank(*ctx_, test_digest_name, value, &result);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+  ASSERT_EQ(result.size(), value.size());
+  for (size_t i = 0; i < result.size(); i++) {
+    EXPECT_TRUE(std::isnan(result[i])) << "Expected NaN at index " << i << ", got " << result[i];
+  }
+
+  // Test 2: Add values and test ByRank
+  // Add values: 1 2 2 3 3 3 4 4 4 4 5 5 5 5 5 (15 values)
+  const std::vector<double> values = {1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5};
+  status = tdigest_->Add(*ctx_, test_digest_name, values);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+
+  // Test ByRank: rank 0 should be min, increasing ranks should give increasing values
+  const std::vector<int> ranks = {0, 1, 2, 3, 6, 9, 10, 14, 15};
+  const std::vector<double> expected_values = {
+      1.0, 2.0, 2.0, 3.0, 4.0, 4.0, 5.0, 5.0, std::numeric_limits<double>::infinity()};
+  result.clear();
+  status = tdigest_->ByRank(*ctx_, test_digest_name, ranks, &result);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+  ASSERT_EQ(result.size(), ranks.size());
+
+  for (size_t i = 0; i < result.size(); i++) {
+    if (std::isinf(expected_values[i])) {
+      // Exact comparison so the sign of the infinity is verified as well.
+      EXPECT_EQ(result[i], expected_values[i]) << "Expected +inf at rank " << ranks[i] << ", got " << result[i];
+    } else {
+      EXPECT_DOUBLE_EQ(result[i], expected_values[i])
+          << "ByRank mismatch at rank " << ranks[i] << ": expected " << expected_values[i] << ", got " << result[i];
+    }
+  }
+
+  // Test ByRevRank: rank 0 should be max, increasing ranks should give decreasing values
+  const std::vector<double> expected_revvalues = {
+      5.0, 5.0, 5.0, 5.0, 4.0, 3.0, 3.0, 1.0, -std::numeric_limits<double>::infinity()};
+  result.clear();
+  status = tdigest_->ByRevRank(*ctx_, test_digest_name, ranks, &result);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+  ASSERT_EQ(result.size(), ranks.size());
+
+  for (size_t i = 0; i < result.size(); i++) {
+    if (std::isinf(expected_revvalues[i])) {
+      // Exact comparison so the sign of the infinity is verified as well.
+      EXPECT_EQ(result[i], expected_revvalues[i]) << "Expected -inf at revrank " << ranks[i] << ", got " << result[i];
+    } else {
+      EXPECT_DOUBLE_EQ(result[i], expected_revvalues[i]) << "ByRevRank mismatch at rank " << ranks[i] << ": expected "
+                                                         << expected_revvalues[i] << ", got " << result[i];
+    }
+  }
+
+  // Test 3: Test boundary conditions
+  const std::vector<int> boundary_ranks = {0, 7, 14, 100};
+  result.clear();
+  status = tdigest_->ByRank(*ctx_, test_digest_name, boundary_ranks, &result);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+  // Guard the indexed accesses below.
+  ASSERT_EQ(result.size(), boundary_ranks.size());
+  EXPECT_EQ(result[0], 1.0) << "Rank 0 should be minimum";
+  EXPECT_DOUBLE_EQ(result[1], 4.0) << "Rank 7 lies between ranks 6 and 9, which both resolve to 4";
+  EXPECT_DOUBLE_EQ(result[2], 5.0) << "Rank 14 is the last valid rank and should be maximum";
+  EXPECT_EQ(result[3], std::numeric_limits<double>::infinity()) << "Rank >= total_weight should be +infinity";
+}
diff --git a/tests/gocase/unit/type/tdigest/tdigest_test.go
b/tests/gocase/unit/type/tdigest/tdigest_test.go
index 335ee0eff..bcc4a832c 100644
--- a/tests/gocase/unit/type/tdigest/tdigest_test.go
+++ b/tests/gocase/unit/type/tdigest/tdigest_test.go
@@ -23,6 +23,7 @@ package tdigest
import (
"context"
+ "math"
"strconv"
"testing"
@@ -717,3 +718,260 @@ func tdigestTests(t *testing.T, configs
util.KvrocksServerConfigs) {
}
})
}
+
+// TestTDigestByRankAndByRevRank runs the BYRANK/BYREVRANK suite against every
+// server configuration that util.GenerateConfigsMatrix derives from the
+// "txn-context-enabled" and "resp3-enabled" yes/no options.
+func TestTDigestByRankAndByRevRank(t *testing.T) {
+	matrix, err := util.GenerateConfigsMatrix([]util.ConfigOptions{
+		{Name: "txn-context-enabled", Options: []string{"yes", "no"}, ConfigType: util.YesNo},
+		{Name: "resp3-enabled", Options: []string{"yes", "no"}, ConfigType: util.YesNo},
+	})
+	require.NoError(t, err)
+
+	for _, serverConfigs := range matrix {
+		tdigestTestsByRankAndByRevRank(t, serverConfigs)
+	}
+}
+
+// tdigestTestsByRankAndByRevRank starts a server with the given configuration and
+// checks the replies of TDIGEST.BYRANK and TDIGEST.BYREVRANK on an empty sketch,
+// a small sketch, a sketch with duplicate values, and unordered/duplicate ranks.
+func tdigestTestsByRankAndByRevRank(t *testing.T, configs util.KvrocksServerConfigs) {
+	srv := util.StartServer(t, configs)
+	defer srv.Close()
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	isRESP3 := configs["resp3-enabled"] == "yes"
+	inf, negInf, nan := math.Inf(1), math.Inf(-1), math.NaN()
+
+	// requireReply sends `cmd key ranks...` and verifies the reply element by element.
+	// With RESP3 enabled each element must be a float64 matching wantFloats (NaN and
+	// +/-Inf are matched exactly, finite values within 0.1); otherwise each element
+	// must be the string in wantStrings.
+	requireReply := func(t *testing.T, cmd, key string, ranks []string, wantFloats []float64, wantStrings []string) {
+		t.Helper()
+		require.Len(t, wantFloats, len(ranks))
+		require.Len(t, wantStrings, len(ranks))
+
+		args := make([]interface{}, 0, len(ranks)+2)
+		args = append(args, cmd, key)
+		for _, r := range ranks {
+			args = append(args, r)
+		}
+
+		rsp := rdb.Do(ctx, args...)
+		require.NoError(t, rsp.Err())
+		vals, err := rsp.Slice()
+		require.NoError(t, err)
+		require.Len(t, vals, len(ranks))
+
+		for i, v := range vals {
+			if !isRESP3 {
+				got, ok := v.(string)
+				require.True(t, ok, "expected string but got %T at index %d", v, i)
+				require.Equal(t, wantStrings[i], got, "%s mismatch at index %d", cmd, i)
+				continue
+			}
+
+			got, ok := v.(float64)
+			require.True(t, ok, "expected float64 but got %T at index %d", v, i)
+			want := wantFloats[i]
+			switch {
+			case math.IsNaN(want):
+				require.True(t, math.IsNaN(got), "expected NaN but got %v at index %d", got, i)
+			case math.IsInf(want, 1):
+				require.True(t, math.IsInf(got, 1), "expected +Inf but got %v at index %d", got, i)
+			case math.IsInf(want, -1):
+				require.True(t, math.IsInf(got, -1), "expected -Inf but got %v at index %d", got, i)
+			default:
+				require.InDelta(t, want, got, 0.1, "%s mismatch at index %d", cmd, i)
+			}
+		}
+	}
+
+	t.Run("tdigest.byrank and tdigest.byrevrank on empty sketch", func(t *testing.T) {
+		key := "tdigest_byrank_on_empty_sketch"
+		require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key, "compression", "100").Err())
+
+		ranks := []string{"1", "2", "4", "5", "0", "1", "20"}
+		wantFloats := []float64{nan, nan, nan, nan, nan, nan, nan}
+		wantStrings := []string{"nan", "nan", "nan", "nan", "nan", "nan", "nan"}
+
+		requireReply(t, "TDIGEST.BYRANK", key, ranks, wantFloats, wantStrings)
+		requireReply(t, "TDIGEST.BYREVRANK", key, ranks, wantFloats, wantStrings)
+	})
+
+	t.Run("tdigest.byrank and tdigest.byrevrank on non-empty sketch", func(t *testing.T) {
+		key := "tdigest_byrank_non_empty"
+		require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key, "compression", "100").Err())
+		require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key, "1", "2", "3", "4", "5").Err())
+
+		ranks := []string{"0", "1", "2", "3", "4", "5", "6"}
+
+		// rank 0 -> 1, rank 1 -> 2, rank 2 -> 3, rank 3 -> 4, rank 4 -> 5; ranks 5, 6 -> out of range (+inf)
+		requireReply(t, "TDIGEST.BYRANK", key, ranks,
+			[]float64{1, 2, 3, 4, 5, inf, inf},
+			[]string{"1", "2", "3", "4", "5", "inf", "inf"})
+
+		// rank 0 -> 5, rank 1 -> 4, rank 2 -> 3, rank 3 -> 2, rank 4 -> 1; ranks 5, 6 -> out of range (-inf)
+		requireReply(t, "TDIGEST.BYREVRANK", key, ranks,
+			[]float64{5, 4, 3, 2, 1, negInf, negInf},
+			[]string{"5", "4", "3", "2", "1", "-inf", "-inf"})
+	})
+
+	t.Run("tdigest.byrank and tdigest.byrevrank with duplicate values", func(t *testing.T) {
+		key := "tdigest_byrank_duplicates"
+		require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key, "compression", "100").Err())
+		require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key, "1", "2", "2", "3", "3", "3").Err())
+
+		ranks := []string{"0", "1", "2", "3", "4", "5", "6", "7"}
+
+		requireReply(t, "TDIGEST.BYRANK", key, ranks,
+			[]float64{1, 2, 2, 3, 3, 3, inf, inf},
+			[]string{"1", "2", "2", "3", "3", "3", "inf", "inf"})
+
+		requireReply(t, "TDIGEST.BYREVRANK", key, ranks,
+			[]float64{3, 3, 3, 2, 2, 1, negInf, negInf},
+			[]string{"3", "3", "3", "2", "2", "1", "-inf", "-inf"})
+	})
+
+	t.Run("tdigest.byrank and tdigest.byrevrank with unordered duplicate values", func(t *testing.T) {
+		key := "tdigest_byrank_unordered_dup_"
+		require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key, "compression", "100").Err())
+		require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key, "12", "100", "50", "36", "75", "81", "35.5", "46", "36",
+			"8.8", "15", "4", "32.5", "12", "8.8", "7", "99", "1").Err())
+
+		// Ranks are deliberately unordered and repeated; the reply must follow argument order.
+		requireReply(t, "TDIGEST.BYRANK", key,
+			[]string{"0", "1", "2", "3", "4", "5", "6", "7", "7", "5", "20", "100"},
+			[]float64{1, 4, 7, 8.8, 8.8, 12, 12, 15, 15, 12, inf, inf},
+			[]string{"1", "4", "7", "8.8", "8.8", "12", "12", "15", "15", "12", "inf", "inf"})
+
+		requireReply(t, "TDIGEST.BYREVRANK", key,
+			[]string{"20", "75", "0", "1", "2", "3", "4", "5", "6", "7"},
+			[]float64{negInf, negInf, 100, 99, 81, 75, 50, 46, 36, 36},
+			[]string{"-inf", "-inf", "100", "99", "81", "75", "50", "46", "36", "36"})
+	})
+}