This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 18e6674ee fix(tdigest): fix centroid encoding overwrite for same mean (#2878)
18e6674ee is described below
commit 18e6674eecbfe3361891883f7777c1d188e5ba25
Author: Edward Xu <[email protected]>
AuthorDate: Wed Apr 16 11:20:09 2025 +0800
fix(tdigest): fix centroid encoding overwrite for same mean (#2878)
Co-authored-by: Twice <[email protected]>
---
src/types/redis_tdigest.cc | 13 ++++++++++---
src/types/redis_tdigest.h | 2 +-
tests/cppunit/types/tdigest_test.cc | 35 +++++++++++++++++++++++++++++++++++
3 files changed, 46 insertions(+), 4 deletions(-)
diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index e9f31d2b1..90d8159e2 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -332,10 +332,13 @@ std::string TDigest::internalBufferKey(const std::string&
ns_key, const TDigestM
}
std::string TDigest::internalKeyFromCentroid(const std::string& ns_key, const
TDigestMetadata& metadata,
- const Centroid& centroid) const {
+ const Centroid& centroid, uint32_t seq) const {
std::string sub_key;
PutFixed8(&sub_key, static_cast<uint8_t>(SegmentType::kCentroids));
PutDouble(&sub_key, centroid.mean); // It uses EncodeDoubleToUInt64 and
keeps original order of double
+ // Different centroids may share the same mean, so keying by the mean alone would let one overwrite another.
+ // Append a sequence id to keep the keys unique and in their original order; it is discarded when decoding.
+ PutFixed32(&sub_key, seq);
return InternalKey(ns_key, sub_key, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
}
@@ -364,6 +367,9 @@ rocksdb::Status TDigest::decodeCentroidFromKeyValue(const
rocksdb::Slice& key, c
return rocksdb::Status::Corruption("corrupted tdigest centroid key");
}
+ // The seq id that follows the mean only keeps centroid keys unique and in their original order;
+ // it carries no t-digest data, so it is intentionally left undecoded here.
+
if (rocksdb::Slice value_slice = value; // GetDouble needs a mutable pointer of slice
!GetDouble(&value_slice, &centroid->weight)) {
LOG(ERROR) << "corrupted tdigest centroid value, extract weight failed";
@@ -463,8 +469,9 @@ rocksdb::Status
TDigest::dumpCentroidsAndBuffer(engine::Context& ctx, const std:
rocksdb::Status
TDigest::applyNewCentroids(ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
const std::string& ns_key, const
TDigestMetadata& metadata,
const std::vector<Centroid>&
centroids) {
- for (const auto& c : centroids) {
- auto centroid_key = internalKeyFromCentroid(ns_key, metadata, c);
+ for (size_t i = 0; i < centroids.size(); ++i) {
+ const auto& c = centroids[i];
+ auto centroid_key = internalKeyFromCentroid(ns_key, metadata, c, i);
auto centroid_payload = internalValueFromCentroid(c);
if (auto status = batch->Put(cf_handle_, centroid_key, centroid_payload);
!status.ok()) {
return status;
diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h
index 33da44a67..24ba650f2 100644
--- a/src/types/redis_tdigest.h
+++ b/src/types/redis_tdigest.h
@@ -112,7 +112,7 @@ class TDigest : public SubKeyScanner {
const std::vector<double>*
additional_buffer = nullptr);
std::string internalBufferKey(const std::string& ns_key, const
TDigestMetadata& metadata) const;
std::string internalKeyFromCentroid(const std::string& ns_key, const
TDigestMetadata& metadata,
- const Centroid& centroid) const;
+ const Centroid& centroid, uint32_t seq) const;
static std::string internalValueFromCentroid(const Centroid& centroid);
rocksdb::Status decodeCentroidFromKeyValue(const rocksdb::Slice& key, const
rocksdb::Slice& value,
Centroid* centroid) const;
diff --git a/tests/cppunit/types/tdigest_test.cc
b/tests/cppunit/types/tdigest_test.cc
index 7d2b6e6e2..4bc8ae039 100644
--- a/tests/cppunit/types/tdigest_test.cc
+++ b/tests/cppunit/types/tdigest_test.cc
@@ -244,3 +244,38 @@ TEST_F(RedisTDigestTest, Add_2_times) {
expect_upper);
}
}
+
+TEST_F(RedisTDigestTest, Add_100_times_same_value) {
+ std::string test_digest_name = "test_digest_quantile" +
std::to_string(util::GetTimeStampMS());
+
+ bool exists = false;
+ auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(status.ok());
+
+ auto samples = std::vector<double>{-10, -9, -8, -7, -6, -5, -4, -3, -2, -1,
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+ auto qs = std::vector<double>{0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95,
0.99};
+
+ auto repeat_times = 100;
+
+ for (auto i = 0; i < repeat_times; ++i) {
+ std::shuffle(samples.begin(), samples.end(), std::mt19937(kSeed));
+ status = tdigest_->Add(*ctx_, test_digest_name, samples);
+ ASSERT_TRUE(status.ok()) << status.ToString();
+ }
+
+ redis::TDigestQuantitleResult tdigest_result;
+ status = tdigest_->Quantile(*ctx_, test_digest_name, qs, &tdigest_result);
+ ASSERT_TRUE(status.ok()) << status.ToString();
+
+ auto expect_result = std::vector<double>{
+ -10, -9, -8, -5, 1, 7, 10, 11, 12,
+ };
+
+ EXPECT_EQ(tdigest_result.quantiles.size(), qs.size());
+
+ for (size_t i = 0; i < qs.size(); i++) {
+ auto got = tdigest_result.quantiles[i];
+ EXPECT_NEAR(got, expect_result[i], 0.5) << fmt::format("quantile is {}, should be {}", qs[i], expect_result[i]);
+ }
+}