This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 64fbd28f feat(tdigest): add TDIGEST.CREATE and TDIGEST.INFO command (#2799)
64fbd28f is described below

commit 64fbd28f4f49123a64e8f464d5039df21db3110e
Author: Edward Xu <[email protected]>
AuthorDate: Sat Feb 22 21:39:44 2025 +0800

    feat(tdigest): add TDIGEST.CREATE and TDIGEST.INFO command (#2799)
    
    Co-authored-by: hulk <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_tdigest.cc                    | 136 +++++++++++++++++++++++
 src/commands/commander.h                       |   1 +
 src/commands/error_constants.h                 |   5 +
 src/types/redis_tdigest.cc                     |  19 ++--
 src/types/redis_tdigest.h                      |   4 +
 tests/cppunit/types/tdigest_test.cc            |   3 +-
 tests/gocase/unit/type/tdigest/tdigest_test.go | 144 +++++++++++++++++++++++++
 7 files changed, 302 insertions(+), 10 deletions(-)

diff --git a/src/commands/cmd_tdigest.cc b/src/commands/cmd_tdigest.cc
new file mode 100644
index 00000000..65876e5d
--- /dev/null
+++ b/src/commands/cmd_tdigest.cc
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "command_parser.h"
+#include "commander.h"
+#include "server/redis_reply.h"
+#include "server/server.h"
+#include "status.h"
+#include "types/redis_tdigest.h"
+
+namespace redis {
+namespace {
+constexpr const char *kCompressionArg = "compression";  // optional keyword of TDIGEST.CREATE
+// Field names of the TDIGEST.INFO reply, listed in the order they are emitted.
+constexpr const char *kInfoCompression = "Compression";
+constexpr const char *kInfoCapacity = "Capacity";
+constexpr const char *kInfoMergedNodes = "Merged nodes";
+constexpr const char *kInfoUnmergedNodes = "Unmerged nodes";
+constexpr const char *kInfoMergedWeight = "Merged weight";
+constexpr const char *kInfoUnmergedWeight = "Unmerged weight";
+constexpr const char *kInfoObservations = "Observations";
+constexpr const char *kInfoTotalCompressions = "Total compressions";
+}  // namespace
+
+class CommandTDigestCreate : public Commander {
+ public:
+  // TDIGEST.CREATE key [COMPRESSION compression]; compression defaults to 100 when omitted.
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+    key_name_ = GET_OR_RET(parser.TakeStr());
+    options_.compression = 100;
+    if (parser.EatEqICase(kCompressionArg)) {
+      if (!parser.Good()) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+      auto compression = parser.TakeInt<int32_t>();
+      if (!compression) {
+        return {Status::RedisParseErr, errParseCompression};
+      }
+      // `<= 0` already enforces the lower bound of 1, so only the upper bound is checked after it.
+      if (*compression <= 0) {
+        return {Status::RedisParseErr, errCompressionMustBePositive};
+      }
+      if (*compression > static_cast<int32_t>(kTDigestMaxCompression)) {
+        return {Status::RedisParseErr, errCompressionOutOfRange};
+      }
+      options_.compression = static_cast<uint32_t>(*compression);
+    }
+    if (parser.Good()) {  // leftover tokens: an unknown keyword or trailing arguments
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+    return Status::OK();
+  }
+
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+    TDigest tdigest(srv->storage, conn->GetNamespace());
+    bool exists = false;
+    auto s = tdigest.Create(ctx, key_name_, options_, &exists);
+    if (!s.ok()) {
+      if (exists) {  // Create() reports a pre-existing key through this out-param
+        return {Status::RedisExecErr, errKeyAlreadyExists};
+      }
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    *output = redis::RESP_OK;
+    return Status::OK();
+  }
+
+ private:
+  std::string key_name_;
+  TDigestCreateOptions options_;
+};
+
+class CommandTDigestInfo : public Commander {
+ public:
+  // TDIGEST.INFO key -- the registered arity is 2, so args[1] is always present.
+  Status Parse(const std::vector<std::string> &args) override {
+    key_name_ = args[1];
+    return Status::OK();
+  }
+
+  // Replies with a map of statistics read from the t-digest's metadata.
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+    TDigest tdigest(srv->storage, conn->GetNamespace());
+    TDigestMetadata metadata;
+    auto s = tdigest.GetMetaData(ctx, key_name_, &metadata);
+    if (s.IsNotFound()) {
+      return {Status::RedisExecErr, errKeyNotFound};
+    }
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    // Appends one map entry: the field name as a bulk string, then its integer value.
+    auto append_field = [output](const char *name, auto value) {
+      output->append(redis::BulkString(name));
+      output->append(redis::Integer(value));
+    };
+    // The map size must match the number of append_field calls below.
+    output->append(conn->HeaderOfMap(8));
+    append_field(kInfoCompression, metadata.compression);
+    append_field(kInfoCapacity, metadata.capacity);
+    append_field(kInfoMergedNodes, metadata.merged_nodes);
+    append_field(kInfoUnmergedNodes, metadata.unmerged_nodes);
+    append_field(kInfoMergedWeight, metadata.merged_weight);
+    append_field(kInfoUnmergedWeight, metadata.total_weight - metadata.merged_weight);
+    append_field(kInfoObservations, metadata.total_observations);
+    append_field(kInfoTotalCompressions, metadata.merge_times);
+    // "Memory usage" is not meaningful for kvrocks, so we don't provide it here.
+    return Status::OK();
+  }
+
+ private:
+  std::string key_name_;
+};
+
+REDIS_REGISTER_COMMANDS(TDigest, MakeCmdAttr<CommandTDigestCreate>("tdigest.create", -2, "write", 1, 1, 1),
+                        MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2, "read-only", 1, 1, 1));
+}  // namespace redis
diff --git a/src/commands/commander.h b/src/commands/commander.h
index a4dd8b2d..8517b84f 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -108,6 +108,7 @@ enum class CommandCategory : uint8_t {
   SortedInt,
   Stream,
   String,
+  TDigest,  // TDIGEST.* commands (registered in cmd_tdigest.cc)
   Txn,
   ZSet,
 };
diff --git a/src/commands/error_constants.h b/src/commands/error_constants.h
index fc33b60d..26b2e692 100644
--- a/src/commands/error_constants.h
+++ b/src/commands/error_constants.h
@@ -45,4 +45,9 @@ inline constexpr const char *errNoMatchingScript = "No matching script. Please u
 inline constexpr const char *errUnknownOption = "unknown option";
 inline constexpr const char *errUnknownSubcommandOrWrongArguments = "Unknown subcommand or wrong number of arguments";
 inline constexpr const char *errRestoringBackup = "kvrocks is restoring the db from backup";
+inline constexpr const char *errParseCompression = "error parsing compression parameter";
+inline constexpr const char *errCompressionMustBePositive = "compression parameter needs to be a positive integer";
+inline constexpr const char *errCompressionOutOfRange = "compression must be between 1 and 1000";
+inline constexpr const char *errKeyNotFound = "key does not exist";
+inline constexpr const char *errKeyAlreadyExists = "key already exists";
 }  // namespace redis
diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index e29b35f0..5cfbbfcd 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -108,12 +108,11 @@ class DummyCentroids {
 };
 
 uint32_t constexpr kMaxElements = 1 * 1024;  // 1k doubles
-uint32_t constexpr kMaxCompression = 1000;   // limit the compression to 1k
 
 rocksdb::Status TDigest::Create(engine::Context& ctx, const Slice& digest_name, const TDigestCreateOptions& options,
                                 bool* exists) {
-  if (options.compression > kMaxCompression) {
-    return rocksdb::Status::InvalidArgument(fmt::format("compression should be less than {}", kMaxCompression));
+  if (options.compression > kTDigestMaxCompression) {
+    return rocksdb::Status::InvalidArgument(fmt::format("compression should be at most {}", kTDigestMaxCompression));
   }
 
   auto ns_key = AppendNamespacePrefix(digest_name);
@@ -121,8 +120,9 @@ rocksdb::Status TDigest::Create(engine::Context& ctx, const Slice& digest_name,
   capacity = ((capacity < kMaxElements) ? capacity : kMaxElements);
   TDigestMetadata metadata(options.compression, capacity);
 
+  // Hold the key lock across the existence check and the write, as Add() and Quantile() do.
   LockGuard guard(storage_->GetLockManager(), ns_key);
-  auto status = GetMetaData(ctx, ns_key, &metadata);
+  auto status = getMetaDataByNsKey(ctx, ns_key, &metadata);
   *exists = status.ok();
   if (*exists) {
     return rocksdb::Status::InvalidArgument("tdigest already exists");
@@ -152,7 +152,7 @@ rocksdb::Status TDigest::Add(engine::Context& ctx, const Slice& digest_name, con
   LockGuard guard(storage_->GetLockManager(), ns_key);
 
   TDigestMetadata metadata;
-  if (auto status = GetMetaData(ctx, ns_key, &metadata); !status.ok()) {
+  if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
     return status;
   }
 
@@ -192,7 +192,7 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name
   {
     LockGuard guard(storage_->GetLockManager(), ns_key);
 
-    if (auto status = GetMetaData(ctx, ns_key, &metadata); !status.ok()) {
+    if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
       return status;
     }
 
@@ -239,7 +239,13 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx, const Slice& digest_name
   return rocksdb::Status::OK();
 }
 
-rocksdb::Status TDigest::GetMetaData(engine::Context& context, const Slice& ns_key, TDigestMetadata* metadata) {
+rocksdb::Status TDigest::GetMetaData(engine::Context& context, const Slice& digest_name, TDigestMetadata* metadata) {
+  auto ns_key = AppendNamespacePrefix(digest_name);
+  return getMetaDataByNsKey(context, ns_key, metadata);
+}
+
+// Like GetMetaData(), but `ns_key` must already carry the namespace prefix.
+rocksdb::Status TDigest::getMetaDataByNsKey(engine::Context& context, const Slice& ns_key, TDigestMetadata* metadata) {
   return Database::GetMetadata(context, {kRedisTDigest}, ns_key, metadata);
 }
 
diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h
index c1621338..eccb2007 100644
--- a/src/types/redis_tdigest.h
+++ b/src/types/redis_tdigest.h
@@ -32,6 +32,8 @@
 #include "tdigest.h"
 
 namespace redis {
+inline constexpr uint32_t kTDigestMaxCompression = 1000;  // limit the compression to 1k
+
 struct CentroidWithKey {
   Centroid centroid;
   rocksdb::Slice key;
@@ -72,6 +74,9 @@ class TDigest : public SubKeyScanner {
 
   rocksdb::ColumnFamilyHandle* cf_handle_;
 
+  // Same as GetMetaData(), except that `ns_key` must already carry the namespace prefix.
+  rocksdb::Status getMetaDataByNsKey(engine::Context& context, const Slice& ns_key, TDigestMetadata* metadata);
+
   rocksdb::Status appendBuffer(engine::Context& ctx, ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
                                const std::string& ns_key, const std::vector<double>& inputs, TDigestMetadata* metadata);
 
diff --git a/tests/cppunit/types/tdigest_test.cc b/tests/cppunit/types/tdigest_test.cc
index 849c27f6..7d2b6e6e 100644
--- a/tests/cppunit/types/tdigest_test.cc
+++ b/tests/cppunit/types/tdigest_test.cc
@@ -150,9 +150,8 @@ TEST_F(RedisTDigestTest, Create) {
   ASSERT_TRUE(exists);
   ASSERT_TRUE(status.IsInvalidArgument());
 
-  auto ns_key = tdigest_->AppendNamespacePrefix(test_digest_name);
   TDigestMetadata metadata;
-  auto get_status = tdigest_->GetMetaData(*ctx_, ns_key, &metadata);
+  auto get_status = tdigest_->GetMetaData(*ctx_, test_digest_name, &metadata);  // GetMetaData adds the namespace prefix
   ASSERT_TRUE(get_status.ok()) << get_status.ToString();
   ASSERT_EQ(metadata.compression, 100) << metadata.compression;
 }
diff --git a/tests/gocase/unit/type/tdigest/tdigest_test.go b/tests/gocase/unit/type/tdigest/tdigest_test.go
new file mode 100644
index 00000000..a9d4b3e7
--- /dev/null
+++ b/tests/gocase/unit/type/tdigest/tdigest_test.go
@@ -0,0 +1,144 @@
+//go:build !ignore_when_tsan
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+ */
+
+package tdigest
+
+import (
+       "context"
+       "testing"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/stretchr/testify/require"
+)
+
+const (
+       errMsgWrongNumberArg   = "wrong number of arguments"
+       errMsgParseCompression = "error parsing compression parameter"
+       errMsgNeedToBePositive = "compression parameter needs to be a positive integer"
+       errMsgMustInRange      = "compression must be between 1 and 1000"
+       errMsgKeyAlreadyExists = "key already exists"
+       errMsgKeyNotExist      = "key does not exist"
+)
+
+type tdigestInfo struct { // integer fields decoded from a TDIGEST.INFO map reply
+       Compression       int64
+       Capacity          int64
+       MergedNodes       int64
+       UnmergedNodes     int64
+       MergedWeight      int64
+       UnmergedWeight    int64
+       Observations      int64
+       TotalCompressions int64
+       // Memory usage is not reported by kvrocks' TDIGEST.INFO, so there is no field for it.
+}
+
+func toTdigestInfo(t *testing.T, value interface{}) tdigestInfo {
+       require.IsType(t, map[interface{}]interface{}{}, value)
+       field := func(name string) int64 { return value.(map[interface{}]interface{})[name].(int64) } // panics if name is absent or not an int64
+       return tdigestInfo{
+               Compression:       field("Compression"),
+               Capacity:          field("Capacity"),
+               MergedNodes:       field("Merged nodes"),
+               UnmergedNodes:     field("Unmerged nodes"),
+               MergedWeight:      field("Merged weight"),
+               UnmergedWeight:    field("Unmerged weight"),
+               Observations:      field("Observations"),
+               TotalCompressions: field("Total compressions"),
+       }
+}
+
+func TestTDigest(t *testing.T) {
+       // Run the whole suite once for every combination of the server options below.
+       configsMatrix, err := util.GenerateConfigsMatrix([]util.ConfigOptions{
+               {
+                       Name:       "txn-context-enabled",
+                       Options:    []string{"yes", "no"},
+                       ConfigType: util.YesNo,
+               },
+       })
+       require.NoError(t, err)
+
+       for _, configs := range configsMatrix {
+               // Each combination gets its own freshly started server.
+               tdigestTests(t, configs)
+       }
+}
+
+func tdigestTests(t *testing.T, configs util.KvrocksServerConfigs) {
+       srv := util.StartServer(t, configs)
+       defer srv.Close()
+       ctx := context.Background()
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       t.Run("tdigest.create with different arguments", func(t *testing.T) {
+               keyPrefix := "tdigest_create_"
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE").Err(), errMsgWrongNumberArg)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key", "hahah").Err(), errMsgWrongNumberArg)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key", "1", "hahah").Err(), errMsgWrongNumberArg)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key", "compression").Err(), errMsgWrongNumberArg)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key", "compression", "hahah").Err(), errMsgParseCompression)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key", "compression", "0").Err(), errMsgNeedToBePositive)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key", "compression", "-1").Err(), errMsgNeedToBePositive)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key", "compression", "0.1").Err(), errMsgParseCompression)
+
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key0", "compression", "1").Err())
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key0", "compression", "1").Err(), errMsgKeyAlreadyExists)
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key1", "compression", "1000").Err())
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key", "compression", "1001").Err(), errMsgMustInRange)
+       })
+
+       t.Run("tdigest.info with different arguments", func(t *testing.T) {
+               keyPrefix := "tdigest_info_"
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.INFO").Err(), errMsgWrongNumberArg)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.INFO", keyPrefix+"key", "hahah").Err(), errMsgWrongNumberArg)
+               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.INFO", keyPrefix+"key").Err(), errMsgKeyNotExist)
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key0", "compression", "1").Err())
+               {
+                       rsp := rdb.Do(ctx, "TDIGEST.INFO", keyPrefix+"key0")
+                       require.NoError(t, rsp.Err())
+                       info := toTdigestInfo(t, rsp.Val())
+                       require.EqualValues(t, 1, info.Compression)
+                       require.EqualValues(t, 1*6+10, info.Capacity)
+                       require.EqualValues(t, 0, info.MergedNodes)
+                       require.EqualValues(t, 0, info.UnmergedNodes)
+                       require.EqualValues(t, 0, info.MergedWeight)
+                       require.EqualValues(t, 0, info.UnmergedWeight)
+                       require.EqualValues(t, 0, info.Observations)
+                       require.EqualValues(t, 0, info.TotalCompressions)
+               }
+
+               {
+                       require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", keyPrefix+"key1", "compression", "1000").Err())
+                       rsp := rdb.Do(ctx, "TDIGEST.INFO", keyPrefix+"key1")
+                       require.NoError(t, rsp.Err())
+                       info := toTdigestInfo(t, rsp.Val())
+                       require.EqualValues(t, 1000, info.Compression)
+                       require.EqualValues(t, 1024, info.Capacity) // max is 1024
+                       require.EqualValues(t, 0, info.MergedNodes)
+                       require.EqualValues(t, 0, info.UnmergedNodes)
+                       require.EqualValues(t, 0, info.MergedWeight)
+                       require.EqualValues(t, 0, info.UnmergedWeight)
+                       require.EqualValues(t, 0, info.Observations)
+                       require.EqualValues(t, 0, info.TotalCompressions)
+               }
+       })
+}

Reply via email to