This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new f0251470e feat(tdigest): add support of TDIGEST.RESET command (#2826)
f0251470e is described below

commit f0251470ecb78f186977feb8b8230187e09830bc
Author: Anirudh Lakhanpal <[email protected]>
AuthorDate: Thu Mar 20 12:13:26 2025 +0530

    feat(tdigest): add support of TDIGEST.RESET command (#2826)
    
    Co-authored-by: Rivers <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_tdigest.cc                    | 32 +++++++++++++++-
 src/types/redis_tdigest.cc                     | 38 ++++++++++++++++++
 src/types/redis_tdigest.h                      |  1 +
 tests/gocase/unit/type/tdigest/tdigest_test.go | 53 ++++++++++++++++++++++++++
 4 files changed, 123 insertions(+), 1 deletion(-)

diff --git a/src/commands/cmd_tdigest.cc b/src/commands/cmd_tdigest.cc
index 4c62ffb5b..166a22afb 100644
--- a/src/commands/cmd_tdigest.cc
+++ b/src/commands/cmd_tdigest.cc
@@ -202,7 +202,36 @@ class CommandTDigestMinMax : public Commander {
   std::string key_name_;
   bool is_min_;
 };
+class CommandTDigestReset : public Commander {
+  Status Parse(const std::vector<std::string> &args) override {
+    key_name_ = args[1];
+    return Status::OK();
+  }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    TDigest tdigest(srv->storage, conn->GetNamespace());
+    TDigestMetadata metadata;
+    auto s = tdigest.GetMetaData(ctx, key_name_, &metadata);
+    if (!s.ok()) {
+      if (s.IsNotFound()) {
+        return {Status::RedisExecErr, errKeyNotFound};
+      }
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    if (metadata.total_observations == 0) {
+      *output = redis::RESP_OK;
+      return Status::OK();
+    }
+    s = tdigest.Reset(ctx, key_name_);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    *output = redis::RESP_OK;
+    return Status::OK();
+  }
 
+ private:
+  std::string key_name_;
+};
 // Then replace the existing template implementation and type aliases with:
 class CommandTDigestMin : public CommandTDigestMinMax {
  public:
@@ -218,5 +247,6 @@ REDIS_REGISTER_COMMANDS(TDigest, 
MakeCmdAttr<CommandTDigestCreate>("tdigest.crea
                         MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3, 
"write", 1, 1, 1),
                         MakeCmdAttr<CommandTDigestMax>("tdigest.max", 2, 
"read-only", 1, 1, 1),
-                        MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, 
"read-only", 1, 1, 1));
+                        MakeCmdAttr<CommandTDigestMin>("tdigest.min", 2, 
"read-only", 1, 1, 1),
+                        MakeCmdAttr<CommandTDigestReset>("tdigest.reset", 2, 
"write", 1, 1, 1));
 }  // namespace redis
diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index 0ee55740c..e9f31d2b1 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -238,7 +238,45 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, 
const Slice& digest_name
 
   return rocksdb::Status::OK();
 }
+rocksdb::Status TDigest::Reset(engine::Context& ctx, const Slice& digest_name) 
{
+  auto ns_key = AppendNamespacePrefix(digest_name);
+
+  TDigestMetadata metadata;
+  if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
+    return status;
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisTDigest);
+  if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
+    return status;
+  }
+
+  metadata.unmerged_nodes = 0;
+  metadata.merged_nodes = 0;
+  metadata.total_weight = 0;
+  metadata.merged_weight = 0;
+  metadata.minimum = std::numeric_limits<double>::max();
+  metadata.maximum = std::numeric_limits<double>::lowest();
+  metadata.total_observations = 0;
+  metadata.merge_times = 0;
 
+  std::string metadata_bytes;
+  metadata.Encode(&metadata_bytes);
+
+  if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); 
!status.ok()) {
+    return status;
+  }
+
+  auto start_key = internalSegmentGuardPrefixKey(metadata, ns_key, 
SegmentType::kBuffer);
+  auto guard_key = internalSegmentGuardPrefixKey(metadata, ns_key, 
SegmentType::kGuardFlag);
+
+  if (auto status = batch->DeleteRange(cf_handle_, start_key, guard_key); 
!status.ok()) {
+    return status;
+  }
+  auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+  return status;
+}
 rocksdb::Status TDigest::GetMetaData(engine::Context& context, const Slice& 
digest_name, TDigestMetadata* metadata) {
   auto ns_key = AppendNamespacePrefix(digest_name);
   return Database::GetMetadata(context, {kRedisTDigest}, ns_key, metadata);
diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h
index eccb20074..33da44a67 100644
--- a/src/types/redis_tdigest.h
+++ b/src/types/redis_tdigest.h
@@ -67,6 +67,7 @@ class TDigest : public SubKeyScanner {
   rocksdb::Status Quantile(engine::Context& ctx, const Slice& digest_name, 
const std::vector<double>& qs,
                            TDigestQuantitleResult* result);
 
+  rocksdb::Status Reset(engine::Context& ctx, const Slice& digest_name);
   rocksdb::Status GetMetaData(engine::Context& context, const Slice& 
digest_name, TDigestMetadata* metadata);
 
  private:
diff --git a/tests/gocase/unit/type/tdigest/tdigest_test.go 
b/tests/gocase/unit/type/tdigest/tdigest_test.go
index 0183144c3..d3f118004 100644
--- a/tests/gocase/unit/type/tdigest/tdigest_test.go
+++ b/tests/gocase/unit/type/tdigest/tdigest_test.go
@@ -257,4 +257,57 @@ func tdigestTests(t *testing.T, configs 
util.KvrocksServerConfigs) {
                require.NoError(t, rsp.Err())
                require.Equal(t, "-10.5", rsp.Val())
        })
+       t.Run("tdigest.reset with different arguments", func(t *testing.T) {
+               keyPrefix := "tdigest_reset_"
+
+               // Testing with no arguments to .RESET
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.RESET").Err(), 
errMsgWrongNumberArg)
+
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", 
keyPrefix+"mydigest", "compression", "101").Err())
+
+               key := keyPrefix + "mydigest"
+               // Adding some data to digest
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key, "-84.3", 
"199.3", "343.34", "12.34").Err())
+
+               // Checking MIN value to ensure data was added
+               rsp := rdb.Do(ctx, "TDIGEST.MIN", key)
+               require.NoError(t, rsp.Err())
+               require.EqualValues(t, rsp.Val(), "-84.3")
+
+               // Reset on a non-existent key
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.RESET", 
keyPrefix+"notexist").Err(), errMsgKeyNotExist)
+
+               // Get TDIGEST.INFO before reset
+               rsp = rdb.Do(ctx, "TDIGEST.INFO", key)
+               require.NoError(t, rsp.Err())
+               infoBeforeReset := toTdigestInfo(t, rsp.Val())
+
+               // Perform the reset
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.RESET", key).Err())
+
+               // Get TDIGEST.INFO after reset
+               rsp = rdb.Do(ctx, "TDIGEST.INFO", key)
+               require.NoError(t, rsp.Err())
+               infoAfterReset := toTdigestInfo(t, rsp.Val())
+
+               // Ensure capacity remains unchanged
+               require.EqualValues(t, infoBeforeReset.Capacity, 
infoAfterReset.Capacity)
+               require.EqualValues(t, 101, infoAfterReset.Compression)
+               require.EqualValues(t, 0, infoAfterReset.MergedNodes)
+               require.EqualValues(t, 0, infoAfterReset.UnmergedNodes)
+               require.EqualValues(t, 0, infoAfterReset.Observations)
+               require.EqualValues(t, 0, infoAfterReset.TotalCompressions)
+
+               // Reset on an empty digest
+               emptyDigestKey := keyPrefix + "empty"
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", 
emptyDigestKey, "COMPRESSION", "100").Err())
+               rsp = rdb.Do(ctx, "TDIGEST.RESET", emptyDigestKey)
+               require.NoError(t, rsp.Err())
+
+               // Ensure empty digest's capacity remains the same
+               rsp = rdb.Do(ctx, "TDIGEST.INFO", emptyDigestKey)
+               require.NoError(t, rsp.Err())
+               infoAfterEmptyReset := toTdigestInfo(t, rsp.Val())
+               require.EqualValues(t, 100, infoAfterEmptyReset.Compression)
+       })
 }

Reply via email to