This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 45037538 feat(tdigest): refactor buffer merge's min max updating logic
(#2814)
45037538 is described below
commit 45037538ca4a4722e1d4134bcdde57d7f1f815d2
Author: Edward Xu <[email protected]>
AuthorDate: Wed Mar 5 12:45:47 2025 +0800
feat(tdigest): refactor buffer merge's min max updating logic (#2814)
---
src/types/redis_tdigest.cc | 5 +++--
src/types/tdigest.cc | 10 ++--------
2 files changed, 5 insertions(+), 10 deletions(-)
diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index 6953f476..0ee55740 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -161,6 +161,9 @@ rocksdb::Status TDigest::Add(engine::Context& ctx, const
Slice& digest_name, con
metadata.total_observations += inputs.size();
metadata.total_weight += inputs.size();
+ auto [buffer_min, buffer_max] = std::minmax_element(inputs.cbegin(),
inputs.cend());
+ metadata.maximum = std::max(metadata.maximum, *buffer_max);
+ metadata.minimum = std::min(metadata.minimum, *buffer_min);
if (metadata.unmerged_nodes + inputs.size() <= metadata.capacity) {
if (auto status = appendBuffer(ctx, batch, ns_key, inputs, &metadata);
!status.ok()) {
@@ -279,8 +282,6 @@ rocksdb::Status
TDigest::mergeCurrentBuffer(engine::Context& ctx, const std::str
metadata->merge_times++;
metadata->merged_nodes = merged_centroids->centroids.size();
metadata->unmerged_nodes = 0;
- metadata->minimum = merged_centroids->min;
- metadata->maximum = merged_centroids->max;
metadata->merged_weight =
static_cast<uint64_t>(merged_centroids->total_weight);
return rocksdb::Status::OK();
diff --git a/src/types/tdigest.cc b/src/types/tdigest.cc
index 43a5a798..a51967a4 100644
--- a/src/types/tdigest.cc
+++ b/src/types/tdigest.cc
@@ -119,8 +119,8 @@ class TDigestImpl {
tdigests_[1].resize(0);
current_ = 0;
total_weight_ = 0;
- min_ = std::numeric_limits<double>::infinity();
- max_ = -std::numeric_limits<double>::infinity();
+ min_ = std::numeric_limits<double>::max();
+ max_ = std::numeric_limits<double>::lowest();
merger_.Reset(0, nullptr);
}
@@ -225,18 +225,12 @@ class TDigestImpl {
// merge input data with current tdigest
void MergeInput(std::vector<double> input) {
- if (tdigests_[current_].empty() && !input.empty()) {
- min_ = input.front();
- max_ = input.front();
- }
total_weight_ += static_cast<double>(input.size());
std::sort(input.begin(), input.end());
if (input.empty()) {
return;
}
- min_ = std::min(min_, input.front());
- max_ = std::max(max_, input.back());
// pick next minimal centroid from input and tdigest, feed to merger
merger_.Reset(total_weight_, &tdigests_[1 - current_]);