This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 80b493c6 Add CBOR as a storage format for the JSON data type (#1871)
80b493c6 is described below

commit 80b493c697e1dfed9d4099e51566f6ea11242a38
Author: Twice <[email protected]>
AuthorDate: Sat Nov 4 00:49:19 2023 +0900

    Add CBOR as a storage format for the JSON data type (#1871)
---
 kvrocks.conf                             |  7 ++++++
 src/cli/main.cc                          |  2 +-
 src/commands/cmd_json.cc                 | 19 +++++++++++++++
 src/config/config.cc                     | 22 +++++++++++------
 src/config/config.h                      |  6 +++--
 src/config/config_type.h                 | 32 ++++++++++++++-----------
 src/storage/redis_metadata.h             |  1 +
 src/storage/storage.cc                   |  4 ++--
 src/types/json.h                         | 38 +++++++++++++++++++++++++++++
 src/types/redis_json.cc                  | 41 +++++++++++++++++++++++++++-----
 src/types/redis_json.h                   |  1 +
 tests/gocase/unit/type/json/json_test.go | 11 +++++++++
 12 files changed, 152 insertions(+), 32 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index be433254..7667511b 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -317,6 +317,13 @@ redis-cursor-compatible no
 # Default: 1024
 json-max-nesting-depth 1024
 
+# The underlying storage format of the JSON data type
+# NOTE: This option only affects newly written or updated keys; existing keys keep their current format
+# The CBOR format may reduce storage size and speed up JSON commands
+# Available values: json, cbor
+# Default: json
+json-storage-format json
+
 ################################## TLS ###################################
 
 # By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
diff --git a/src/cli/main.cc b/src/cli/main.cc
index a9661b7f..df0bf033 100644
--- a/src/cli/main.cc
+++ b/src/cli/main.cc
@@ -151,7 +151,7 @@ int main(int argc, char *argv[]) {
       }
     }
   }
-  bool is_supervised = 
IsSupervisedMode(static_cast<SupervisedMode>(config.supervised_mode));
+  bool is_supervised = IsSupervisedMode(config.supervised_mode);
   if (config.daemonize && !is_supervised) Daemonize();
   s = CreatePidFile(config.pidfile);
   if (!s.IsOK()) {
diff --git a/src/commands/cmd_json.cc b/src/commands/cmd_json.cc
index 6c2d40cd..ad2c7152 100644
--- a/src/commands/cmd_json.cc
+++ b/src/commands/cmd_json.cc
@@ -24,6 +24,7 @@
 #include "commands/command_parser.h"
 #include "server/redis_reply.h"
 #include "server/server.h"
+#include "storage/redis_metadata.h"
 #include "types/redis_json.h"
 
 namespace redis {
@@ -100,6 +101,23 @@ class CommandJsonGet : public Commander {
   std::vector<std::string> paths_;
 };
 
+class CommandJsonInfo : public Commander {
+ public:
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    redis::Json json(srv->storage, conn->GetNamespace());
+
+    auto storage_format = JsonStorageFormat::JSON;
+    auto s = json.Info(args_[1], &storage_format);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    auto format_str = storage_format == JsonStorageFormat::JSON   ? "json"
+                      : storage_format == JsonStorageFormat::CBOR ? "cbor"
+                                                                  : "unknown";
+    output->append(redis::MultiBulkString({"storage_format", format_str}));
+    return Status::OK();
+  }
+};
+
 class CommandJsonArrAppend : public Commander {
  public:
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
@@ -206,6 +224,7 @@ class CommandJsonArrLen : public Commander {
 
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandJsonSet>("json.set", 4, "write", 1, 
1, 1),
                         MakeCmdAttr<CommandJsonGet>("json.get", -2, 
"read-only", 1, 1, 1),
+                        MakeCmdAttr<CommandJsonInfo>("json.info", 2, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandJsonType>("json.type", -2, 
"read-only", 1, 1, 1),
                         MakeCmdAttr<CommandJsonArrAppend>("json.arrappend", 
-4, "write", 1, 1, 1),
                         MakeCmdAttr<CommandJsonClear>("json.clear", -2, 
"write", 1, 1, 1),
diff --git a/src/config/config.cc b/src/config/config.cc
index 62e48abf..d576c5b1 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -37,8 +37,10 @@
 #include "config_type.h"
 #include "config_util.h"
 #include "parse_util.h"
+#include "rocksdb/compression_type.h"
 #include "server/server.h"
 #include "status.h"
+#include "storage/redis_metadata.h"
 
 constexpr const char *kDefaultBindAddress = "127.0.0.1";
 
@@ -46,22 +48,25 @@ constexpr const char *errBlobDbNotEnabled = "Must set 
rocksdb.enable_blob_files
 constexpr const char *errLevelCompactionDynamicLevelBytesNotSet =
     "Must set rocksdb.level_compaction_dynamic_level_bytes yes first.";
 
-std::vector<ConfigEnum> supervised_modes{
+const std::vector<ConfigEnum<SupervisedMode>> supervised_modes{
     {"no", kSupervisedNone},
     {"auto", kSupervisedAutoDetect},
     {"upstart", kSupervisedUpStart},
     {"systemd", kSupervisedSystemd},
 };
 
-std::vector<ConfigEnum> log_levels{
+const std::vector<ConfigEnum<int>> log_levels{
     {"info", google::INFO},
     {"warning", google::WARNING},
     {"error", google::ERROR},
     {"fatal", google::FATAL},
 };
 
-std::vector<ConfigEnum> compression_types{[] {
-  std::vector<ConfigEnum> res;
+const std::vector<ConfigEnum<JsonStorageFormat>> json_storage_formats{{"json", 
JsonStorageFormat::JSON},
+                                                                      {"cbor", 
JsonStorageFormat::CBOR}};
+
+const std::vector<ConfigEnum<rocksdb::CompressionType>> compression_types{[] {
+  std::vector<ConfigEnum<rocksdb::CompressionType>> res;
   res.reserve(engine::CompressionOptions.size());
   for (const auto &e : engine::CompressionOptions) {
     res.push_back({e.name, e.type});
@@ -127,13 +132,13 @@ Config::Config() {
       {"dir", true, new StringField(&dir, "/tmp/kvrocks")},
       {"backup-dir", false, new StringField(&backup_dir, "")},
       {"log-dir", true, new StringField(&log_dir, "")},
-      {"log-level", false, new EnumField(&log_level, log_levels, 
google::INFO)},
+      {"log-level", false, new EnumField<int>(&log_level, log_levels, 
google::INFO)},
       {"pidfile", true, new StringField(&pidfile, "")},
       {"max-io-mb", false, new IntField(&max_io_mb, 0, 0, INT_MAX)},
       {"max-bitmap-to-string-mb", false, new 
IntField(&max_bitmap_to_string_mb, 16, 0, INT_MAX)},
       {"max-db-size", false, new IntField(&max_db_size, 0, 0, INT_MAX)},
       {"max-replication-mb", false, new IntField(&max_replication_mb, 0, 0, 
INT_MAX)},
-      {"supervised", true, new EnumField(&supervised_mode, supervised_modes, 
kSupervisedNone)},
+      {"supervised", true, new EnumField<SupervisedMode>(&supervised_mode, 
supervised_modes, kSupervisedNone)},
       {"slave-serve-stale-data", false, new 
YesNoField(&slave_serve_stale_data, true)},
       {"slave-empty-db-before-fullsync", false, new 
YesNoField(&slave_empty_db_before_fullsync, false)},
       {"slave-priority", false, new IntField(&slave_priority, 100, 0, 
INT_MAX)},
@@ -161,10 +166,13 @@ Config::Config() {
       {"redis-cursor-compatible", false, new 
YesNoField(&redis_cursor_compatible, false)},
       {"repl-namespace-enabled", false, new 
YesNoField(&repl_namespace_enabled, false)},
       {"json-max-nesting-depth", false, new IntField(&json_max_nesting_depth, 
1024, 0, INT_MAX)},
+      {"json-storage-format", false,
+       new EnumField<JsonStorageFormat>(&json_storage_format, 
json_storage_formats, JsonStorageFormat::JSON)},
 
       /* rocksdb options */
       {"rocksdb.compression", false,
-       new EnumField(&rocks_db.compression, compression_types, 
rocksdb::CompressionType::kNoCompression)},
+       new EnumField<rocksdb::CompressionType>(&rocks_db.compression, 
compression_types,
+                                               
rocksdb::CompressionType::kNoCompression)},
       {"rocksdb.block_size", true, new IntField(&rocks_db.block_size, 16384, 
0, INT_MAX)},
       {"rocksdb.max_open_files", false, new IntField(&rocks_db.max_open_files, 
8096, -1, INT_MAX)},
       {"rocksdb.write_buffer_size", false, new 
IntField(&rocks_db.write_buffer_size, 64, 0, 4096)},
diff --git a/src/config/config.h b/src/config/config.h
index 5b5b2632..aa1f9f89 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -32,6 +32,7 @@
 #include "config_type.h"
 #include "cron.h"
 #include "status.h"
+#include "storage/redis_metadata.h"
 
 // forward declaration
 class Server;
@@ -101,7 +102,7 @@ struct Config {
   int slowlog_log_slower_than = 100000;
   int slowlog_max_len = 128;
   bool daemonize = false;
-  int supervised_mode = kSupervisedNone;
+  SupervisedMode supervised_mode = kSupervisedNone;
   bool slave_readonly = true;
   bool slave_serve_stale_data = true;
   bool slave_empty_db_before_fullsync = false;
@@ -163,6 +164,7 @@ struct Config {
 
   // json
   int json_max_nesting_depth = 1024;
+  JsonStorageFormat json_storage_format = JsonStorageFormat::JSON;
 
   struct RocksDB {
     int block_size;
@@ -189,7 +191,7 @@ struct Config {
     int level0_slowdown_writes_trigger;
     int level0_stop_writes_trigger;
     int level0_file_num_compaction_trigger;
-    int compression;
+    rocksdb::CompressionType compression;
     bool disable_auto_compactions;
     bool enable_blob_files;
     int min_blob_size;
diff --git a/src/config/config_type.h b/src/config/config_type.h
index 60b7d7ba..0b980823 100644
--- a/src/config/config_type.h
+++ b/src/config/config_type.h
@@ -47,9 +47,10 @@ using IntField = IntegerField<int>;
 using UInt32Field = IntegerField<uint32_t>;
 using Int64Field = IntegerField<int64_t>;
 
+template <typename Enum>
 struct ConfigEnum {
-  const std::string name;
-  const int val;
+  std::string name;
+  Enum val;
 };
 
 enum ConfigType { SingleConfig, MultiConfig };
@@ -183,11 +184,13 @@ class YesNoField : public ConfigField {
   bool *receiver_;
 };
 
+template <typename Enum>
 class EnumField : public ConfigField {
  public:
-  EnumField(int *receiver, std::vector<ConfigEnum> enums, int e) : 
receiver_(receiver), enums_(std::move(enums)) {
-    *receiver_ = e;
-  }
+  using EnumItem = ConfigEnum<Enum>;
+  using EnumItems = std::vector<EnumItem>;
+
+  EnumField(Enum *receiver, EnumItems enums, Enum e) : receiver_(receiver), 
enums_(std::move(enums)) { *receiver_ = e; }
   ~EnumField() override = default;
 
   std::string ToString() const override {
@@ -198,7 +201,7 @@ class EnumField : public ConfigField {
   }
 
   Status ToNumber(int64_t *n) const override {
-    *n = *receiver_;
+    *n = static_cast<int64_t>(*receiver_);
     return Status::OK();
   }
 
@@ -209,16 +212,17 @@ class EnumField : public ConfigField {
         return Status::OK();
       }
     }
-    return {Status::NotOK, fmt::format("invalid enum option, acceptable values 
are {}",
-                                       std::accumulate(enums_.begin(), 
enums_.end(), std::string{},
-                                                       [this](const 
std::string &res, const ConfigEnum &e) {
-                                                         if (&e != 
&enums_.back()) return res + "'" + e.name + "', ";
 
-                                                         return res + "'" + 
e.name + "'";
-                                                       }))};
+    auto acceptale_values =
+        std::accumulate(enums_.begin(), enums_.end(), std::string{}, 
[this](const std::string &res, const EnumItem &e) {
+          if (&e != &enums_.back()) return res + "'" + e.name + "', ";
+
+          return res + "'" + e.name + "'";
+        });
+    return {Status::NotOK, fmt::format("invalid enum option, acceptable values 
are {}", acceptale_values)};
   }
 
  private:
-  int *receiver_;
-  std::vector<ConfigEnum> enums_;
+  Enum *receiver_;
+  EnumItems enums_;
 };
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 423d8388..48c40f67 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -272,6 +272,7 @@ class BloomChainMetadata : public Metadata {
 
 enum class JsonStorageFormat : uint8_t {
   JSON = 0,
+  CBOR = 1,
 };
 
 class JsonMetadata : public Metadata {
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 8c40b668..70855dcb 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -119,7 +119,7 @@ void Storage::SetBlobDB(rocksdb::ColumnFamilyOptions 
*cf_options) {
   cf_options->enable_blob_files = config_->rocks_db.enable_blob_files;
   cf_options->min_blob_size = config_->rocks_db.min_blob_size;
   cf_options->blob_file_size = config_->rocks_db.blob_file_size;
-  cf_options->blob_compression_type = 
static_cast<rocksdb::CompressionType>(config_->rocks_db.compression);
+  cf_options->blob_compression_type = config_->rocks_db.compression;
   cf_options->enable_blob_garbage_collection = 
config_->rocks_db.enable_blob_garbage_collection;
   // Use 100.0 to force converting blob_garbage_collection_age_cutoff to double
   cf_options->blob_garbage_collection_age_cutoff = 
config_->rocks_db.blob_garbage_collection_age_cutoff / 100.0;
@@ -149,7 +149,7 @@ rocksdb::Options Storage::InitRocksDBOptions() {
     if (i < 2) {
       options.compression_per_level[i] = 
rocksdb::CompressionType::kNoCompression;
     } else {
-      options.compression_per_level[i] = 
static_cast<rocksdb::CompressionType>(config_->rocks_db.compression);
+      options.compression_per_level[i] = config_->rocks_db.compression;
     }
   }
   if (config_->rocks_db.row_cache_size) {
diff --git a/src/types/json.h b/src/types/json.h
index cea9bcc9..67ebdba6 100644
--- a/src/types/json.h
+++ b/src/types/json.h
@@ -23,6 +23,9 @@
 #include <jsoncons/json.hpp>
 #include <jsoncons/json_error.hpp>
 #include <jsoncons/json_options.hpp>
+#include <jsoncons_ext/cbor/cbor.hpp>
+#include <jsoncons_ext/cbor/cbor_encoder.hpp>
+#include <jsoncons_ext/cbor/cbor_options.hpp>
 #include <jsoncons_ext/jsonpath/json_query.hpp>
 #include <jsoncons_ext/jsonpath/jsonpath_error.hpp>
 #include <limits>
@@ -48,6 +51,21 @@ struct JsonValue {
     return JsonValue(std::move(val));
   }
 
+  static StatusOr<JsonValue> FromCBOR(std::string_view str, int 
max_nesting_depth = std::numeric_limits<int>::max()) {
+    jsoncons::json val;
+
+    jsoncons::cbor::cbor_options options;
+    options.max_nesting_depth(max_nesting_depth);
+
+    try {
+      val = jsoncons::cbor::decode_cbor<jsoncons::json>(str, options);
+    } catch (const jsoncons::ser_error &e) {
+      return {Status::NotOK, e.what()};
+    }
+
+    return JsonValue(std::move(val));
+  }
+
   StatusOr<std::string> Dump(int max_nesting_depth = 
std::numeric_limits<int>::max()) const {
     std::string res;
     GET_OR_RET(Dump(&res, max_nesting_depth));
@@ -68,6 +86,26 @@ struct JsonValue {
     return Status::OK();
   }
 
+  StatusOr<std::string> DumpCBOR(int max_nesting_depth = 
std::numeric_limits<int>::max()) const {
+    std::string res;
+    GET_OR_RET(DumpCBOR(&res, max_nesting_depth));
+    return res;
+  }
+
+  Status DumpCBOR(std::string *buffer, int max_nesting_depth = 
std::numeric_limits<int>::max()) const {
+    jsoncons::cbor::cbor_options options;
+    options.max_nesting_depth(max_nesting_depth);
+
+    jsoncons::cbor::basic_cbor_encoder<jsoncons::string_sink<std::string>> 
encoder{*buffer, options};
+    std::error_code ec;
+    value.dump(encoder, ec);
+    if (ec) {
+      return {Status::NotOK, ec.message()};
+    }
+
+    return Status::OK();
+  }
+
   StatusOr<std::string> Print(uint8_t indent_size = 0, bool spaces_after_colon 
= false,
                               const std::string &new_line_chars = "") const {
     std::string res;
diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc
index 6096003b..06dd8ed3 100644
--- a/src/types/redis_json.cc
+++ b/src/types/redis_json.cc
@@ -31,11 +31,20 @@ rocksdb::Status Json::write(Slice ns_key, JsonMetadata 
*metadata, const JsonValu
   WriteBatchLogData log_data(kRedisJson);
   batch->PutLogData(log_data.Encode());
 
-  metadata->format = JsonStorageFormat::JSON;
+  auto format = storage_->GetConfig()->json_storage_format;
+  metadata->format = format;
 
   std::string val;
   metadata->Encode(&val);
-  auto s = json_val.Dump(&val, storage_->GetConfig()->json_max_nesting_depth);
+
+  Status s;
+  if (format == JsonStorageFormat::JSON) {
+    s = json_val.Dump(&val, storage_->GetConfig()->json_max_nesting_depth);
+  } else if (format == JsonStorageFormat::CBOR) {
+    s = json_val.DumpCBOR(&val, storage_->GetConfig()->json_max_nesting_depth);
+  } else {
+    return rocksdb::Status::InvalidArgument("JSON storage format not 
supported");
+  }
   if (!s) {
     return rocksdb::Status::InvalidArgument("Failed to encode JSON into 
storage: " + s.Msg());
   }
@@ -52,12 +61,32 @@ rocksdb::Status Json::read(const Slice &ns_key, 
JsonMetadata *metadata, JsonValu
   auto s = GetMetadata(kRedisJson, ns_key, &bytes, metadata, &rest);
   if (!s.ok()) return s;
 
-  if (metadata->format != JsonStorageFormat::JSON)
+  if (metadata->format == JsonStorageFormat::JSON) {
+    auto origin_res = JsonValue::FromString(rest.ToStringView());
+    if (!origin_res) return rocksdb::Status::Corruption(origin_res.Msg());
+    *value = *std::move(origin_res);
+  } else if (metadata->format == JsonStorageFormat::CBOR) {
+    auto origin_res = JsonValue::FromCBOR(rest.ToStringView());
+    if (!origin_res) return rocksdb::Status::Corruption(origin_res.Msg());
+    *value = *std::move(origin_res);
+  } else {
     return rocksdb::Status::NotSupported("JSON storage format not supported");
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status Json::Info(const std::string &user_key, JsonStorageFormat 
*storage_format) {
+  auto ns_key = AppendNamespacePrefix(user_key);
+
+  std::string bytes;
+  Slice rest;
+  JsonMetadata metadata;
+
+  auto s = GetMetadata(kRedisJson, ns_key, &bytes, &metadata, &rest);
+  if (!s.ok()) return s;
 
-  auto origin_res = JsonValue::FromString(rest.ToStringView());
-  if (!origin_res) return rocksdb::Status::Corruption(origin_res.Msg());
-  *value = *std::move(origin_res);
+  *storage_format = metadata.format;
 
   return rocksdb::Status::OK();
 }
diff --git a/src/types/redis_json.h b/src/types/redis_json.h
index 8eeb2571..5ba6f7e9 100644
--- a/src/types/redis_json.h
+++ b/src/types/redis_json.h
@@ -35,6 +35,7 @@ class Json : public Database {
 
   rocksdb::Status Set(const std::string &user_key, const std::string &path, 
const std::string &value);
   rocksdb::Status Get(const std::string &user_key, const 
std::vector<std::string> &paths, JsonValue *result);
+  rocksdb::Status Info(const std::string &user_key, JsonStorageFormat 
*storage_format);
   rocksdb::Status Type(const std::string &user_key, const std::string &path, 
std::vector<std::string> *results);
   rocksdb::Status ArrAppend(const std::string &user_key, const std::string 
&path,
                             const std::vector<std::string> &values, 
std::vector<size_t> *result_count);
diff --git a/tests/gocase/unit/type/json/json_test.go 
b/tests/gocase/unit/type/json/json_test.go
index c0e7f803..3b1c70ac 100644
--- a/tests/gocase/unit/type/json/json_test.go
+++ b/tests/gocase/unit/type/json/json_test.go
@@ -71,6 +71,17 @@ func TestJson(t *testing.T) {
                require.Equal(t, rdb.Do(ctx, "JSON.GET", "a", "INDENT", " ", 
"$").Val(), `[ {  "x":1,  "y":2 }]`)
        })
 
+       t.Run("JSON storage format CBOR", func(t *testing.T) {
+               require.NoError(t, rdb.Do(ctx, "JSON.SET", "a", "$", `{"x":1, 
"y":2}`).Err())
+               require.Equal(t, "json", rdb.Do(ctx, "JSON.INFO", 
"a").Val().([]interface{})[1])
+
+               require.NoError(t, rdb.Do(ctx, "CONFIG", "SET", 
"json-storage-format", "cbor").Err())
+               require.NoError(t, rdb.Do(ctx, "JSON.SET", "b", "$", `{"x":1, 
"y":2}`).Err())
+               require.Equal(t, "cbor", rdb.Do(ctx, "JSON.INFO", 
"b").Val().([]interface{})[1])
+               require.Equal(t, `{"x":1,"y":2}`, rdb.Do(ctx, "JSON.GET", 
"b").Val())
+               require.Equal(t, `{"x":1,"y":2}`, rdb.Do(ctx, "JSON.GET", 
"a").Val())
+       })
+
        t.Run("JSON.ARRAPPEND basics", func(t *testing.T) {
                require.NoError(t, rdb.Do(ctx, "SET", "a", `1`).Err())
                require.Error(t, rdb.Do(ctx, "JSON.ARRAPPEND", "a", "$", 
`1`).Err())

Reply via email to