LindaSummer commented on code in PR #3130:
URL: https://github.com/apache/kvrocks/pull/3130#discussion_r2308941503
##########
src/types/redis_tdigest.cc:
##########
@@ -186,8 +188,37 @@ rocksdb::Status TDigest::Add(engine::Context& ctx, const Slice& digest_name, con
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
-rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name, const std::vector<double>& qs,
-                                  TDigestQuantitleResult* result) {
+rocksdb::Status TDigest::mergeNodes(engine::Context& ctx, const std::string& ns_key, TDigestMetadata* metadata) {
+ if (metadata->unmerged_nodes == 0) {
+ return rocksdb::Status::OK();
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisTDigest);
+ if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
+ return status;
+ }
+
+  if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, metadata); !status.ok()) {
+ return status;
+ }
+
+ std::string metadata_bytes;
+ metadata->Encode(&metadata_bytes);
+  if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); !status.ok()) {
+ return status;
+ }
+
+  if (auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); !status.ok()) {
+ return status;
+ }
+
+ ctx.RefreshLatestSnapshot();
Review Comment:
~~We'd better refresh the metadata after the snapshot refresh. Another thread may update it.~~
A lock guards this merge action, so the metadata should remain consistent after refreshing the snapshot.
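For reference, a minimal sketch of the per-key locking this comment relies on, so the metadata read after `ctx.RefreshLatestSnapshot()` cannot be changed by a concurrent writer of the same digest. The guard type, lock-manager accessor, and wrapper name are illustrative assumptions rather than the PR's actual code:
```cpp
// Illustrative sketch only: hold a per-key lock for the whole merge so no
// other thread can update the digest's metadata between the buffer merge
// and the snapshot refresh. Names below are hypothetical.
rocksdb::Status TDigest::lockedMergeNodes(engine::Context& ctx, const std::string& ns_key,
                                          TDigestMetadata* metadata) {
  LockGuard guard(storage_->GetLockManager(), ns_key);  // assumed helper
  return mergeNodes(ctx, ns_key, metadata);
}
```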
##########
src/types/tdigest.h:
##########
@@ -150,3 +151,74 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q) {
diff /= (lc.weight / 2 + rc.weight / 2);
return Lerp(lc.mean, rc.mean, diff);
}
+
+template <typename TD>
+inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
Review Comment:
This function seems specialized for `RevRank` rather than `Rank`.
Could we implement `Rank` first, then wrap the `Iterator` in a reverse version so the same logic can be reused to construct `RevRank`?
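To illustrate the suggestion, here is a rough, self-contained sketch of a reversed view over a forward centroid sequence; a single rank routine could then take either view. The `Centroid` struct and the `Valid`/`Get`/`Next` interface are assumptions made for the example, not the PR's `Iterator` API:
```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Minimal stand-in for the real centroid type.
struct Centroid {
  double mean = 0;
  double weight = 0;
};

// Walks the centroids from the largest mean to the smallest, so the same
// rank-accumulation logic can produce RevRank from a forward container.
class ReverseCentroidView {
 public:
  explicit ReverseCentroidView(std::vector<Centroid> centroids)
      : centroids_(std::move(centroids)), pos_(centroids_.size()) {}

  bool Valid() const { return pos_ > 0; }
  const Centroid& Get() const { return centroids_[pos_ - 1]; }
  void Next() { --pos_; }  // advancing the reverse view moves to the previous centroid

 private:
  std::vector<Centroid> centroids_;
  size_t pos_;
};
```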
##########
src/types/tdigest.h:
##########
@@ -150,3 +151,74 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q) {
diff /= (lc.weight / 2 + rc.weight / 2);
return Lerp(lc.mean, rc.mean, diff);
}
+
+template <typename TD>
+inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
+ std::vector<size_t> indices(inputs.size());
+ std::iota(indices.begin(), indices.end(), 0);
+  std::sort(indices.begin(), indices.end(), [&inputs](size_t a, size_t b) { return inputs[a] < inputs[b]; });
+
+ result.resize(inputs.size());
+ size_t i = indices.size();
+ double cumulative_weight = 0;
+
+ // handle inputs larger than maximum
+ while (i > 0 && inputs[indices[i - 1]] > td.Max()) {
+ result[indices[i - 1]] = -1;
+ i--;
+ }
+
+  // reverse iterate through centroids and calculate reverse rank for each input
+ auto iter = td.End();
+ while (i > 0) {
+ auto centroid = GET_OR_RET(iter->GetCentroid());
+
+ if (centroid.mean > inputs[indices[i - 1]]) {
+ // mean > input, accumulate weight and move to prev centroid
+ cumulative_weight += centroid.weight;
+ } else if (centroid.mean == inputs[indices[i - 1]]) {
+      // mean == input, calculate reverse rank with half weight of current centroid
+ cumulative_weight += centroid.weight;
+ auto current_mean = centroid.mean;
+      auto current_mean_cumulative_weight = cumulative_weight + centroid.weight / 2;
+
+ // handle all the prev centroids which has the same mean
+ while (!iter->IsAtBegin() && iter->Prev()) {
+ auto next_centroid = GET_OR_RET(iter->GetCentroid());
+ if (current_mean != next_centroid.mean) {
+          // move back to the last equal centroid, because we will process it in the next loop
+ iter->Next();
+ break;
+ }
+ current_mean_cumulative_weight += centroid.weight / 2;
+ cumulative_weight += centroid.weight;
+ }
+
+ // assign the reverse rank for the inputs[indices[i - 1]]
+      result[indices[i - 1]] = static_cast<int>(current_mean_cumulative_weight);
+ i--;
+
+ // handle the prev inputs which has the same value
+ while ((i > 0) && (inputs[indices[i]] == inputs[indices[i - 1]])) {
Review Comment:
Maybe we could use a `std::map<double, std::vector<size_t>>` to build a sorted view of the inputs and group duplicate values together, so each distinct value is ranked only once; that may simplify the logic.
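A rough sketch of that grouping idea, with illustrative names (not the PR's): sort and deduplicate the inputs once with a `std::map`, compute one rank per distinct value, then fan the rank out to every original index in the group.
```cpp
#include <cstddef>
#include <map>
#include <vector>

// Group the original positions of each distinct input value; std::map keeps
// the keys sorted, so iterating in reverse visits values from largest to smallest.
std::map<double, std::vector<size_t>> GroupInputs(const std::vector<double>& inputs) {
  std::map<double, std::vector<size_t>> grouped;
  for (size_t i = 0; i < inputs.size(); ++i) {
    grouped[inputs[i]].push_back(i);
  }
  return grouped;
}

// Usage sketch: for each (value, positions) pair, compute the rank once and
// assign it to result[idx] for every idx in positions, instead of re-checking
// neighbouring indices for duplicates inside the centroid loop.
```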
##########
src/types/tdigest.h:
##########
@@ -150,3 +151,74 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q) {
diff /= (lc.weight / 2 + rc.weight / 2);
return Lerp(lc.mean, rc.mean, diff);
}
+
+template <typename TD>
+inline Status TDigestRank(TD&& td, const std::vector<double>& inputs, std::vector<int>& result) {
+ std::vector<size_t> indices(inputs.size());
+ std::iota(indices.begin(), indices.end(), 0);
+  std::sort(indices.begin(), indices.end(), [&inputs](size_t a, size_t b) { return inputs[a] < inputs[b]; });
+
+ result.resize(inputs.size());
+ size_t i = indices.size();
+ double cumulative_weight = 0;
+
+ // handle inputs larger than maximum
+ while (i > 0 && inputs[indices[i - 1]] > td.Max()) {
+ result[indices[i - 1]] = -1;
+ i--;
+ }
+
+  // reverse iterate through centroids and calculate reverse rank for each input
+ auto iter = td.End();
+ while (i > 0) {
+ auto centroid = GET_OR_RET(iter->GetCentroid());
+
+ if (centroid.mean > inputs[indices[i - 1]]) {
+ // mean > input, accumulate weight and move to prev centroid
+ cumulative_weight += centroid.weight;
+ } else if (centroid.mean == inputs[indices[i - 1]]) {
+      // mean == input, calculate reverse rank with half weight of current centroid
+ cumulative_weight += centroid.weight;
+ auto current_mean = centroid.mean;
+      auto current_mean_cumulative_weight = cumulative_weight + centroid.weight / 2;
+
+ // handle all the prev centroids which has the same mean
+ while (!iter->IsAtBegin() && iter->Prev()) {
+ auto next_centroid = GET_OR_RET(iter->GetCentroid());
+ if (current_mean != next_centroid.mean) {
+          // move back to the last equal centroid, because we will process it in the next loop
+ iter->Next();
+ break;
+ }
+ current_mean_cumulative_weight += centroid.weight / 2;
+ cumulative_weight += centroid.weight;
+ }
+
+ // assign the reverse rank for the inputs[indices[i - 1]]
+      result[indices[i - 1]] = static_cast<int>(current_mean_cumulative_weight);
+ i--;
+
+ // handle the prev inputs which has the same value
+ while ((i > 0) && (inputs[indices[i]] == inputs[indices[i - 1]])) {
+ result[indices[i - 1]] = result[indices[i]];
+ i--;
+ }
+ } else {
+ // mean < input, calculate reverse rank
+ result[indices[i - 1]] = static_cast<int>(cumulative_weight);
+ i--;
+ }
+
+ if (iter->IsAtBegin()) {
+ break;
+ }
+ iter->Prev();
+ }
+
+ // handle inputs less than minimum
+ while (i > 0) {
+ result[indices[i - 1]] = static_cast<int>(td.TotalWeight());
+ i--;
+ }
+ return Status::OK();
+}
Review Comment:
Add a trailing newline at the end of the file.
##########
src/commands/cmd_tdigest.cc:
##########
@@ -176,6 +176,49 @@ class CommandTDigestAdd : public Commander {
std::vector<double> values_;
};
+class CommandTDigestRevRank : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ key_name_ = args[1];
+ inputs_.reserve(args.size() - 2);
+ for (size_t i = 2; i < args.size(); i++) {
+ auto value = ParseFloat(args[i]);
+ if (!value) {
+ return {Status::RedisParseErr, errValueIsNotFloat};
+ }
+ inputs_.push_back(*value);
+ }
+ return Status::OK();
+ }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+ TDigest tdigest(srv->storage, conn->GetNamespace());
+ std::vector<int> result;
+ result.reserve(inputs_.size());
+    if (const auto s = tdigest.RevRank(ctx, key_name_, inputs_, result); !s.ok()) {
+ if (s.IsNotFound()) {
+ return {Status::RedisExecErr, errKeyNotFound};
+ }
+ return {Status::RedisExecErr, s.ToString()};
+ }
+
+ if (result.data()) {
Review Comment:
```suggestion
if (!result.empty()) {
```
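For what it's worth, `result.data()` only tests the vector's internal buffer pointer, which is not specified to be null for an empty vector (and is typically non-null once capacity has been reserved), while `empty()` expresses the intended check directly. A tiny illustration:
```cpp
#include <cassert>
#include <vector>

int main() {
  std::vector<int> result;
  result.reserve(4);       // buffer allocated, but still logically empty
  assert(result.empty());  // the condition the command actually cares about
  // result.data() may well be non-null here, so `if (result.data())`
  // could take the "have results" branch with nothing to emit.
  return 0;
}
```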