This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 64fbd28f feat(tdigest): add TDIGEST.CREATE and TDIGEST.INFO command
(#2799)
64fbd28f is described below
commit 64fbd28f4f49123a64e8f464d5039df21db3110e
Author: Edward Xu <[email protected]>
AuthorDate: Sat Feb 22 21:39:44 2025 +0800
feat(tdigest): add TDIGEST.CREATE and TDIGEST.INFO command (#2799)
Co-authored-by: hulk <[email protected]>
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_tdigest.cc | 136 +++++++++++++++++++++++
src/commands/commander.h | 1 +
src/commands/error_constants.h | 5 +
src/types/redis_tdigest.cc | 19 ++--
src/types/redis_tdigest.h | 4 +
tests/cppunit/types/tdigest_test.cc | 3 +-
tests/gocase/unit/type/tdigest/tdigest_test.go | 144 +++++++++++++++++++++++++
7 files changed, 302 insertions(+), 10 deletions(-)
diff --git a/src/commands/cmd_tdigest.cc b/src/commands/cmd_tdigest.cc
new file mode 100644
index 00000000..65876e5d
--- /dev/null
+++ b/src/commands/cmd_tdigest.cc
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "command_parser.h"
+#include "commander.h"
+#include "server/redis_reply.h"
+#include "server/server.h"
+#include "status.h"
+#include "types/redis_tdigest.h"
+
+namespace redis {
+namespace {
+constexpr auto kCompressionArg = "compression";  // option keyword that TDIGEST.CREATE parsing looks for after the key
+
+constexpr auto kInfoCompression = "Compression";  // kInfo*: field labels written into the TDIGEST.INFO reply
+constexpr auto kInfoCapacity = "Capacity";
+constexpr auto kInfoMergedNodes = "Merged nodes";
+constexpr auto kInfoUnmergedNodes = "Unmerged nodes";
+constexpr auto kInfoMergedWeight = "Merged weight";
+constexpr auto kInfoUnmergedWeight = "Unmerged weight";  // reported as total_weight - merged_weight
+constexpr auto kInfoObservations = "Observations";
+constexpr auto kInfoTotalCompressions = "Total compressions";  // backed by metadata.merge_times
+} // namespace
+
+/// Handler for `TDIGEST.CREATE key [COMPRESSION compression]`.
+///
+/// Parse() validates the optional compression argument; Execute() creates the
+/// digest through TDigest::Create and replies with OK on success.
+class CommandTDigestCreate : public Commander {
+ public:
+  /// Extracts the key name and the optional compression from `args`.
+  ///
+  /// @param args full command line; parsing starts at index 1 (the key).
+  /// @return OK, or a RedisParseErr status when:
+  ///   - the compression keyword has no value, or unexpected tokens remain
+  ///     (errWrongNumOfArguments);
+  ///   - the value is not a 32-bit integer (errParseCompression);
+  ///   - the value is zero or negative (errCompressionMustBePositive);
+  ///   - the value exceeds kTDigestMaxCompression (errCompressionOutOfRange).
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+    key_name_ = GET_OR_RET(parser.TakeStr());
+    options_.compression = kDefaultCompression;
+    if (parser.EatEqICase(kCompressionArg)) {
+      // The keyword was consumed, so a value must follow it.
+      if (!parser.Good()) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+      auto parsed = parser.TakeInt<int32_t>();
+      if (!parsed) {
+        return {Status::RedisParseErr, errParseCompression};
+      }
+      const int32_t compression = *parsed;
+      if (compression <= 0) {
+        return {Status::RedisParseErr, errCompressionMustBePositive};
+      }
+      // Non-positive values were rejected above, so only the upper bound is left to check.
+      if (compression > static_cast<int32_t>(kTDigestMaxCompression)) {
+        return {Status::RedisParseErr, errCompressionOutOfRange};
+      }
+      options_.compression = static_cast<uint32_t>(compression);
+    }
+    // Anything still unconsumed is an unexpected extra argument.
+    if (parser.Good()) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    return Status::OK();
+  }
+
+  /// Creates the digest in the connection's namespace.
+  ///
+  /// @param output receives redis::RESP_OK on success.
+  /// @return OK on success; RedisExecErr with errKeyAlreadyExists when
+  ///   TDigest::Create fails and reports that the key exists, otherwise
+  ///   RedisExecErr carrying the storage status text.
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+    TDigest tdigest(srv->storage, conn->GetNamespace());
+    bool exists = false;
+    auto s = tdigest.Create(ctx, key_name_, options_, &exists);
+    if (!s.ok()) {
+      if (exists) {
+        return {Status::RedisExecErr, errKeyAlreadyExists};
+      }
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    *output = redis::RESP_OK;
+    return Status::OK();
+  }
+
+ private:
+  // Compression used when the command does not specify one.
+  static constexpr uint32_t kDefaultCompression = 100;
+
+  std::string key_name_;
+  TDigestCreateOptions options_;
+};
+
+/// Handler for `TDIGEST.INFO key`.
+///
+/// Replies with a map of eight integer fields taken from the digest metadata.
+class CommandTDigestInfo : public Commander {
+ public:
+  /// Stores the key name, which is the only argument the command uses.
+  Status Parse(const std::vector<std::string> &args) override {
+    key_name_ = args[1];
+    return Status::OK();
+  }
+
+  /// Loads the digest metadata and serializes it into `output`.
+  ///
+  /// @return OK on success; RedisExecErr with errKeyNotFound when the metadata
+  ///   lookup reports NotFound, otherwise RedisExecErr with the status text.
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+    TDigest tdigest(srv->storage, conn->GetNamespace());
+    TDigestMetadata metadata;
+    if (auto lookup = tdigest.GetMetaData(ctx, key_name_, &metadata); !lookup.ok()) {
+      if (lookup.IsNotFound()) {
+        return {Status::RedisExecErr, errKeyNotFound};
+      }
+      return {Status::RedisExecErr, lookup.ToString()};
+    }
+
+    // Emits one "label -> integer" entry; `auto` keeps each field's own integer type.
+    const auto append_field = [output](const char *label, auto value) {
+      output->append(redis::BulkString(label));
+      output->append(redis::Integer(value));
+    };
+
+    output->append(conn->HeaderOfMap(8));
+    append_field(kInfoCompression, metadata.compression);
+    append_field(kInfoCapacity, metadata.capacity);
+    append_field(kInfoMergedNodes, metadata.merged_nodes);
+    append_field(kInfoUnmergedNodes, metadata.unmerged_nodes);
+    append_field(kInfoMergedWeight, metadata.merged_weight);
+    // Unmerged weight is not stored directly; it is whatever has not been merged yet.
+    append_field(kInfoUnmergedWeight, metadata.total_weight - metadata.merged_weight);
+    append_field(kInfoObservations, metadata.total_observations);
+    append_field(kInfoTotalCompressions, metadata.merge_times);
+    // "Memory usage" is deliberately omitted: it is not meaningful for kvrocks.
+    return Status::OK();
+  }
+
+ private:
+  std::string key_name_;
+};
+
+REDIS_REGISTER_COMMANDS(TDigest,  // first argument presumably names the CommandCategory these commands belong to
MakeCmdAttr<CommandTDigestCreate>("tdigest.create", -2, "write", 1, 1, 1),  // NOTE(review): -2 presumably means "at least 2 arguments" (Redis arity convention) - confirm
+ MakeCmdAttr<CommandTDigestInfo>("tdigest.info", 2,
"read-only", 1, 1, 1));  // NOTE(review): trailing 1, 1, 1 look like first-key/last-key/key-step positions - confirm
+} // namespace redis
diff --git a/src/commands/commander.h b/src/commands/commander.h
index a4dd8b2d..8517b84f 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -108,6 +108,7 @@ enum class CommandCategory : uint8_t {
SortedInt,
Stream,
String,
+ TDigest,
Txn,
ZSet,
};
diff --git a/src/commands/error_constants.h b/src/commands/error_constants.h
index fc33b60d..26b2e692 100644
--- a/src/commands/error_constants.h
+++ b/src/commands/error_constants.h
@@ -45,4 +45,9 @@ inline constexpr const char *errNoMatchingScript = "No
matching script. Please u
inline constexpr const char *errUnknownOption = "unknown option";
inline constexpr const char *errUnknownSubcommandOrWrongArguments = "Unknown
subcommand or wrong number of arguments";
inline constexpr const char *errRestoringBackup = "kvrocks is restoring the db
from backup";
+inline constexpr const char *errParseCompression = "error parsing compression
parameter";
+inline constexpr const char *errCompressionMustBePositive = "compression
parameter needs to be a positive integer";
+inline constexpr const char *errCompressionOutOfRange = "compression must be
between 1 and 1000";
+inline constexpr const char *errKeyNotFound = "key does not exist";
+inline constexpr const char *errKeyAlreadyExists = "key already exists";
} // namespace redis
diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index e29b35f0..5cfbbfcd 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -108,12 +108,11 @@ class DummyCentroids {
};
uint32_t constexpr kMaxElements = 1 * 1024; // 1k doubles
-uint32_t constexpr kMaxCompression = 1000; // limit the compression to 1k
rocksdb::Status TDigest::Create(engine::Context& ctx, const Slice&
digest_name, const TDigestCreateOptions& options,
bool* exists) {
- if (options.compression > kMaxCompression) {
- return rocksdb::Status::InvalidArgument(fmt::format("compression should be
less than {}", kMaxCompression));
+ if (options.compression > kTDigestMaxCompression) {
+ return rocksdb::Status::InvalidArgument(fmt::format("compression should be
less than {}", kTDigestMaxCompression));
}
auto ns_key = AppendNamespacePrefix(digest_name);
@@ -121,8 +120,7 @@ rocksdb::Status TDigest::Create(engine::Context& ctx, const
Slice& digest_name,
capacity = ((capacity < kMaxElements) ? capacity : kMaxElements);
TDigestMetadata metadata(options.compression, capacity);
- LockGuard guard(storage_->GetLockManager(), ns_key);
- auto status = GetMetaData(ctx, ns_key, &metadata);
+ auto status = getMetaDataByNsKey(ctx, ns_key, &metadata);
*exists = status.ok();
if (*exists) {
return rocksdb::Status::InvalidArgument("tdigest already exists");
@@ -152,7 +150,7 @@ rocksdb::Status TDigest::Add(engine::Context& ctx, const
Slice& digest_name, con
LockGuard guard(storage_->GetLockManager(), ns_key);
TDigestMetadata metadata;
- if (auto status = GetMetaData(ctx, ns_key, &metadata); !status.ok()) {
+ if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); !status.ok()) {
return status;
}
@@ -192,7 +190,7 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx,
const Slice& digest_name
{
LockGuard guard(storage_->GetLockManager(), ns_key);
- if (auto status = GetMetaData(ctx, ns_key, &metadata); !status.ok()) {
+ if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata);
!status.ok()) {
return status;
}
@@ -239,7 +237,12 @@ rocksdb::Status TDigest::Quantile(engine::Context& ctx,
const Slice& digest_name
return rocksdb::Status::OK();
}
-rocksdb::Status TDigest::GetMetaData(engine::Context& context, const Slice&
ns_key, TDigestMetadata* metadata) {
+rocksdb::Status TDigest::GetMetaData(engine::Context& context, const Slice&
digest_name, TDigestMetadata* metadata) {
+ auto ns_key = AppendNamespacePrefix(digest_name);
+ return Database::GetMetadata(context, {kRedisTDigest}, ns_key, metadata);
+}
+
+rocksdb::Status TDigest::getMetaDataByNsKey(engine::Context& context, const
Slice& ns_key, TDigestMetadata* metadata) {
return Database::GetMetadata(context, {kRedisTDigest}, ns_key, metadata);
}
diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h
index c1621338..eccb2007 100644
--- a/src/types/redis_tdigest.h
+++ b/src/types/redis_tdigest.h
@@ -32,6 +32,8 @@
#include "tdigest.h"
namespace redis {
+inline constexpr uint32_t kTDigestMaxCompression = 1000; // limit the
compression to 1k
+
struct CentroidWithKey {
Centroid centroid;
rocksdb::Slice key;
@@ -72,6 +74,8 @@ class TDigest : public SubKeyScanner {
rocksdb::ColumnFamilyHandle* cf_handle_;
+ rocksdb::Status getMetaDataByNsKey(engine::Context& context, const Slice&
digest_name, TDigestMetadata* metadata);
+
rocksdb::Status appendBuffer(engine::Context& ctx,
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
const std::string& ns_key, const
std::vector<double>& inputs, TDigestMetadata* metadata);
diff --git a/tests/cppunit/types/tdigest_test.cc
b/tests/cppunit/types/tdigest_test.cc
index 849c27f6..7d2b6e6e 100644
--- a/tests/cppunit/types/tdigest_test.cc
+++ b/tests/cppunit/types/tdigest_test.cc
@@ -150,9 +150,8 @@ TEST_F(RedisTDigestTest, Create) {
ASSERT_TRUE(exists);
ASSERT_TRUE(status.IsInvalidArgument());
- auto ns_key = tdigest_->AppendNamespacePrefix(test_digest_name);
TDigestMetadata metadata;
- auto get_status = tdigest_->GetMetaData(*ctx_, ns_key, &metadata);
+ auto get_status = tdigest_->GetMetaData(*ctx_, test_digest_name, &metadata);
ASSERT_TRUE(get_status.ok()) << get_status.ToString();
ASSERT_EQ(metadata.compression, 100) << metadata.compression;
}
diff --git a/tests/gocase/unit/type/tdigest/tdigest_test.go
b/tests/gocase/unit/type/tdigest/tdigest_test.go
new file mode 100644
index 00000000..a9d4b3e7
--- /dev/null
+++ b/tests/gocase/unit/type/tdigest/tdigest_test.go
@@ -0,0 +1,144 @@
+//go:build !ignore_when_tsan
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+ */
+
+package tdigest
+
+import (
+ "context"
+ "testing"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/stretchr/testify/require"
+)
+
+const ( // substrings of the server error replies that the tests match with require.ErrorContains
+ errMsgWrongNumberArg = "wrong number of arguments"
+ errMsgParseCompression = "error parsing compression parameter" // compression value is not an integer
+ errMsgNeedToBePositive = "compression parameter needs to be a positive
integer"
+ errMsgMustInRange = "compression must be between 1 and 1000" // compression above the server-side maximum
+ errMsgKeyAlreadyExists = "key already exists" // TDIGEST.CREATE on an existing key
+ errMsgKeyNotExist = "key does not exist" // TDIGEST.INFO on a missing key
+)
+
+type tdigestInfo struct { // decoded TDIGEST.INFO reply; toTdigestInfo fills each field from the reply map
+ Compression int64 // reply key "Compression"
+ Capacity int64 // reply key "Capacity"
+ MergedNodes int64 // reply key "Merged nodes"
+ UnmergedNodes int64 // reply key "Unmerged nodes"
+ MergedWeight int64 // reply key "Merged weight"
+ UnmergedWeight int64 // reply key "Unmerged weight"
+ Observations int64 // reply key "Observations"
+ TotalCompressions int64 // reply key "Total compressions"
+ // Memory usage is not useful for kvrocks, so it is not supported for now.
+}
+
+// toTdigestInfo converts a decoded TDIGEST.INFO reply into a tdigestInfo.
+//
+// value must be a map[interface{}]interface{} whose entries for the eight
+// expected field names hold int64 values. A wrong container type, a missing
+// field, or a field of another type fails the test immediately with a message
+// naming the offending field, instead of panicking on a bare type assertion.
+func toTdigestInfo(t *testing.T, value interface{}) tdigestInfo {
+	// Report failures at the caller's line rather than inside this helper.
+	t.Helper()
+
+	require.IsType(t, map[interface{}]interface{}{}, value)
+	fields := value.(map[interface{}]interface{})
+
+	// intField fetches one integer field, failing the test if it is absent or mistyped.
+	intField := func(name string) int64 {
+		t.Helper()
+		raw, found := fields[name]
+		require.Truef(t, found, "TDIGEST.INFO reply is missing field %q", name)
+		require.IsTypef(t, int64(0), raw, "TDIGEST.INFO field %q is not an integer", name)
+		return raw.(int64)
+	}
+
+	return tdigestInfo{
+		Compression:       intField("Compression"),
+		Capacity:          intField("Capacity"),
+		MergedNodes:       intField("Merged nodes"),
+		UnmergedNodes:     intField("Unmerged nodes"),
+		MergedWeight:      intField("Merged weight"),
+		UnmergedWeight:    intField("Unmerged weight"),
+		Observations:      intField("Observations"),
+		TotalCompressions: intField("Total compressions"),
+	}
+}
+
+// TestTDigest runs the t-digest command tests once for every server
+// configuration produced from the "txn-context-enabled" yes/no option.
+func TestTDigest(t *testing.T) {
+	matrix, err := util.GenerateConfigsMatrix([]util.ConfigOptions{{
+		Name:       "txn-context-enabled",
+		Options:    []string{"yes", "no"},
+		ConfigType: util.YesNo,
+	}})
+	require.NoError(t, err)
+
+	for _, serverConfigs := range matrix {
+		tdigestTests(t, serverConfigs)
+	}
+}
+
+func tdigestTests(t *testing.T, configs util.KvrocksServerConfigs) {
+ srv := util.StartServer(t, configs)
+ defer srv.Close()
+ ctx := context.Background()
+ rdb := srv.NewClient()
+ defer func() { require.NoError(t, rdb.Close()) }()
+
+ t.Run("tdigest.create with different arguments", func(t *testing.T) {
+ keyPrefix := "tdigest_create_"
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE").Err(),
errMsgWrongNumberArg)
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key", "hahah").Err(), errMsgWrongNumberArg)
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key", "1", "hahah").Err(), errMsgWrongNumberArg)
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key", "compression").Err(), errMsgWrongNumberArg)
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key", "compression", "hahah").Err(), errMsgParseCompression)
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key", "compression", "0").Err(), errMsgNeedToBePositive)
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key", "compression", "-1").Err(), errMsgNeedToBePositive)
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key", "compression", "0.1").Err(), errMsgParseCompression)
+
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key0", "compression", "1").Err())
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key0", "compression", "1").Err(), errMsgKeyAlreadyExists)
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key1", "compression", "1000").Err())
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key", "compression", "1001").Err(), errMsgMustInRange)
+ })
+
+ t.Run("tdigest.info with different arguments", func(t *testing.T) {
+ keyPrefix := "tdigest_info_"
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.INFO").Err(),
errMsgWrongNumberArg)
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.INFO",
keyPrefix+"key", "hahah").Err(), errMsgWrongNumberArg)
+ require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.INFO",
keyPrefix+"key").Err(), errMsgKeyNotExist)
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key0", "compression", "1").Err())
+ {
+ rsp := rdb.Do(ctx, "TDIGEST.INFO", keyPrefix+"key0")
+ require.NoError(t, rsp.Err())
+ info := toTdigestInfo(t, rsp.Val())
+ require.EqualValues(t, 1, info.Compression)
+ require.EqualValues(t, 1*6+10, info.Capacity)
+ require.EqualValues(t, 0, info.MergedNodes)
+ require.EqualValues(t, 0, info.UnmergedNodes)
+ require.EqualValues(t, 0, info.MergedWeight)
+ require.EqualValues(t, 0, info.UnmergedWeight)
+ require.EqualValues(t, 0, info.Observations)
+ require.EqualValues(t, 0, info.TotalCompressions)
+ }
+
+ {
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE",
keyPrefix+"key1", "compression", "1000").Err())
+ rsp := rdb.Do(ctx, "TDIGEST.INFO", keyPrefix+"key1")
+ require.NoError(t, rsp.Err())
+ info := toTdigestInfo(t, rsp.Val())
+ require.EqualValues(t, 1000, info.Compression)
+ require.EqualValues(t, 1024, info.Capacity) // max is
1024
+ require.EqualValues(t, 0, info.MergedNodes)
+ require.EqualValues(t, 0, info.UnmergedNodes)
+ require.EqualValues(t, 0, info.MergedWeight)
+ require.EqualValues(t, 0, info.UnmergedWeight)
+ require.EqualValues(t, 0, info.Observations)
+ require.EqualValues(t, 0, info.TotalCompressions)
+ }
+ })
+}