This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 935f32e1 Add support of new command JSON.SET and JSON.GET (#1803)
935f32e1 is described below
commit 935f32e1de626669909e49f59b450dbf91f34790
Author: Twice <[email protected]>
AuthorDate: Sun Oct 15 22:08:17 2023 +0900
Add support of new command JSON.SET and JSON.GET (#1803)
---
.github/config/typos.toml | 3 ++
src/cluster/slot_migrate.cc | 2 +-
src/commands/cmd_json.cc | 58 ++++++++++++++++++++
src/storage/compact_filter.cc | 2 +-
src/storage/redis_db.cc | 32 +++++++----
src/storage/redis_db.h | 3 ++
src/storage/redis_metadata.cc | 32 +++++++++--
src/storage/redis_metadata.h | 31 +++++++++++
src/types/json.h | 77 +++++++++++++++++++++++++++
src/types/redis_json.cc | 120 ++++++++++++++++++++++++++++++++++++++++++
src/types/redis_json.h | 42 +++++++++++++++
11 files changed, 386 insertions(+), 16 deletions(-)
diff --git a/.github/config/typos.toml b/.github/config/typos.toml
index c02f4f6e..daae57c8 100644
--- a/.github/config/typos.toml
+++ b/.github/config/typos.toml
@@ -30,3 +30,6 @@ ignore-hidden = false
# random strings
"ue" = "ue"
+
+# jsoncons
+"ser" = "ser"
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 900be3b1..3e93d832 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -588,7 +588,7 @@ StatusOr<KeyMigrationResult>
SlotMigrator::migrateOneKey(const rocksdb::Slice &k
return {Status::NotOK, s.ToString()};
}
- if (metadata.Type() != kRedisString && metadata.Type() != kRedisStream &&
metadata.size == 0) {
+ if (!metadata.IsEmptyableType() && metadata.size == 0) {
return KeyMigrationResult::kUnderlyingStructEmpty;
}
diff --git a/src/commands/cmd_json.cc b/src/commands/cmd_json.cc
new file mode 100644
index 00000000..ad52467a
--- /dev/null
+++ b/src/commands/cmd_json.cc
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "commander.h"
+#include "server/redis_reply.h"
+#include "server/server.h"
+#include "types/redis_json.h"
+
+namespace redis {
+
+// Handler of `JSON.SET <key> <path> <value>`.
+class CommandJsonSet : public Commander {
+ public:
+  // Stores the JSON text args_[3] at path args_[2] of the document under key args_[1].
+  // Replies with the simple string "OK"; a storage failure is reported as an execution error.
+  // NOTE(review): args_[3] is read unconditionally, so the registered arity
+  // must guarantee that at least four arguments are present.
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::Json json_db(svr->storage, conn->GetNamespace());
+
+    if (auto status = json_db.Set(args_[1], args_[2], args_[3]); !status.ok()) {
+      return {Status::RedisExecErr, status.ToString()};
+    }
+
+    *output = redis::SimpleString("OK");
+    return Status::OK();
+  }
+};
+
+// Handler of `JSON.GET <key> [path ...]`.
+class CommandJsonGet : public Commander {
+ public:
+  // Looks up the document under key args_[1] and replies with the serialized
+  // result as a bulk string; a storage failure is reported as an execution error.
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::Json json_db(svr->storage, conn->GetNamespace());
+
+    // every argument after the key is forwarded as a path (possibly none)
+    const std::vector<std::string> paths(args_.begin() + 2, args_.end());
+
+    JsonValue found;
+    auto status = json_db.Get(args_[1], paths, &found);
+    if (!status.ok()) return {Status::RedisExecErr, status.ToString()};
+
+    *output = redis::BulkString(found.Dump());
+    return Status::OK();
+  }
+};
+
+// The arity counts the command name itself; a negative arity means "at least that many".
+// json.set: exactly `JSON.SET <key> <path> <value>`. CommandJsonSet::Execute reads args_[3],
+//           so anything shorter must be rejected before the handler runs.
+// json.get: `JSON.GET <key> [path ...]`, i.e. the key plus any number of paths.
+REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandJsonSet>("json.set", 4, "write", 1, 1, 1),
+                        MakeCmdAttr<CommandJsonGet>("json.get", -2, "read-only", 1, 1, 1), );
+
+} // namespace redis
diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index 6de34496..91c6751a 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -88,7 +88,7 @@ bool SubKeyFilter::IsMetadataExpired(const InternalKey &ikey,
const Metadata &me
// lazy delete to avoid race condition between command Expire and subkey
Compaction
// Related issue:https://github.com/apache/kvrocks/issues/1298
//
- // `Util::GetTimeStampMS() - 300000` means extending 5 minutes for expired
items,
+ // `util::GetTimeStampMS() - 300000` means extending 5 minutes for expired
items,
// to prevent them from being recycled once they reach the expiration time.
uint64_t lazy_expired_ts = util::GetTimeStampMS() - 300000;
return metadata.Type() == kRedisString // metadata key was overwrite by set
command
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 9201d386..057729f2 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -38,13 +38,11 @@ Database::Database(engine::Storage *storage, std::string
ns) : storage_(storage)
metadata_cf_handle_ = storage->GetCFHandle("metadata");
}
-rocksdb::Status Database::GetMetadata(RedisType type, const Slice &ns_key,
Metadata *metadata) {
+rocksdb::Status Database::ParseMetadata(RedisType type, Slice *bytes, Metadata
*metadata) {
std::string old_metadata;
metadata->Encode(&old_metadata);
- std::string bytes;
- auto s = GetRawMetadata(ns_key, &bytes);
- if (!s.ok()) return s;
- s = metadata->Decode(bytes);
+
+ auto s = metadata->Decode(bytes);
if (!s.ok()) return s;
if (metadata->Expired()) {
@@ -52,14 +50,12 @@ rocksdb::Status Database::GetMetadata(RedisType type, const
Slice &ns_key, Metad
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::NotFound(kErrMsgKeyExpired);
}
- if (metadata->Type() != type &&
- (metadata->size > 0 || metadata->Type() == kRedisString ||
metadata->Type() == kRedisStream)) {
+ if (metadata->Type() != type && (metadata->size > 0 ||
metadata->IsEmptyableType())) {
// error discarded here since it already failed
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::InvalidArgument(kErrMsgWrongType);
}
- if (metadata->size == 0 && type != kRedisStream &&
- type != kRedisBloomFilter) { // stream and bloom is allowed to be empty
+ if (metadata->size == 0 && !metadata->IsEmptyableType()) {
// error discarded here since it already failed
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::NotFound("no element found");
@@ -67,6 +63,22 @@ rocksdb::Status Database::GetMetadata(RedisType type, const
Slice &ns_key, Metad
return s;
}
+// Reads the raw metadata stored under `ns_key` and parses it into `metadata`
+// via ParseMetadata for the requested `type`.
+// Returns the read status when the lookup fails, otherwise whatever ParseMetadata returns.
+rocksdb::Status Database::GetMetadata(RedisType type, const Slice &ns_key, Metadata *metadata) {
+  std::string raw;
+  if (auto s = GetRawMetadata(ns_key, &raw); !s.ok()) return s;
+
+  // ParseMetadata takes a mutable slice, so wrap the buffer in a local view
+  Slice remaining(raw);
+  return ParseMetadata(type, &remaining, metadata);
+}
+
+// Variant of GetMetadata that also exposes the raw stored value.
+// `raw_value` receives the bytes read for `ns_key`; `rest` is a view into `*raw_value`
+// which is handed to ParseMetadata. Since `rest` does not own its bytes,
+// `*raw_value` has to outlive any use of `rest`.
+// NOTE(review): presumably `rest` ends up pointing just past the metadata header
+// after a successful parse (callers read the payload from it) - confirm against Metadata::Decode.
+rocksdb::Status Database::GetMetadata(RedisType type, const Slice &ns_key, std::string *raw_value, Metadata *metadata,
+                                      Slice *rest) {
+  auto fetch_status = GetRawMetadata(ns_key, raw_value);
+  // `rest` is assigned even when the read failed, exactly like the status is checked afterwards
+  *rest = Slice(*raw_value);
+  if (fetch_status.ok()) return ParseMetadata(type, rest, metadata);
+  return fetch_status;
+}
+
rocksdb::Status Database::GetRawMetadata(const Slice &ns_key, std::string
*bytes) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
@@ -92,7 +104,7 @@ rocksdb::Status Database::Expire(const Slice &user_key,
uint64_t timestamp) {
if (metadata.Expired()) {
return rocksdb::Status::NotFound(kErrMsgKeyExpired);
}
- if (metadata.Type() != kRedisString && metadata.size == 0) {
+ if (!metadata.IsEmptyableType() && metadata.size == 0) {
return rocksdb::Status::NotFound("no elements");
}
if (metadata.expire == timestamp) return rocksdb::Status::OK();
diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h
index 8b1d7c32..3e94b73b 100644
--- a/src/storage/redis_db.h
+++ b/src/storage/redis_db.h
@@ -34,7 +34,10 @@ class Database {
static constexpr uint64_t RANDOM_KEY_SCAN_LIMIT = 60;
explicit Database(engine::Storage *storage, std::string ns = "");
+ [[nodiscard]] static rocksdb::Status ParseMetadata(RedisType type, Slice
*bytes, Metadata *metadata);
[[nodiscard]] rocksdb::Status GetMetadata(RedisType type, const Slice
&ns_key, Metadata *metadata);
+ [[nodiscard]] rocksdb::Status GetMetadata(RedisType type, const Slice
&ns_key, std::string *raw_value,
+ Metadata *metadata, Slice *rest);
[[nodiscard]] rocksdb::Status GetRawMetadata(const Slice &ns_key,
std::string *bytes);
[[nodiscard]] rocksdb::Status GetRawMetadataByUserKey(const Slice &user_key,
std::string *bytes);
[[nodiscard]] rocksdb::Status Expire(const Slice &user_key, uint64_t
timestamp);
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index a40d769f..23afc765 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -162,7 +162,7 @@ rocksdb::Status Metadata::Decode(Slice *input) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
- if (Type() != kRedisString) {
+ if (!IsSingleKVType()) {
if (input->size() < 8 + CommonEncodedSize()) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
@@ -178,7 +178,7 @@ rocksdb::Status Metadata::Decode(Slice input) { return
Decode(&input); }
void Metadata::Encode(std::string *dst) const {
PutFixed8(dst, flags);
PutExpire(dst);
- if (Type() != kRedisString) {
+ if (!IsSingleKVType()) {
PutFixed64(dst, version);
PutFixedCommon(dst, size);
}
@@ -199,7 +199,7 @@ uint64_t Metadata::generateVersion() {
bool Metadata::operator==(const Metadata &that) const {
if (flags != that.flags) return false;
if (expire != that.expire) return false;
- if (Type() != kRedisString) {
+ if (!IsSingleKVType()) {
if (size != that.size) return false;
if (version != that.version) return false;
}
@@ -304,7 +304,7 @@ timeval Metadata::Time() const {
}
bool Metadata::ExpireAt(uint64_t expired_ts) const {
- if (Type() != kRedisString && Type() != kRedisStream && Type() !=
kRedisBloomFilter && size == 0) {
+ if (!IsEmptyableType() && size == 0) {
return true;
}
if (expire == 0) {
@@ -314,6 +314,12 @@ bool Metadata::ExpireAt(uint64_t expired_ts) const {
return expire < expired_ts;
}
+// True for the types handled as a single key-value here: string and JSON.
+// Encode/Decode skip the `version` and `size` fields for these types.
+bool Metadata::IsSingleKVType() const {
+  const auto type = Type();
+  return type == kRedisString || type == kRedisJson;
+}
+
+// True for types whose `size` may legitimately be zero:
+// every single-KV type, plus stream and bloom filter.
+bool Metadata::IsEmptyableType() const {
+  if (IsSingleKVType()) return true;
+
+  const auto type = Type();
+  return type == kRedisStream || type == kRedisBloomFilter;
+}
+
bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); }
ListMetadata::ListMetadata(bool generate_version)
@@ -436,3 +442,21 @@ uint32_t BloomChainMetadata::GetCapacity() const {
}
return static_cast<uint32_t>(base_capacity * (1 - pow(expansion, n_filters))
/ (1 - expansion));
}
+
+// Appends the common metadata encoding to `dst`, followed by one byte holding the storage format.
+void JsonMetadata::Encode(std::string *dst) const {
+  Metadata::Encode(dst);
+
+  const auto format_byte = static_cast<uint8_t>(format);
+  PutFixed8(dst, format_byte);
+}
+
+// Decodes the common metadata from `input`, then the one-byte storage format that follows it.
+// Returns the base-class error if the common part fails, and InvalidArgument
+// when no byte is left for the format.
+rocksdb::Status JsonMetadata::Decode(Slice *input) {
+  auto s = Metadata::Decode(input);
+  if (!s.ok()) return s;
+
+  // read into a plain byte first, then convert to the enum
+  uint8_t raw_format = static_cast<uint8_t>(format);
+  if (!GetFixed8(input, &raw_format)) {
+    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
+  }
+  format = static_cast<JsonStorageFormat>(raw_format);
+
+  return rocksdb::Status::OK();
+}
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 33760aac..8d6aac65 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -46,6 +46,7 @@ enum RedisType {
kRedisSortedint = 7,
kRedisStream = 8,
kRedisBloomFilter = 9,
+ kRedisJson = 10,
};
enum RedisCommand {
@@ -128,6 +129,7 @@ class Metadata {
explicit Metadata(RedisType type, bool generate_version = true,
bool use_64bit_common_field =
USE_64BIT_COMMON_FIELD_DEFAULT);
+
static void InitVersionCounter();
static size_t GetOffsetAfterExpire(uint8_t flags);
@@ -147,6 +149,19 @@ class Metadata {
bool Expired() const;
bool ExpireAt(uint64_t expired_ts) const;
+ // return whether for this type, the metadata itself is the whole data,
+ // no other key-values.
+ // this means that the metadata of these types do NOT have
+ // `version` and `size` field.
+ // e.g. RedisString, RedisJson
+ bool IsSingleKVType() const;
+
+ // return whether the `size` field of this type can be zero.
+ // if a type is NOT an emptyable type,
+ // any key of this type is regarded as expired if `size` equals to 0.
+ // e.g. any SingleKVType, RedisStream, RedisBloomFilter
+ bool IsEmptyableType() const;
+
virtual void Encode(std::string *dst) const;
[[nodiscard]] virtual rocksdb::Status Decode(Slice *input);
[[nodiscard]] rocksdb::Status Decode(Slice input);
@@ -254,3 +269,19 @@ class BloomChainMetadata : public Metadata {
bool IsScaling() const { return expansion != 0; };
};
+
+// How the JSON document following the metadata header is serialized.
+// Stored as a single byte after the common metadata fields.
+enum class JsonStorageFormat : uint8_t {
+  JSON = 0,  // plain JSON text
+};
+
+// Metadata of a JSON key: the common metadata plus a storage-format marker.
+class JsonMetadata : public Metadata {
+ public:
+  explicit JsonMetadata(bool generate_version = true) : Metadata(kRedisJson, generate_version) {}
+
+  // the format marker keeps the JSON type extensible:
+  // other encodings of the stored document can be added later
+  JsonStorageFormat format = JsonStorageFormat::JSON;
+
+  // encode/decode the common metadata followed by the one-byte `format`
+  void Encode(std::string *dst) const override;
+  rocksdb::Status Decode(Slice *input) override;
+};
diff --git a/src/types/json.h b/src/types/json.h
new file mode 100644
index 00000000..d25c20f1
--- /dev/null
+++ b/src/types/json.h
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <jsoncons/json.hpp>
+#include <jsoncons/json_error.hpp>
+#include <jsoncons_ext/jsonpath/json_query.hpp>
+
+#include "jsoncons_ext/jsonpath/jsonpath_error.hpp"
+#include "status.h"
+
+// Value wrapper around a jsoncons::json document that reports failures
+// through Status / StatusOr instead of letting jsoncons exceptions escape.
+struct JsonValue {
+  JsonValue() = default;
+  explicit JsonValue(jsoncons::basic_json<char> value) : value(std::move(value)) {}
+
+  // Parses `str` as JSON text.
+  // Returns NotOK carrying the parser message when `str` is not valid JSON.
+  static StatusOr<JsonValue> FromString(std::string_view str) {
+    jsoncons::json val;
+
+    try {
+      val = jsoncons::json::parse(str);
+    } catch (const jsoncons::ser_error &e) {
+      return {Status::NotOK, e.what()};
+    }
+
+    return JsonValue(std::move(val));
+  }
+
+  // Serializes the document back to JSON text.
+  std::string Dump() const { return value.to_string(); }
+
+  // Replaces every node selected by the JSONPath expression `path` with `new_value`.
+  // Returns NotOK carrying the jsonpath error message when `path` is rejected.
+  Status Set(std::string_view path, JsonValue &&new_value) {
+    try {
+      jsoncons::jsonpath::json_replace(value, path,
+                                       [&new_value](const std::string & /*path*/, jsoncons::json &origin) {
+                                         // the callback runs once per matched node, so the value must be
+                                         // copied: moving it would hand a moved-from value to every match
+                                         // after the first one
+                                         origin = new_value.value;
+                                       });
+    } catch (const jsoncons::jsonpath::jsonpath_error &e) {
+      return {Status::NotOK, e.what()};
+    }
+
+    return Status::OK();
+  }
+
+  // Evaluates the JSONPath expression `path` against the document and returns the query result.
+  // Returns NotOK carrying the jsonpath error message when `path` is rejected.
+  StatusOr<JsonValue> Get(std::string_view path) const {
+    try {
+      return jsoncons::jsonpath::json_query(value, path);
+    } catch (const jsoncons::jsonpath::jsonpath_error &e) {
+      return {Status::NotOK, e.what()};
+    }
+  }
+
+  JsonValue(const JsonValue &) = default;
+  JsonValue(JsonValue &&) = default;
+
+  JsonValue &operator=(const JsonValue &) = default;
+  JsonValue &operator=(JsonValue &&) = default;
+
+  ~JsonValue() = default;
+
+  // the wrapped document
+  jsoncons::json value;
+};
diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc
new file mode 100644
index 00000000..65895f4e
--- /dev/null
+++ b/src/types/redis_json.cc
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "redis_json.h"
+
+#include "json.h"
+#include "lock_manager.h"
+#include "storage/redis_metadata.h"
+
+namespace redis {
+
+// Persists `json_val` under `ns_key` in the metadata column family.
+// The stored value is the encoded `metadata` immediately followed by the serialized document.
+// `metadata->format` is overwritten with JsonStorageFormat::JSON before encoding.
+rocksdb::Status Json::write(Slice ns_key, JsonMetadata *metadata, const JsonValue &json_val) {
+  metadata->format = JsonStorageFormat::JSON;
+
+  // header first, then the document text
+  std::string encoded;
+  metadata->Encode(&encoded);
+  encoded += json_val.Dump();
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisJson);
+  batch->PutLogData(log_data.Encode());
+  batch->Put(metadata_cf_handle_, ns_key, encoded);
+
+  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
+// Sets `value` (JSON text) at JSONPath `path` of the document stored under `user_key`.
+// - key not found: the document is created, which is only allowed at the root path "$".
+// - key exists: the stored document is parsed, the nodes selected by `path` are replaced,
+//   and the whole document is written back.
+// Returns InvalidArgument for a bad `value` or `path`, Corruption when the stored
+// document cannot be parsed, NotSupported for an unknown storage format,
+// or the status of the underlying read/write.
+rocksdb::Status Json::Set(const std::string &user_key, const std::string &path, const std::string &value) {
+  auto ns_key = AppendNamespacePrefix(user_key);
+
+  // the lock is held across the read and the write below
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+
+  std::string raw;
+  JsonMetadata metadata;
+  Slice payload;
+  auto s = GetMetadata(kRedisJson, ns_key, &raw, &metadata, &payload);
+
+  if (s.IsNotFound()) {
+    // nothing stored yet: only a whole document may be created
+    if (path != "$") {
+      return rocksdb::Status::InvalidArgument("new objects must be created at the root");
+    }
+
+    auto parsed = JsonValue::FromString(value);
+    if (!parsed) return rocksdb::Status::InvalidArgument(parsed.Msg());
+    JsonValue document = *std::move(parsed);
+
+    return write(ns_key, &metadata, document);
+  }
+
+  if (!s.ok()) return s;
+
+  if (metadata.format != JsonStorageFormat::JSON) {
+    return rocksdb::Status::NotSupported("JSON storage format not supported");
+  }
+
+  // the stored document is parsed before the user input, so a corrupted record is reported first
+  auto stored = JsonValue::FromString(payload.ToStringView());
+  if (!stored) return rocksdb::Status::Corruption(stored.Msg());
+  JsonValue document = *std::move(stored);
+
+  auto parsed = JsonValue::FromString(value);
+  if (!parsed) return rocksdb::Status::InvalidArgument(parsed.Msg());
+  JsonValue replacement = *std::move(parsed);
+
+  auto updated = document.Set(path, std::move(replacement));
+  if (!updated) return rocksdb::Status::InvalidArgument(updated.Msg());
+
+  return write(ns_key, &metadata, document);
+}
+
+// Reads the JSON document stored under `user_key` and selects parts of it into `*result`.
+// - no path:        the whole document
+// - one path:       the query result of that path
+// - several paths:  one entry per path, keyed by the path string, each holding
+//                   the query result of that path
+// Returns the status of the underlying read (e.g. not found), NotSupported for an
+// unknown storage format, Corruption when the stored document cannot be parsed,
+// or InvalidArgument when a path is rejected. `*result` is only assigned on success.
+rocksdb::Status Json::Get(const std::string &user_key, const std::vector<std::string> &paths, JsonValue *result) {
+  auto ns_key = AppendNamespacePrefix(user_key);
+
+  std::string bytes;
+  JsonMetadata metadata;
+  Slice rest;
+  auto s = GetMetadata(kRedisJson, ns_key, &bytes, &metadata, &rest);
+  if (!s.ok()) return s;
+
+  if (metadata.format != JsonStorageFormat::JSON)
+    return rocksdb::Status::NotSupported("JSON storage format not supported");
+
+  auto json_res = JsonValue::FromString(rest.ToStringView());
+  if (!json_res) return rocksdb::Status::Corruption(json_res.Msg());
+  auto json_val = *std::move(json_res);
+
+  JsonValue res;
+
+  if (paths.empty()) {
+    res = std::move(json_val);
+  } else if (paths.size() == 1) {
+    auto get_res = json_val.Get(paths[0]);
+    if (!get_res) return rocksdb::Status::InvalidArgument(get_res.Msg());
+    res = *std::move(get_res);
+  } else {
+    for (const auto &path : paths) {
+      // query the path of this iteration; querying paths[0] here would
+      // store the first path's result under every key
+      auto get_res = json_val.Get(path);
+      if (!get_res) return rocksdb::Status::InvalidArgument(get_res.Msg());
+      res.value.insert_or_assign(path, std::move(get_res->value));
+    }
+  }
+
+  *result = std::move(res);
+  return rocksdb::Status::OK();
+}
+
+} // namespace redis
diff --git a/src/types/redis_json.h b/src/types/redis_json.h
new file mode 100644
index 00000000..413383c8
--- /dev/null
+++ b/src/types/redis_json.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <storage/redis_db.h>
+
+#include <string>
+
+#include "json.h"
+
+namespace redis {
+
+// Storage-level access to JSON keys, built on top of Database.
+class Json : public Database {
+ public:
+  Json(engine::Storage *storage, std::string ns) : Database(storage, std::move(ns)) {}
+
+  // set the JSON text `value` at JSONPath `path` of the document under `user_key`
+  rocksdb::Status Set(const std::string &user_key, const std::string &path, const std::string &value);
+
+  // read the document under `user_key` and put the parts selected by `paths` into `*result`
+  rocksdb::Status Get(const std::string &user_key, const std::vector<std::string> &paths, JsonValue *result);
+
+ private:
+  // store `metadata` followed by the serialized `json_val` under `ns_key`
+  rocksdb::Status write(Slice ns_key, JsonMetadata *metadata, const JsonValue &json_val);
+};
+
+} // namespace redis