This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 0e9c93c7 feat(tdigest): add TDIGEST.ADD command (#2803)
0e9c93c7 is described below

commit 0e9c93c726b9869c34db4fb7aa0fcd302e948100
Author: Uniqueyou <[email protected]>
AuthorDate: Thu Feb 27 16:45:49 2025 +0800

    feat(tdigest): add TDIGEST.ADD command (#2803)
---
 src/commands/cmd_tdigest.cc                    | 40 ++++++++++++++++-
 src/types/redis_tdigest.cc                     |  1 -
 tests/gocase/unit/type/tdigest/tdigest_test.go | 62 ++++++++++++++++++++++++++
 3 files changed, 101 insertions(+), 2 deletions(-)

diff --git a/src/commands/cmd_tdigest.cc b/src/commands/cmd_tdigest.cc
index 65876e5d..ac699ed0 100644
--- a/src/commands/cmd_tdigest.cc
+++ b/src/commands/cmd_tdigest.cc
@@ -131,6 +131,44 @@ class CommandTDigestInfo : public Commander {
   std::string key_name_;
 };
 
+class CommandTDigestAdd : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    key_name_ = args[1];
+
+    values_.reserve(args.size() - 2);
+    for (size_t i = 2; i < args.size(); i++) {
+      auto value = ParseFloat(args[i]);
+      if (!value) {
+        return {Status::RedisParseErr, errValueIsNotFloat};
+      }
+      values_.push_back(*value);
+    }
+
+    return Status::OK();
+  }
+
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    TDigest tdigest(srv->storage, conn->GetNamespace());
+
+    auto s = tdigest.Add(ctx, key_name_, values_);
+    if (!s.ok()) {
+      if (s.IsNotFound()) {
+        return {Status::RedisExecErr, errKeyNotFound};
+      }
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    *output = redis::RESP_OK;
+    return Status::OK();
+  }
+
+ private:
+  std::string key_name_;
+  std::vector<double> values_;
+};
+
 REDIS_REGISTER_COMMANDS(TDigest, 
MakeCmdAttr<CommandTDigestCreate>("tdigest.create", -2, "write", 1, 1, 1),
-                        MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2, 
"read-only", 1, 1, 1));
+                        MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2, 
"read-only", 1, 1, 1),
+                        MakeCmdAttr<CommandTDigestAdd>("tdigest.add", -3, 
"write", 1, 1, 1));
 }  // namespace redis
diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index 5cfbbfcd..6953f476 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -147,7 +147,6 @@ rocksdb::Status TDigest::Create(engine::Context& ctx, const 
Slice& digest_name,
 
 rocksdb::Status TDigest::Add(engine::Context& ctx, const Slice& digest_name, 
const std::vector<double>& inputs) {
   auto ns_key = AppendNamespacePrefix(digest_name);
-  LockGuard guard(storage_->GetLockManager(), ns_key);
 
   TDigestMetadata metadata;
   if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
diff --git a/tests/gocase/unit/type/tdigest/tdigest_test.go 
b/tests/gocase/unit/type/tdigest/tdigest_test.go
index a9d4b3e7..e9509636 100644
--- a/tests/gocase/unit/type/tdigest/tdigest_test.go
+++ b/tests/gocase/unit/type/tdigest/tdigest_test.go
@@ -141,4 +141,66 @@ func tdigestTests(t *testing.T, configs 
util.KvrocksServerConfigs) {
                        require.EqualValues(t, 0, info.TotalCompressions)
                }
        })
+
+       t.Run("tdigest.add with different arguments", func(t *testing.T) {
+               keyPrefix := "tdigest_add_"
+
+               // Satisfy the number of parameters
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.ADD").Err(), 
errMsgWrongNumberArg)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.ADD", 
keyPrefix+"key").Err(), errMsgWrongNumberArg)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.ADD", 
keyPrefix+"key", "abc").Err(), "not a valid float")
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.ADD", 
keyPrefix+"nonexistent", "1.0").Err(), errMsgKeyNotExist)
+
+               // Test adding values to a key
+               key := keyPrefix + "test1"
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key, 
"compression", "100").Err())
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key, 
"42.0").Err())
+
+               rsp := rdb.Do(ctx, "TDIGEST.INFO", key)
+               require.NoError(t, rsp.Err())
+               info := toTdigestInfo(t, rsp.Val())
+               require.EqualValues(t, 1, info.UnmergedNodes)
+               require.EqualValues(t, 1, info.Observations)
+
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", key, "1.0", 
"2.0", "3.0", "4.0", "5.0").Err())
+
+               rsp = rdb.Do(ctx, "TDIGEST.INFO", key)
+               require.NoError(t, rsp.Err())
+               info = toTdigestInfo(t, rsp.Val())
+               require.EqualValues(t, 6, info.Observations)
+
+               // Test adding values to a key with compression
+               key2 := keyPrefix + "test2"
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key2, 
"compression", "100").Err())
+
+               args := []interface{}{key2}
+               for i := 1; i <= 1000; i++ {
+                       args = append(args, float64(i))
+               }
+               require.NoError(t, rdb.Do(ctx, 
append([]interface{}{"TDIGEST.ADD"}, args...)...).Err())
+
+               rsp = rdb.Do(ctx, "TDIGEST.INFO", key2)
+               require.NoError(t, rsp.Err())
+               info = toTdigestInfo(t, rsp.Val())
+               require.EqualValues(t, 1000, info.Observations)
+
+               // Test adding values to a key with compression and merge node
+               key3 := keyPrefix + "test3"
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", key3, 
"compression", "10").Err())
+
+               args = []interface{}{key3}
+               for i := 1; i <= 100; i++ {
+                       args = append(args, float64(i%10))
+               }
+               require.NoError(t, rdb.Do(ctx, 
append([]interface{}{"TDIGEST.ADD"}, args...)...).Err())
+
+               rsp = rdb.Do(ctx, "TDIGEST.INFO", key3)
+               require.NoError(t, rsp.Err())
+               info = toTdigestInfo(t, rsp.Val())
+
+               require.Greater(t, info.MergedNodes, int64(0))
+               require.Greater(t, info.MergedWeight, int64(0))
+               require.EqualValues(t, 100, info.Observations)
+               require.Greater(t, info.TotalCompressions, int64(0))
+       })
 }

Reply via email to