This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 18e6674ee fix(tdigest): fix centroid encoding overwrite for same mean 
(#2878)
18e6674ee is described below

commit 18e6674eecbfe3361891883f7777c1d188e5ba25
Author: Edward Xu <[email protected]>
AuthorDate: Wed Apr 16 11:20:09 2025 +0800

    fix(tdigest): fix centroid encoding overwrite for same mean (#2878)
    
    Co-authored-by: Twice <[email protected]>
---
 src/types/redis_tdigest.cc          | 13 ++++++++++---
 src/types/redis_tdigest.h           |  2 +-
 tests/cppunit/types/tdigest_test.cc | 35 +++++++++++++++++++++++++++++++++++
 3 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index e9f31d2b1..90d8159e2 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -332,10 +332,13 @@ std::string TDigest::internalBufferKey(const std::string& 
ns_key, const TDigestM
 }
 
 std::string TDigest::internalKeyFromCentroid(const std::string& ns_key, const 
TDigestMetadata& metadata,
-                                             const Centroid& centroid) const {
+                                             const Centroid& centroid, 
uint32_t seq) const {
   std::string sub_key;
   PutFixed8(&sub_key, static_cast<uint8_t>(SegmentType::kCentroids));
   PutDouble(&sub_key, centroid.mean);  // It uses EncodeDoubleToUInt64 and 
keeps original order of double
+  // The tdigest centroids only care about the weight rather than the mean, 
so different centroids may have the same mean.
+  // We should keep them in their original order; this seq id can be 
discarded on decode.
+  PutFixed32(&sub_key, seq);
   return InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
 }
 
@@ -364,6 +367,9 @@ rocksdb::Status TDigest::decodeCentroidFromKeyValue(const 
rocksdb::Slice& key, c
     return rocksdb::Status::Corruption("corrupted tdigest centroid key");
   }
 
+  // The seq id after the mean is not used by tdigest itself; it only keeps 
the original order of the centroids, so
+  // it is discarded here for simplicity
+
   if (rocksdb::Slice value_slice = value;  // GetDouble needs a mutable 
pointer of slice
       !GetDouble(&value_slice, &centroid->weight)) {
     LOG(ERROR) << "corrupted tdigest centroid value, extract weight failed";
@@ -463,8 +469,9 @@ rocksdb::Status 
TDigest::dumpCentroidsAndBuffer(engine::Context& ctx, const std:
 rocksdb::Status 
TDigest::applyNewCentroids(ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
                                            const std::string& ns_key, const 
TDigestMetadata& metadata,
                                            const std::vector<Centroid>& 
centroids) {
-  for (const auto& c : centroids) {
-    auto centroid_key = internalKeyFromCentroid(ns_key, metadata, c);
+  for (size_t i = 0; i < centroids.size(); ++i) {
+    const auto& c = centroids[i];
+    auto centroid_key = internalKeyFromCentroid(ns_key, metadata, c, i);
     auto centroid_payload = internalValueFromCentroid(c);
     if (auto status = batch->Put(cf_handle_, centroid_key, centroid_payload); 
!status.ok()) {
       return status;
diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h
index 33da44a67..24ba650f2 100644
--- a/src/types/redis_tdigest.h
+++ b/src/types/redis_tdigest.h
@@ -112,7 +112,7 @@ class TDigest : public SubKeyScanner {
                                      const std::vector<double>* 
additional_buffer = nullptr);
   std::string internalBufferKey(const std::string& ns_key, const 
TDigestMetadata& metadata) const;
   std::string internalKeyFromCentroid(const std::string& ns_key, const 
TDigestMetadata& metadata,
-                                      const Centroid& centroid) const;
+                                      const Centroid& centroid, uint32_t seq) 
const;
   static std::string internalValueFromCentroid(const Centroid& centroid);
   rocksdb::Status decodeCentroidFromKeyValue(const rocksdb::Slice& key, const 
rocksdb::Slice& value,
                                              Centroid* centroid) const;
diff --git a/tests/cppunit/types/tdigest_test.cc 
b/tests/cppunit/types/tdigest_test.cc
index 7d2b6e6e2..4bc8ae039 100644
--- a/tests/cppunit/types/tdigest_test.cc
+++ b/tests/cppunit/types/tdigest_test.cc
@@ -244,3 +244,38 @@ TEST_F(RedisTDigestTest, Add_2_times) {
                                                 expect_upper);
   }
 }
+
+TEST_F(RedisTDigestTest, Add_100_times_same_value) {
+  std::string test_digest_name = "test_digest_quantile" + 
std::to_string(util::GetTimeStampMS());
+
+  bool exists = false;
+  auto status = tdigest_->Create(*ctx_, test_digest_name, {100}, &exists);
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(status.ok());
+
+  auto samples = std::vector<double>{-10, -9, -8, -7, -6, -5, -4, -3, -2, -1, 
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+  auto qs = std::vector<double>{0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 
0.99};
+
+  auto repeat_times = 100;
+
+  for (auto i = 0; i < repeat_times; ++i) {
+    std::shuffle(samples.begin(), samples.end(), std::mt19937(kSeed));
+    status = tdigest_->Add(*ctx_, test_digest_name, samples);
+    ASSERT_TRUE(status.ok()) << status.ToString();
+  }
+
+  redis::TDigestQuantitleResult tdigest_result;
+  status = tdigest_->Quantile(*ctx_, test_digest_name, qs, &tdigest_result);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+
+  auto expect_result = std::vector<double>{
+      -10, -9, -8, -5, 1, 7, 10, 11, 12,
+  };
+
+  EXPECT_EQ(tdigest_result.quantiles.size(), qs.size());
+
+  for (size_t i = 0; i < qs.size(); i++) {
+    auto got = tdigest_result.quantiles[i];
+    EXPECT_NEAR(got, expect_result[i], 0.5) << fmt::format("quantile is {}, 
should be {}", qs[i], expect_result[i]);
+  }
+}

Reply via email to