This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new f0251470e feat(tdigest): add support of TDIGEST.RESET command (#2826)
f0251470e is described below
commit f0251470ecb78f186977feb8b8230187e09830bc
Author: Anirudh Lakhanpal <[email protected]>
AuthorDate: Thu Mar 20 12:13:26 2025 +0530
feat(tdigest): add support of TDIGEST.RESET command (#2826)
Co-authored-by: Rivers <[email protected]>
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_tdigest.cc | 32 +++++++++++++++-
src/types/redis_tdigest.cc | 38 ++++++++++++++++++
src/types/redis_tdigest.h | 1 +
tests/gocase/unit/type/tdigest/tdigest_test.go | 53 ++++++++++++++++++++++++++
4 files changed, 123 insertions(+), 1 deletion(-)
diff --git a/src/commands/cmd_tdigest.cc b/src/commands/cmd_tdigest.cc
index 4c62ffb5b..166a22afb 100644
--- a/src/commands/cmd_tdigest.cc
+++ b/src/commands/cmd_tdigest.cc
@@ -202,7 +202,36 @@ class CommandTDigestMinMax : public Commander {
std::string key_name_;
bool is_min_;
};
+// TDIGEST.RESET <key>
+//
+// Empties an existing t-digest and replies with OK. A missing key is reported
+// as errKeyNotFound; a digest that already holds zero observations is
+// acknowledged without issuing a write.
+class CommandTDigestReset : public Commander {
+ public:
+  // Remembers the digest key (args[1]). Nothing else is validated here.
+  Status Parse(const std::vector<std::string> &args) override {
+    key_name_ = args[1];
+    return Status::OK();
+  }
+
+  // Loads the digest metadata and, unless the digest is already empty, resets
+  // it through TDigest::Reset. On success *output is set to redis::RESP_OK.
+  // Every failure is returned as Status::RedisExecErr: errKeyNotFound when the
+  // metadata lookup reports NotFound, otherwise the underlying status text.
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+    TDigest tdigest(srv->storage, conn->GetNamespace());
+    TDigestMetadata metadata;
+    auto s = tdigest.GetMetaData(ctx, key_name_, &metadata);
+    if (!s.ok()) {
+      if (s.IsNotFound()) {
+        return {Status::RedisExecErr, errKeyNotFound};
+      }
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    // Nothing has been observed yet, so there is nothing to clear: skip the write.
+    if (metadata.total_observations == 0) {
+      *output = redis::RESP_OK;
+      return Status::OK();
+    }
+
+    s = tdigest.Reset(ctx, key_name_);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    *output = redis::RESP_OK;
+    return Status::OK();
+  }
+
+ private:
+  std::string key_name_;  // digest key captured by Parse()
+};
// Then replace the existing template implementation and type aliases with:
class CommandTDigestMin : public CommandTDigestMinMax {
public:
@@ -218,5 +247,6 @@ REDIS_REGISTER_COMMANDS(TDigest,
MakeCmdAttr<CommandTDigestCreate>("tdigest.crea
MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3,
"write", 1, 1, 1),
MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2,
"read-only", 1, 1, 1),
- MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2,
"read-only", 1, 1, 1));
+ MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2,
"read-only", 1, 1, 1),
+ MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2,
"write", 1, 1, 1));
} // namespace redis
diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index 0ee55740c..e9f31d2b1 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -238,7 +238,45 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx,
const Slice& digest_name
return rocksdb::Status::OK();
}
+// Puts the digest stored under `digest_name` back into an empty state.
+//
+// The current metadata is loaded first; a failed lookup is returned unchanged.
+// Node counts, weights, observation count and merge count are zeroed, and
+// minimum/maximum are set to double max() / lowest(). Metadata fields that are
+// not assigned below are written back exactly as they were loaded.
+// The rewritten metadata and a range delete spanning the digest's segment keys
+// (kBuffer prefix up to the kGuardFlag prefix) are committed in one write batch.
+//
+// NOTE(review): no key lock is taken inside this function - confirm whether
+// concurrent writers to the same digest are serialized elsewhere.
+rocksdb::Status TDigest::Reset(engine::Context& ctx, const Slice& digest_name) {
+  auto ns_key = AppendNamespacePrefix(digest_name);
+
+  TDigestMetadata metadata;
+  rocksdb::Status s = getMetaDataByNsKey(ctx, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisTDigest);
+  s = batch->PutLogData(log_data.Encode());
+  if (!s.ok()) {
+    return s;
+  }
+
+  // Wipe the bookkeeping that describes the stored nodes.
+  metadata.unmerged_nodes = 0;
+  metadata.merged_nodes = 0;
+  metadata.total_weight = 0;
+  metadata.merged_weight = 0;
+  metadata.total_observations = 0;
+  metadata.merge_times = 0;
+  // Sentinels: min starts at the largest double, max at the most negative one.
+  metadata.minimum = std::numeric_limits<double>::max();
+  metadata.maximum = std::numeric_limits<double>::lowest();
+
+  std::string encoded_metadata;
+  metadata.Encode(&encoded_metadata);
+  s = batch->Put(metadata_cf_handle_, ns_key, encoded_metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  // Drop the stored segment entries between the buffer prefix and the guard prefix.
+  auto range_begin = internalSegmentGuardPrefixKey(metadata, ns_key, SegmentType::kBuffer);
+  auto range_end = internalSegmentGuardPrefixKey(metadata, ns_key, SegmentType::kGuardFlag);
+  s = batch->DeleteRange(cf_handle_, range_begin, range_end);
+  if (!s.ok()) {
+    return s;
+  }
+
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
rocksdb::Status TDigest::GetMetaData(engine::Context& context, const Slice&
digest_name, TDigestMetadata* metadata) {
auto ns_key = AppendNamespacePrefix(digest_name);
return Database::GetMetadata(context, {kRedisTDigest}, ns_key, metadata);
diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h
index eccb20074..33da44a67 100644
--- a/src/types/redis_tdigest.h
+++ b/src/types/redis_tdigest.h
@@ -67,6 +67,7 @@ class TDigest : public SubKeyScanner {
rocksdb::Status Quantile(engine::Context& ctx, const Slice& digest_name,
const std::vector<double>& qs,
TDigestQuantitleResult* result);
+ rocksdb::Status Reset(engine::Context& ctx, const Slice& digest_name);
rocksdb::Status GetMetaData(engine::Context& context, const Slice&
digest_name, TDigestMetadata* metadata);
private:
diff --git a/tests/gocase/unit/type/tdigest/tdigest_test.go
b/tests/gocase/unit/type/tdigest/tdigest_test.go
index 0183144c3..d3f118004 100644
--- a/tests/gocase/unit/type/tdigest/tdigest_test.go
+++ b/tests/gocase/unit/type/tdigest/tdigest_test.go
@@ -257,4 +257,57 @@ func tdigestTests(t *testing.T, configs
util.KvrocksServerConfigs) {
require.NoError(t, rsp.Err())
require.Equal(t, "-10.5", rsp.Val())
})
+	// Covers TDIGEST.RESET: arity error, missing key, reset of a populated
+	// digest (counters cleared, capacity/compression kept), and reset of an
+	// empty digest.
+	t.Run("tdigest.reset with different arguments", func(t *testing.T) {
+		keyPrefix := "tdigest_reset_"
+		key := keyPrefix + "mydigest"
+
+		// Testing with no arguments to .RESET
+		require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.RESET").Err(), errMsgWrongNumberArg)
+
+		require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key, "compression", "101").Err())
+
+		// Adding some data to digest
+		require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key, "-84.3", "199.3", "343.34", "12.34").Err())
+
+		// Checking MIN value to ensure data was added
+		// (expected value first, so a failure message labels the two sides correctly)
+		rsp := rdb.Do(ctx, "TDIGEST.MIN", key)
+		require.NoError(t, rsp.Err())
+		require.EqualValues(t, "-84.3", rsp.Val())
+
+		// Reset on a non-existent key
+		require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.RESET", keyPrefix+"notexist").Err(), errMsgKeyNotExist)
+
+		// Get TDIGEST.INFO before reset
+		rsp = rdb.Do(ctx, "TDIGEST.INFO", key)
+		require.NoError(t, rsp.Err())
+		infoBeforeReset := toTdigestInfo(t, rsp.Val())
+
+		// Perform the reset
+		require.NoError(t, rdb.Do(ctx, "TDIGEST.RESET", key).Err())
+
+		// Get TDIGEST.INFO after reset
+		rsp = rdb.Do(ctx, "TDIGEST.INFO", key)
+		require.NoError(t, rsp.Err())
+		infoAfterReset := toTdigestInfo(t, rsp.Val())
+
+		// Capacity and compression must survive the reset; all counters go back to zero
+		require.EqualValues(t, infoBeforeReset.Capacity, infoAfterReset.Capacity)
+		require.EqualValues(t, 101, infoAfterReset.Compression)
+		require.EqualValues(t, 0, infoAfterReset.MergedNodes)
+		require.EqualValues(t, 0, infoAfterReset.UnmergedNodes)
+		require.EqualValues(t, 0, infoAfterReset.Observations)
+		require.EqualValues(t, 0, infoAfterReset.TotalCompressions)
+
+		// Reset on an empty digest
+		emptyDigestKey := keyPrefix + "empty"
+		require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", emptyDigestKey, "COMPRESSION", "100").Err())
+		rsp = rdb.Do(ctx, "TDIGEST.RESET", emptyDigestKey)
+		require.NoError(t, rsp.Err())
+
+		// Ensure the empty digest's compression is still what it was created with
+		rsp = rdb.Do(ctx, "TDIGEST.INFO", emptyDigestKey)
+		require.NoError(t, rsp.Err())
+		infoAfterEmptyReset := toTdigestInfo(t, rsp.Val())
+		require.EqualValues(t, 100, infoAfterEmptyReset.Compression)
+	})
}