This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 80b493c6 Add CBOR as a storage format for the JSON data type (#1871)
80b493c6 is described below
commit 80b493c697e1dfed9d4099e51566f6ea11242a38
Author: Twice <[email protected]>
AuthorDate: Sat Nov 4 00:49:19 2023 +0900
Add CBOR as a storage format for the JSON data type (#1871)
---
kvrocks.conf | 7 ++++++
src/cli/main.cc | 2 +-
src/commands/cmd_json.cc | 19 +++++++++++++++
src/config/config.cc | 22 +++++++++++------
src/config/config.h | 6 +++--
src/config/config_type.h | 32 ++++++++++++++-----------
src/storage/redis_metadata.h | 1 +
src/storage/storage.cc | 4 ++--
src/types/json.h | 38 +++++++++++++++++++++++++++++
src/types/redis_json.cc | 41 +++++++++++++++++++++++++++-----
src/types/redis_json.h | 1 +
tests/gocase/unit/type/json/json_test.go | 11 +++++++++
12 files changed, 152 insertions(+), 32 deletions(-)
diff --git a/kvrocks.conf b/kvrocks.conf
index be433254..7667511b 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -317,6 +317,13 @@ redis-cursor-compatible no
# Default: 1024
json-max-nesting-depth 1024
+# The underlying storage format of the JSON data type
+# NOTE: This option only affects newly written/updated key-values
+# The CBOR format may reduce the storage size and speed up JSON commands
+# Available values: json, cbor
+# Default: json
+json-storage-format json
+
################################## TLS ###################################
# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
diff --git a/src/cli/main.cc b/src/cli/main.cc
index a9661b7f..df0bf033 100644
--- a/src/cli/main.cc
+++ b/src/cli/main.cc
@@ -151,7 +151,7 @@ int main(int argc, char *argv[]) {
}
}
}
- bool is_supervised =
IsSupervisedMode(static_cast<SupervisedMode>(config.supervised_mode));
+ bool is_supervised = IsSupervisedMode(config.supervised_mode);
if (config.daemonize && !is_supervised) Daemonize();
s = CreatePidFile(config.pidfile);
if (!s.IsOK()) {
diff --git a/src/commands/cmd_json.cc b/src/commands/cmd_json.cc
index 6c2d40cd..ad2c7152 100644
--- a/src/commands/cmd_json.cc
+++ b/src/commands/cmd_json.cc
@@ -24,6 +24,7 @@
#include "commands/command_parser.h"
#include "server/redis_reply.h"
#include "server/server.h"
+#include "storage/redis_metadata.h"
#include "types/redis_json.h"
namespace redis {
@@ -100,6 +101,23 @@ class CommandJsonGet : public Commander {
std::vector<std::string> paths_;
};
+class CommandJsonInfo : public Commander {
+ public:
+ Status Execute(Server *srv, Connection *conn, std::string *output) override {
+ redis::Json json(srv->storage, conn->GetNamespace());
+
+ auto storage_format = JsonStorageFormat::JSON;
+ auto s = json.Info(args_[1], &storage_format);
+ if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+ auto format_str = storage_format == JsonStorageFormat::JSON ? "json"
+ : storage_format == JsonStorageFormat::CBOR ? "cbor"
+ : "unknown";
+ output->append(redis::MultiBulkString({"storage_format", format_str}));
+ return Status::OK();
+ }
+};
+
class CommandJsonArrAppend : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
@@ -206,6 +224,7 @@ class CommandJsonArrLen : public Commander {
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandJsonSet>("json.set", 4, "write", 1,
1, 1),
MakeCmdAttr<CommandJsonGet>("json.get", -2,
"read-only", 1, 1, 1),
+ MakeCmdAttr<CommandJsonInfo>("json.info", 2,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandJsonType>("json.type", -2,
"read-only", 1, 1, 1),
MakeCmdAttr<CommandJsonArrAppend>("json.arrappend",
-4, "write", 1, 1, 1),
MakeCmdAttr<CommandJsonClear>("json.clear", -2,
"write", 1, 1, 1),
diff --git a/src/config/config.cc b/src/config/config.cc
index 62e48abf..d576c5b1 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -37,8 +37,10 @@
#include "config_type.h"
#include "config_util.h"
#include "parse_util.h"
+#include "rocksdb/compression_type.h"
#include "server/server.h"
#include "status.h"
+#include "storage/redis_metadata.h"
constexpr const char *kDefaultBindAddress = "127.0.0.1";
@@ -46,22 +48,25 @@ constexpr const char *errBlobDbNotEnabled = "Must set
rocksdb.enable_blob_files
constexpr const char *errLevelCompactionDynamicLevelBytesNotSet =
"Must set rocksdb.level_compaction_dynamic_level_bytes yes first.";
-std::vector<ConfigEnum> supervised_modes{
+const std::vector<ConfigEnum<SupervisedMode>> supervised_modes{
{"no", kSupervisedNone},
{"auto", kSupervisedAutoDetect},
{"upstart", kSupervisedUpStart},
{"systemd", kSupervisedSystemd},
};
-std::vector<ConfigEnum> log_levels{
+const std::vector<ConfigEnum<int>> log_levels{
{"info", google::INFO},
{"warning", google::WARNING},
{"error", google::ERROR},
{"fatal", google::FATAL},
};
-std::vector<ConfigEnum> compression_types{[] {
- std::vector<ConfigEnum> res;
+const std::vector<ConfigEnum<JsonStorageFormat>> json_storage_formats{{"json",
JsonStorageFormat::JSON},
+ {"cbor",
JsonStorageFormat::CBOR}};
+
+const std::vector<ConfigEnum<rocksdb::CompressionType>> compression_types{[] {
+ std::vector<ConfigEnum<rocksdb::CompressionType>> res;
res.reserve(engine::CompressionOptions.size());
for (const auto &e : engine::CompressionOptions) {
res.push_back({e.name, e.type});
@@ -127,13 +132,13 @@ Config::Config() {
{"dir", true, new StringField(&dir, "/tmp/kvrocks")},
{"backup-dir", false, new StringField(&backup_dir, "")},
{"log-dir", true, new StringField(&log_dir, "")},
- {"log-level", false, new EnumField(&log_level, log_levels,
google::INFO)},
+ {"log-level", false, new EnumField<int>(&log_level, log_levels,
google::INFO)},
{"pidfile", true, new StringField(&pidfile, "")},
{"max-io-mb", false, new IntField(&max_io_mb, 0, 0, INT_MAX)},
{"max-bitmap-to-string-mb", false, new
IntField(&max_bitmap_to_string_mb, 16, 0, INT_MAX)},
{"max-db-size", false, new IntField(&max_db_size, 0, 0, INT_MAX)},
{"max-replication-mb", false, new IntField(&max_replication_mb, 0, 0,
INT_MAX)},
- {"supervised", true, new EnumField(&supervised_mode, supervised_modes,
kSupervisedNone)},
+ {"supervised", true, new EnumField<SupervisedMode>(&supervised_mode,
supervised_modes, kSupervisedNone)},
{"slave-serve-stale-data", false, new
YesNoField(&slave_serve_stale_data, true)},
{"slave-empty-db-before-fullsync", false, new
YesNoField(&slave_empty_db_before_fullsync, false)},
{"slave-priority", false, new IntField(&slave_priority, 100, 0,
INT_MAX)},
@@ -161,10 +166,13 @@ Config::Config() {
{"redis-cursor-compatible", false, new
YesNoField(&redis_cursor_compatible, false)},
{"repl-namespace-enabled", false, new
YesNoField(&repl_namespace_enabled, false)},
{"json-max-nesting-depth", false, new IntField(&json_max_nesting_depth,
1024, 0, INT_MAX)},
+ {"json-storage-format", false,
+ new EnumField<JsonStorageFormat>(&json_storage_format,
json_storage_formats, JsonStorageFormat::JSON)},
/* rocksdb options */
{"rocksdb.compression", false,
- new EnumField(&rocks_db.compression, compression_types,
rocksdb::CompressionType::kNoCompression)},
+ new EnumField<rocksdb::CompressionType>(&rocks_db.compression,
compression_types,
+
rocksdb::CompressionType::kNoCompression)},
{"rocksdb.block_size", true, new IntField(&rocks_db.block_size, 16384,
0, INT_MAX)},
{"rocksdb.max_open_files", false, new IntField(&rocks_db.max_open_files,
8096, -1, INT_MAX)},
{"rocksdb.write_buffer_size", false, new
IntField(&rocks_db.write_buffer_size, 64, 0, 4096)},
diff --git a/src/config/config.h b/src/config/config.h
index 5b5b2632..aa1f9f89 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -32,6 +32,7 @@
#include "config_type.h"
#include "cron.h"
#include "status.h"
+#include "storage/redis_metadata.h"
// forward declaration
class Server;
@@ -101,7 +102,7 @@ struct Config {
int slowlog_log_slower_than = 100000;
int slowlog_max_len = 128;
bool daemonize = false;
- int supervised_mode = kSupervisedNone;
+ SupervisedMode supervised_mode = kSupervisedNone;
bool slave_readonly = true;
bool slave_serve_stale_data = true;
bool slave_empty_db_before_fullsync = false;
@@ -163,6 +164,7 @@ struct Config {
// json
int json_max_nesting_depth = 1024;
+ JsonStorageFormat json_storage_format = JsonStorageFormat::JSON;
struct RocksDB {
int block_size;
@@ -189,7 +191,7 @@ struct Config {
int level0_slowdown_writes_trigger;
int level0_stop_writes_trigger;
int level0_file_num_compaction_trigger;
- int compression;
+ rocksdb::CompressionType compression;
bool disable_auto_compactions;
bool enable_blob_files;
int min_blob_size;
diff --git a/src/config/config_type.h b/src/config/config_type.h
index 60b7d7ba..0b980823 100644
--- a/src/config/config_type.h
+++ b/src/config/config_type.h
@@ -47,9 +47,10 @@ using IntField = IntegerField<int>;
using UInt32Field = IntegerField<uint32_t>;
using Int64Field = IntegerField<int64_t>;
+template <typename Enum>
struct ConfigEnum {
- const std::string name;
- const int val;
+ std::string name;
+ Enum val;
};
enum ConfigType { SingleConfig, MultiConfig };
@@ -183,11 +184,13 @@ class YesNoField : public ConfigField {
bool *receiver_;
};
+template <typename Enum>
class EnumField : public ConfigField {
public:
- EnumField(int *receiver, std::vector<ConfigEnum> enums, int e) :
receiver_(receiver), enums_(std::move(enums)) {
- *receiver_ = e;
- }
+ using EnumItem = ConfigEnum<Enum>;
+ using EnumItems = std::vector<EnumItem>;
+
+ EnumField(Enum *receiver, EnumItems enums, Enum e) : receiver_(receiver),
enums_(std::move(enums)) { *receiver_ = e; }
~EnumField() override = default;
std::string ToString() const override {
@@ -198,7 +201,7 @@ class EnumField : public ConfigField {
}
Status ToNumber(int64_t *n) const override {
- *n = *receiver_;
+ *n = static_cast<int64_t>(*receiver_);
return Status::OK();
}
@@ -209,16 +212,17 @@ class EnumField : public ConfigField {
return Status::OK();
}
}
- return {Status::NotOK, fmt::format("invalid enum option, acceptable values
are {}",
- std::accumulate(enums_.begin(),
enums_.end(), std::string{},
- [this](const
std::string &res, const ConfigEnum &e) {
- if (&e !=
&enums_.back()) return res + "'" + e.name + "', ";
- return res + "'" +
e.name + "'";
- }))};
+ auto acceptale_values =
+ std::accumulate(enums_.begin(), enums_.end(), std::string{},
[this](const std::string &res, const EnumItem &e) {
+ if (&e != &enums_.back()) return res + "'" + e.name + "', ";
+
+ return res + "'" + e.name + "'";
+ });
+ return {Status::NotOK, fmt::format("invalid enum option, acceptable values
are {}", acceptale_values)};
}
private:
- int *receiver_;
- std::vector<ConfigEnum> enums_;
+ Enum *receiver_;
+ EnumItems enums_;
};
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 423d8388..48c40f67 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -272,6 +272,7 @@ class BloomChainMetadata : public Metadata {
enum class JsonStorageFormat : uint8_t {
JSON = 0,
+ CBOR = 1,
};
class JsonMetadata : public Metadata {
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 8c40b668..70855dcb 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -119,7 +119,7 @@ void Storage::SetBlobDB(rocksdb::ColumnFamilyOptions
*cf_options) {
cf_options->enable_blob_files = config_->rocks_db.enable_blob_files;
cf_options->min_blob_size = config_->rocks_db.min_blob_size;
cf_options->blob_file_size = config_->rocks_db.blob_file_size;
- cf_options->blob_compression_type =
static_cast<rocksdb::CompressionType>(config_->rocks_db.compression);
+ cf_options->blob_compression_type = config_->rocks_db.compression;
cf_options->enable_blob_garbage_collection =
config_->rocks_db.enable_blob_garbage_collection;
// Use 100.0 to force converting blob_garbage_collection_age_cutoff to double
cf_options->blob_garbage_collection_age_cutoff =
config_->rocks_db.blob_garbage_collection_age_cutoff / 100.0;
@@ -149,7 +149,7 @@ rocksdb::Options Storage::InitRocksDBOptions() {
if (i < 2) {
options.compression_per_level[i] =
rocksdb::CompressionType::kNoCompression;
} else {
- options.compression_per_level[i] =
static_cast<rocksdb::CompressionType>(config_->rocks_db.compression);
+ options.compression_per_level[i] = config_->rocks_db.compression;
}
}
if (config_->rocks_db.row_cache_size) {
diff --git a/src/types/json.h b/src/types/json.h
index cea9bcc9..67ebdba6 100644
--- a/src/types/json.h
+++ b/src/types/json.h
@@ -23,6 +23,9 @@
#include <jsoncons/json.hpp>
#include <jsoncons/json_error.hpp>
#include <jsoncons/json_options.hpp>
+#include <jsoncons_ext/cbor/cbor.hpp>
+#include <jsoncons_ext/cbor/cbor_encoder.hpp>
+#include <jsoncons_ext/cbor/cbor_options.hpp>
#include <jsoncons_ext/jsonpath/json_query.hpp>
#include <jsoncons_ext/jsonpath/jsonpath_error.hpp>
#include <limits>
@@ -48,6 +51,21 @@ struct JsonValue {
return JsonValue(std::move(val));
}
+ static StatusOr<JsonValue> FromCBOR(std::string_view str, int
max_nesting_depth = std::numeric_limits<int>::max()) {
+ jsoncons::json val;
+
+ jsoncons::cbor::cbor_options options;
+ options.max_nesting_depth(max_nesting_depth);
+
+ try {
+ val = jsoncons::cbor::decode_cbor<jsoncons::json>(str, options);
+ } catch (const jsoncons::ser_error &e) {
+ return {Status::NotOK, e.what()};
+ }
+
+ return JsonValue(std::move(val));
+ }
+
StatusOr<std::string> Dump(int max_nesting_depth =
std::numeric_limits<int>::max()) const {
std::string res;
GET_OR_RET(Dump(&res, max_nesting_depth));
@@ -68,6 +86,26 @@ struct JsonValue {
return Status::OK();
}
+ StatusOr<std::string> DumpCBOR(int max_nesting_depth =
std::numeric_limits<int>::max()) const {
+ std::string res;
+ GET_OR_RET(DumpCBOR(&res, max_nesting_depth));
+ return res;
+ }
+
+ Status DumpCBOR(std::string *buffer, int max_nesting_depth =
std::numeric_limits<int>::max()) const {
+ jsoncons::cbor::cbor_options options;
+ options.max_nesting_depth(max_nesting_depth);
+
+ jsoncons::cbor::basic_cbor_encoder<jsoncons::string_sink<std::string>>
encoder{*buffer, options};
+ std::error_code ec;
+ value.dump(encoder, ec);
+ if (ec) {
+ return {Status::NotOK, ec.message()};
+ }
+
+ return Status::OK();
+ }
+
StatusOr<std::string> Print(uint8_t indent_size = 0, bool spaces_after_colon
= false,
const std::string &new_line_chars = "") const {
std::string res;
diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc
index 6096003b..06dd8ed3 100644
--- a/src/types/redis_json.cc
+++ b/src/types/redis_json.cc
@@ -31,11 +31,20 @@ rocksdb::Status Json::write(Slice ns_key, JsonMetadata
*metadata, const JsonValu
WriteBatchLogData log_data(kRedisJson);
batch->PutLogData(log_data.Encode());
- metadata->format = JsonStorageFormat::JSON;
+ auto format = storage_->GetConfig()->json_storage_format;
+ metadata->format = format;
std::string val;
metadata->Encode(&val);
- auto s = json_val.Dump(&val, storage_->GetConfig()->json_max_nesting_depth);
+
+ Status s;
+ if (format == JsonStorageFormat::JSON) {
+ s = json_val.Dump(&val, storage_->GetConfig()->json_max_nesting_depth);
+ } else if (format == JsonStorageFormat::CBOR) {
+ s = json_val.DumpCBOR(&val, storage_->GetConfig()->json_max_nesting_depth);
+ } else {
+ return rocksdb::Status::InvalidArgument("JSON storage format not
supported");
+ }
if (!s) {
return rocksdb::Status::InvalidArgument("Failed to encode JSON into
storage: " + s.Msg());
}
@@ -52,12 +61,32 @@ rocksdb::Status Json::read(const Slice &ns_key,
JsonMetadata *metadata, JsonValu
auto s = GetMetadata(kRedisJson, ns_key, &bytes, metadata, &rest);
if (!s.ok()) return s;
- if (metadata->format != JsonStorageFormat::JSON)
+ if (metadata->format == JsonStorageFormat::JSON) {
+ auto origin_res = JsonValue::FromString(rest.ToStringView());
+ if (!origin_res) return rocksdb::Status::Corruption(origin_res.Msg());
+ *value = *std::move(origin_res);
+ } else if (metadata->format == JsonStorageFormat::CBOR) {
+ auto origin_res = JsonValue::FromCBOR(rest.ToStringView());
+ if (!origin_res) return rocksdb::Status::Corruption(origin_res.Msg());
+ *value = *std::move(origin_res);
+ } else {
return rocksdb::Status::NotSupported("JSON storage format not supported");
+ }
+
+ return rocksdb::Status::OK();
+}
+
+rocksdb::Status Json::Info(const std::string &user_key, JsonStorageFormat
*storage_format) {
+ auto ns_key = AppendNamespacePrefix(user_key);
+
+ std::string bytes;
+ Slice rest;
+ JsonMetadata metadata;
+
+ auto s = GetMetadata(kRedisJson, ns_key, &bytes, &metadata, &rest);
+ if (!s.ok()) return s;
- auto origin_res = JsonValue::FromString(rest.ToStringView());
- if (!origin_res) return rocksdb::Status::Corruption(origin_res.Msg());
- *value = *std::move(origin_res);
+ *storage_format = metadata.format;
return rocksdb::Status::OK();
}
diff --git a/src/types/redis_json.h b/src/types/redis_json.h
index 8eeb2571..5ba6f7e9 100644
--- a/src/types/redis_json.h
+++ b/src/types/redis_json.h
@@ -35,6 +35,7 @@ class Json : public Database {
rocksdb::Status Set(const std::string &user_key, const std::string &path,
const std::string &value);
rocksdb::Status Get(const std::string &user_key, const
std::vector<std::string> &paths, JsonValue *result);
+ rocksdb::Status Info(const std::string &user_key, JsonStorageFormat
*storage_format);
rocksdb::Status Type(const std::string &user_key, const std::string &path,
std::vector<std::string> *results);
rocksdb::Status ArrAppend(const std::string &user_key, const std::string
&path,
const std::vector<std::string> &values,
std::vector<size_t> *result_count);
diff --git a/tests/gocase/unit/type/json/json_test.go
b/tests/gocase/unit/type/json/json_test.go
index c0e7f803..3b1c70ac 100644
--- a/tests/gocase/unit/type/json/json_test.go
+++ b/tests/gocase/unit/type/json/json_test.go
@@ -71,6 +71,17 @@ func TestJson(t *testing.T) {
require.Equal(t, rdb.Do(ctx, "JSON.GET", "a", "INDENT", " ",
"$").Val(), `[ { "x":1, "y":2 }]`)
})
+ t.Run("JSON storage format CBOR", func(t *testing.T) {
+ require.NoError(t, rdb.Do(ctx, "JSON.SET", "a", "$", `{"x":1,
"y":2}`).Err())
+ require.Equal(t, "json", rdb.Do(ctx, "JSON.INFO",
"a").Val().([]interface{})[1])
+
+ require.NoError(t, rdb.Do(ctx, "CONFIG", "SET",
"json-storage-format", "cbor").Err())
+ require.NoError(t, rdb.Do(ctx, "JSON.SET", "b", "$", `{"x":1,
"y":2}`).Err())
+ require.Equal(t, "cbor", rdb.Do(ctx, "JSON.INFO",
"b").Val().([]interface{})[1])
+ require.Equal(t, `{"x":1,"y":2}`, rdb.Do(ctx, "JSON.GET",
"b").Val())
+ require.Equal(t, `{"x":1,"y":2}`, rdb.Do(ctx, "JSON.GET",
"a").Val())
+ })
+
t.Run("JSON.ARRAPPEND basics", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "SET", "a", `1`).Err())
require.Error(t, rdb.Do(ctx, "JSON.ARRAPPEND", "a", "$",
`1`).Err())