This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 79f4d525 feat(cluster): add support of the JSON type in cluster
migration (#2654)
79f4d525 is described below
commit 79f4d525e3e63eaec7d1316f5076345170174b9c
Author: hulk <[email protected]>
AuthorDate: Tue Nov 12 09:56:34 2024 +0800
feat(cluster): add support of the JSON type in cluster migration (#2654)
---
src/cluster/slot_migrate.cc | 34 +++++++++++++++++-----
src/storage/batch_extractor.cc | 12 ++++++++
src/storage/redis_db.cc | 22 ++++++--------
src/storage/redis_db.h | 4 ++-
src/types/redis_bitmap.cc | 2 +-
src/types/redis_json.cc | 10 ++++++-
src/types/redis_json.h | 1 +
src/types/redis_string.cc | 4 +--
.../integration/slotmigrate/slotmigrate_test.go | 7 +++--
9 files changed, 69 insertions(+), 27 deletions(-)
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index e59aee1b..4b544cf7 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -692,7 +692,8 @@ StatusOr<KeyMigrationResult>
SlotMigrator::migrateOneKey(const rocksdb::Slice &k
// Construct command according to type of the key
switch (metadata.Type()) {
- case kRedisString: {
+ case kRedisString:
+ case kRedisJson: {
auto s = migrateSimpleKey(key, metadata, bytes, restore_cmds);
if (!s.IsOK()) {
return s.Prefixed("failed to migrate simple key");
@@ -738,13 +739,32 @@ StatusOr<KeyMigrationResult>
SlotMigrator::migrateOneKey(const rocksdb::Slice &k
Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const
Metadata &metadata, const std::string &bytes,
std::string *restore_cmds) {
- std::vector<std::string> command = {"SET", key.ToString(),
bytes.substr(Metadata::GetOffsetAfterExpire(bytes[0]))};
- if (metadata.expire > 0) {
- command.emplace_back("PXAT");
- command.emplace_back(std::to_string(metadata.expire));
+ if (metadata.Type() == kRedisString) {
+ std::vector<std::string> command = {"SET", key.ToString(),
bytes.substr(Metadata::GetOffsetAfterExpire(bytes[0]))};
+ if (metadata.expire > 0) {
+ command.emplace_back("PXAT");
+ command.emplace_back(std::to_string(metadata.expire));
+ }
+ *restore_cmds += redis::ArrayOfBulkStrings(command);
+ current_pipeline_size_++;
+ } else if (metadata.Type() == kRedisJson) {
+ // kRedisJson
+ JsonValue json_value;
+ if (auto s = redis::Json::FromRawString(bytes, &json_value); !s.ok()) {
+ return {Status::NotOK, s.ToString()};
+ }
+ auto json_bytes = GET_OR_RET(json_value.Dump());
+ std::vector<std::string> command = {"JSON.SET", key.ToString(), "$",
std::move(json_bytes)};
+ *restore_cmds += redis::ArrayOfBulkStrings(command);
+ current_pipeline_size_++;
+
+ if (metadata.expire > 0) {
+ *restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(),
std::to_string(metadata.expire)});
+ current_pipeline_size_++;
+ }
+ } else {
+ return {Status::NotOK, "unsupported simple key type"};
}
- *restore_cmds += redis::ArrayOfBulkStrings(command);
- current_pipeline_size_++;
// Check whether pipeline needs to be sent
// TODO(chrisZMF): Resend data if failed to send data
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 8ab8f04e..968c7088 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -69,6 +69,18 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
command_args = {"PEXPIREAT", user_key,
std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
+ } else if (metadata.Type() == kRedisJson) {
+ JsonValue json_value;
+ s = redis::Json::FromRawString(value.ToString(), &json_value);
+ if (!s.ok()) return s;
+ auto json_bytes = json_value.Dump();
+ if (!json_bytes) return rocksdb::Status::Corruption(json_bytes.Msg());
+ command_args = {"JSON.SET", user_key, "$", json_bytes.GetValue()};
+ resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
+ if (metadata.expire > 0) {
+ command_args = {"PEXPIREAT", user_key,
std::to_string(metadata.expire)};
+
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
+ }
} else if (metadata.expire > 0) {
auto args = log_data_.GetArguments();
if (args->size() > 0) {
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 26864f3e..38845fef 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -47,21 +47,10 @@ Database::Database(engine::Storage *storage, std::string ns)
metadata_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Metadata)),
namespace_(std::move(ns)) {}
-// Some data types may support reading multiple types of metadata.
-// For example, bitmap supports reading string metadata and bitmap metadata.
rocksdb::Status Database::ParseMetadata(RedisTypes types, Slice *bytes,
Metadata *metadata) {
std::string old_metadata;
metadata->Encode(&old_metadata);
- bool is_keyspace_hit = false;
- ScopeExit se([this, &is_keyspace_hit] {
- if (is_keyspace_hit) {
- storage_->RecordStat(engine::StatType::KeyspaceHits, 1);
- } else {
- storage_->RecordStat(engine::StatType::KeyspaceMisses, 1);
- }
- });
-
auto s = metadata->Decode(bytes);
// delay InvalidArgument error check after type match check
if (!s.ok() && !s.IsInvalidArgument()) return s;
@@ -85,7 +74,14 @@ rocksdb::Status Database::ParseMetadata(RedisTypes types,
Slice *bytes, Metadata
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::NotFound("no element found");
}
- is_keyspace_hit = true;
+ return s;
+}
+
+// Some data types may support reading multiple types of metadata.
+// For example, bitmap supports reading string metadata and bitmap metadata.
+rocksdb::Status Database::ParseMetadataWithStats(RedisTypes types, Slice
*bytes, Metadata *metadata) {
+ auto s = ParseMetadata(types, bytes, metadata);
+ storage_->RecordStat(s.ok() ? engine::StatType::KeyspaceHits :
engine::StatType::KeyspaceMisses, 1);
return s;
}
@@ -100,7 +96,7 @@ rocksdb::Status Database::GetMetadata(engine::Context &ctx,
RedisTypes types, co
auto s = GetRawMetadata(ctx, ns_key, raw_value);
*rest = *raw_value;
if (!s.ok()) return s;
- return ParseMetadata(types, rest, metadata);
+ return ParseMetadataWithStats(types, rest, metadata);
}
rocksdb::Status Database::GetRawMetadata(engine::Context &ctx, const Slice
&ns_key, std::string *bytes) {
diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h
index 41ed3dae..9563b1e1 100644
--- a/src/storage/redis_db.h
+++ b/src/storage/redis_db.h
@@ -75,7 +75,9 @@ class Database {
explicit Database(engine::Storage *storage, std::string ns = "");
/// Parsing metadata with type of `types` from bytes, the metadata is a base
class of all metadata.
/// When parsing, the bytes will be consumed.
- [[nodiscard]] rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes,
Metadata *metadata);
+ [[nodiscard]] rocksdb::Status ParseMetadataWithStats(RedisTypes types, Slice
*bytes, Metadata *metadata);
+ // ParseMetadata behaves the same as ParseMetadataWithStats, but without
recording stats.
+ [[nodiscard]] static rocksdb::Status ParseMetadata(RedisTypes types, Slice
*bytes, Metadata *metadata);
/// GetMetadata is a helper function to get metadata from the database. It
will read the "raw metadata"
/// from underlying storage, and then parse the raw metadata to the
specified metadata type.
///
diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index e75deea3..d7216d7d 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -101,7 +101,7 @@ rocksdb::Status Bitmap::GetMetadata(engine::Context &ctx,
const Slice &ns_key, B
if (!s.ok()) return s;
Slice slice = *raw_value;
- return ParseMetadata({kRedisBitmap, kRedisString}, &slice, metadata);
+ return ParseMetadataWithStats({kRedisBitmap, kRedisString}, &slice,
metadata);
}
rocksdb::Status Bitmap::GetBit(engine::Context &ctx, const Slice &user_key,
uint32_t bit_offset, bool *bit) {
diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc
index ba331ef9..43b516ba 100644
--- a/src/types/redis_json.cc
+++ b/src/types/redis_json.cc
@@ -630,7 +630,7 @@ std::vector<rocksdb::Status>
Json::readMulti(engine::Context &ctx, const std::ve
if (!statuses[i].ok()) continue;
Slice rest(pin_values[i].data(), pin_values[i].size());
JsonMetadata metadata;
- statuses[i] = ParseMetadata({kRedisJson}, &rest, &metadata);
+ statuses[i] = ParseMetadataWithStats({kRedisJson}, &rest, &metadata);
if (!statuses[i].ok()) continue;
statuses[i] = parse(metadata, rest, &values[i]);
@@ -674,4 +674,12 @@ rocksdb::Status Json::Resp(engine::Context &ctx, const
std::string &user_key, co
return rocksdb::Status::OK();
}
+rocksdb::Status Json::FromRawString(std::string_view value, JsonValue *result)
{
+ Slice rest = value;
+ JsonMetadata metadata;
+ auto s = ParseMetadata({kRedisJson}, &rest, &metadata);
+ if (!s.ok()) return s;
+ return parse(metadata, rest, result);
+}
+
} // namespace redis
diff --git a/src/types/redis_json.h b/src/types/redis_json.h
index 54831472..8b42ca7c 100644
--- a/src/types/redis_json.h
+++ b/src/types/redis_json.h
@@ -82,6 +82,7 @@ class Json : public Database {
rocksdb::Status Resp(engine::Context &ctx, const std::string &user_key,
const std::string &path,
std::vector<std::string> *results, RESP resp);
+ static rocksdb::Status FromRawString(std::string_view value, JsonValue
*result);
private:
rocksdb::Status write(engine::Context &ctx, Slice ns_key, JsonMetadata
*metadata, const JsonValue &json_val);
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index 210fb697..7d7f476d 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -47,7 +47,7 @@ std::vector<rocksdb::Status>
String::getRawValues(engine::Context &ctx, const st
(*raw_values)[i].assign(pin_values[i].data(), pin_values[i].size());
Metadata metadata(kRedisNone, false);
Slice slice = (*raw_values)[i];
- auto s = ParseMetadata({kRedisString}, &slice, &metadata);
+ auto s = ParseMetadataWithStats({kRedisString}, &slice, &metadata);
if (!s.ok()) {
statuses[i] = s;
(*raw_values)[i].clear();
@@ -65,7 +65,7 @@ rocksdb::Status String::getRawValue(engine::Context &ctx,
const std::string &ns_
Metadata metadata(kRedisNone, false);
Slice slice = *raw_value;
- return ParseMetadata({kRedisString}, &slice, &metadata);
+ return ParseMetadataWithStats({kRedisString}, &slice, &metadata);
}
rocksdb::Status String::getValueAndExpire(engine::Context &ctx, const
std::string &ns_key, std::string *value,
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index d1ddbb3d..e04acfc4 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -535,7 +535,7 @@ func TestSlotMigrateDataType(t *testing.T) {
testSlot += 1
keys := make(map[string]string, 0)
- for _, typ := range []string{"string", "expired_string",
"list", "hash", "set", "zset", "bitmap", "sortint", "stream"} {
+ for _, typ := range []string{"string", "expired_string",
"list", "hash", "set", "zset", "bitmap", "sortint", "stream", "json"} {
keys[typ] = fmt.Sprintf("%s_{%s}", typ,
util.SlotTable[testSlot])
require.NoError(t, rdb0.Del(ctx, keys[typ]).Err())
}
@@ -590,8 +590,9 @@ func TestSlotMigrateDataType(t *testing.T) {
}).Err())
}
require.NoError(t, rdb0.Expire(ctx, keys["stream"],
10*time.Second).Err())
+ require.NoError(t, rdb0.JSONSet(ctx, keys["json"], "$", `{"a":
1, "b": "hello"}`).Err())
// check source data existence
- for _, typ := range []string{"string", "list", "hash", "set",
"zset", "bitmap", "sortint", "stream"} {
+ for _, typ := range []string{"string", "list", "hash", "set",
"zset", "bitmap", "sortint", "stream", "json"} {
require.EqualValues(t, 1, rdb0.Exists(ctx,
keys[typ]).Val())
}
// get source data
@@ -653,6 +654,8 @@ func TestSlotMigrateDataType(t *testing.T) {
require.EqualValues(t, 19, streamInfo.EntriesAdded)
require.EqualValues(t, "0-0", streamInfo.MaxDeletedEntryID)
require.EqualValues(t, 19, streamInfo.Length)
+ // type json
+ require.Equal(t, `{"a":1,"b":"hello"}`, rdb1.JSONGet(ctx,
keys["json"]).Val())
// topology is changed on source server
for _, typ := range []string{"string", "list", "hash", "set",
"zset", "bitmap", "sortint", "stream"} {
require.ErrorContains(t, rdb0.Exists(ctx,
keys[typ]).Err(), "MOVED")