This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 79f4d525 feat(cluster): add support of the JSON type in cluster 
migration (#2654)
79f4d525 is described below

commit 79f4d525e3e63eaec7d1316f5076345170174b9c
Author: hulk <[email protected]>
AuthorDate: Tue Nov 12 09:56:34 2024 +0800

    feat(cluster): add support of the JSON type in cluster migration (#2654)
---
 src/cluster/slot_migrate.cc                        | 34 +++++++++++++++++-----
 src/storage/batch_extractor.cc                     | 12 ++++++++
 src/storage/redis_db.cc                            | 22 ++++++--------
 src/storage/redis_db.h                             |  4 ++-
 src/types/redis_bitmap.cc                          |  2 +-
 src/types/redis_json.cc                            | 10 ++++++-
 src/types/redis_json.h                             |  1 +
 src/types/redis_string.cc                          |  4 +--
 .../integration/slotmigrate/slotmigrate_test.go    |  7 +++--
 9 files changed, 69 insertions(+), 27 deletions(-)

diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index e59aee1b..4b544cf7 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -692,7 +692,8 @@ StatusOr<KeyMigrationResult> 
SlotMigrator::migrateOneKey(const rocksdb::Slice &k
 
   // Construct command according to type of the key
   switch (metadata.Type()) {
-    case kRedisString: {
+    case kRedisString:
+    case kRedisJson: {
       auto s = migrateSimpleKey(key, metadata, bytes, restore_cmds);
       if (!s.IsOK()) {
         return s.Prefixed("failed to migrate simple key");
@@ -738,13 +739,32 @@ StatusOr<KeyMigrationResult> 
SlotMigrator::migrateOneKey(const rocksdb::Slice &k
 
 Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const 
Metadata &metadata, const std::string &bytes,
                                       std::string *restore_cmds) {
-  std::vector<std::string> command = {"SET", key.ToString(), 
bytes.substr(Metadata::GetOffsetAfterExpire(bytes[0]))};
-  if (metadata.expire > 0) {
-    command.emplace_back("PXAT");
-    command.emplace_back(std::to_string(metadata.expire));
+  if (metadata.Type() == kRedisString) {
+    std::vector<std::string> command = {"SET", key.ToString(), 
bytes.substr(Metadata::GetOffsetAfterExpire(bytes[0]))};
+    if (metadata.expire > 0) {
+      command.emplace_back("PXAT");
+      command.emplace_back(std::to_string(metadata.expire));
+    }
+    *restore_cmds += redis::ArrayOfBulkStrings(command);
+    current_pipeline_size_++;
+  } else if (metadata.Type() == kRedisJson) {
+    // kRedisJson
+    JsonValue json_value;
+    if (auto s = redis::Json::FromRawString(bytes, &json_value); !s.ok()) {
+      return {Status::NotOK, s.ToString()};
+    }
+    auto json_bytes = GET_OR_RET(json_value.Dump());
+    std::vector<std::string> command = {"JSON.SET", key.ToString(), "$", 
std::move(json_bytes)};
+    *restore_cmds += redis::ArrayOfBulkStrings(command);
+    current_pipeline_size_++;
+
+    if (metadata.expire > 0) {
+      *restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), 
std::to_string(metadata.expire)});
+      current_pipeline_size_++;
+    }
+  } else {
+    return {Status::NotOK, "unsupported simple key type"};
   }
-  *restore_cmds += redis::ArrayOfBulkStrings(command);
-  current_pipeline_size_++;
 
   // Check whether pipeline needs to be sent
   // TODO(chrisZMF): Resend data if failed to send data
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 8ab8f04e..968c7088 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -69,6 +69,18 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
         command_args = {"PEXPIREAT", user_key, 
std::to_string(metadata.expire)};
         
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
       }
+    } else if (metadata.Type() == kRedisJson) {
+      JsonValue json_value;
+      s = redis::Json::FromRawString(value.ToString(), &json_value);
+      if (!s.ok()) return s;
+      auto json_bytes = json_value.Dump();
+      if (!json_bytes) return rocksdb::Status::Corruption(json_bytes.Msg());
+      command_args = {"JSON.SET", user_key, "$", json_bytes.GetValue()};
+      resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
+      if (metadata.expire > 0) {
+        command_args = {"PEXPIREAT", user_key, 
std::to_string(metadata.expire)};
+        
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
+      }
     } else if (metadata.expire > 0) {
       auto args = log_data_.GetArguments();
       if (args->size() > 0) {
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 26864f3e..38845fef 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -47,21 +47,10 @@ Database::Database(engine::Storage *storage, std::string ns)
       metadata_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Metadata)),
       namespace_(std::move(ns)) {}
 
-// Some data types may support reading multiple types of metadata.
-// For example, bitmap supports reading string metadata and bitmap metadata.
 rocksdb::Status Database::ParseMetadata(RedisTypes types, Slice *bytes, 
Metadata *metadata) {
   std::string old_metadata;
   metadata->Encode(&old_metadata);
 
-  bool is_keyspace_hit = false;
-  ScopeExit se([this, &is_keyspace_hit] {
-    if (is_keyspace_hit) {
-      storage_->RecordStat(engine::StatType::KeyspaceHits, 1);
-    } else {
-      storage_->RecordStat(engine::StatType::KeyspaceMisses, 1);
-    }
-  });
-
   auto s = metadata->Decode(bytes);
   // delay InvalidArgument error check after type match check
   if (!s.ok() && !s.IsInvalidArgument()) return s;
@@ -85,7 +74,14 @@ rocksdb::Status Database::ParseMetadata(RedisTypes types, 
Slice *bytes, Metadata
     auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
     return rocksdb::Status::NotFound("no element found");
   }
-  is_keyspace_hit = true;
+  return s;
+}
+
+// Some data types may support reading multiple types of metadata.
+// For example, bitmap supports reading string metadata and bitmap metadata.
+rocksdb::Status Database::ParseMetadataWithStats(RedisTypes types, Slice 
*bytes, Metadata *metadata) {
+  auto s = ParseMetadata(types, bytes, metadata);
+  storage_->RecordStat(s.ok() ? engine::StatType::KeyspaceHits : 
engine::StatType::KeyspaceMisses, 1);
   return s;
 }
 
@@ -100,7 +96,7 @@ rocksdb::Status Database::GetMetadata(engine::Context &ctx, 
RedisTypes types, co
   auto s = GetRawMetadata(ctx, ns_key, raw_value);
   *rest = *raw_value;
   if (!s.ok()) return s;
-  return ParseMetadata(types, rest, metadata);
+  return ParseMetadataWithStats(types, rest, metadata);
 }
 
 rocksdb::Status Database::GetRawMetadata(engine::Context &ctx, const Slice 
&ns_key, std::string *bytes) {
diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h
index 41ed3dae..9563b1e1 100644
--- a/src/storage/redis_db.h
+++ b/src/storage/redis_db.h
@@ -75,7 +75,9 @@ class Database {
   explicit Database(engine::Storage *storage, std::string ns = "");
   /// Parsing metadata with type of `types` from bytes, the metadata is a base 
class of all metadata.
   /// When parsing, the bytes will be consumed.
-  [[nodiscard]] rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes, 
Metadata *metadata);
+  [[nodiscard]] rocksdb::Status ParseMetadataWithStats(RedisTypes types, Slice 
*bytes, Metadata *metadata);
+  // ParseMetadata behaves the same as ParseMetadataWithStats, but without 
recording stats.
+  [[nodiscard]] static rocksdb::Status ParseMetadata(RedisTypes types, Slice 
*bytes, Metadata *metadata);
   /// GetMetadata is a helper function to get metadata from the database. It 
will read the "raw metadata"
   /// from underlying storage, and then parse the raw metadata to the 
specified metadata type.
   ///
diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index e75deea3..d7216d7d 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -101,7 +101,7 @@ rocksdb::Status Bitmap::GetMetadata(engine::Context &ctx, 
const Slice &ns_key, B
   if (!s.ok()) return s;
 
   Slice slice = *raw_value;
-  return ParseMetadata({kRedisBitmap, kRedisString}, &slice, metadata);
+  return ParseMetadataWithStats({kRedisBitmap, kRedisString}, &slice, 
metadata);
 }
 
 rocksdb::Status Bitmap::GetBit(engine::Context &ctx, const Slice &user_key, 
uint32_t bit_offset, bool *bit) {
diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc
index ba331ef9..43b516ba 100644
--- a/src/types/redis_json.cc
+++ b/src/types/redis_json.cc
@@ -630,7 +630,7 @@ std::vector<rocksdb::Status> 
Json::readMulti(engine::Context &ctx, const std::ve
     if (!statuses[i].ok()) continue;
     Slice rest(pin_values[i].data(), pin_values[i].size());
     JsonMetadata metadata;
-    statuses[i] = ParseMetadata({kRedisJson}, &rest, &metadata);
+    statuses[i] = ParseMetadataWithStats({kRedisJson}, &rest, &metadata);
     if (!statuses[i].ok()) continue;
 
     statuses[i] = parse(metadata, rest, &values[i]);
@@ -674,4 +674,12 @@ rocksdb::Status Json::Resp(engine::Context &ctx, const 
std::string &user_key, co
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status Json::FromRawString(std::string_view value, JsonValue *result) 
{
+  Slice rest = value;
+  JsonMetadata metadata;
+  auto s = ParseMetadata({kRedisJson}, &rest, &metadata);
+  if (!s.ok()) return s;
+  return parse(metadata, rest, result);
+}
+
 }  // namespace redis
diff --git a/src/types/redis_json.h b/src/types/redis_json.h
index 54831472..8b42ca7c 100644
--- a/src/types/redis_json.h
+++ b/src/types/redis_json.h
@@ -82,6 +82,7 @@ class Json : public Database {
 
   rocksdb::Status Resp(engine::Context &ctx, const std::string &user_key, 
const std::string &path,
                        std::vector<std::string> *results, RESP resp);
+  static rocksdb::Status FromRawString(std::string_view value, JsonValue 
*result);
 
  private:
   rocksdb::Status write(engine::Context &ctx, Slice ns_key, JsonMetadata 
*metadata, const JsonValue &json_val);
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index 210fb697..7d7f476d 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -47,7 +47,7 @@ std::vector<rocksdb::Status> 
String::getRawValues(engine::Context &ctx, const st
     (*raw_values)[i].assign(pin_values[i].data(), pin_values[i].size());
     Metadata metadata(kRedisNone, false);
     Slice slice = (*raw_values)[i];
-    auto s = ParseMetadata({kRedisString}, &slice, &metadata);
+    auto s = ParseMetadataWithStats({kRedisString}, &slice, &metadata);
     if (!s.ok()) {
       statuses[i] = s;
       (*raw_values)[i].clear();
@@ -65,7 +65,7 @@ rocksdb::Status String::getRawValue(engine::Context &ctx, 
const std::string &ns_
 
   Metadata metadata(kRedisNone, false);
   Slice slice = *raw_value;
-  return ParseMetadata({kRedisString}, &slice, &metadata);
+  return ParseMetadataWithStats({kRedisString}, &slice, &metadata);
 }
 
 rocksdb::Status String::getValueAndExpire(engine::Context &ctx, const 
std::string &ns_key, std::string *value,
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go 
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index d1ddbb3d..e04acfc4 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -535,7 +535,7 @@ func TestSlotMigrateDataType(t *testing.T) {
 
                testSlot += 1
                keys := make(map[string]string, 0)
-               for _, typ := range []string{"string", "expired_string", 
"list", "hash", "set", "zset", "bitmap", "sortint", "stream"} {
+               for _, typ := range []string{"string", "expired_string", 
"list", "hash", "set", "zset", "bitmap", "sortint", "stream", "json"} {
                        keys[typ] = fmt.Sprintf("%s_{%s}", typ, 
util.SlotTable[testSlot])
                        require.NoError(t, rdb0.Del(ctx, keys[typ]).Err())
                }
@@ -590,8 +590,9 @@ func TestSlotMigrateDataType(t *testing.T) {
                        }).Err())
                }
                require.NoError(t, rdb0.Expire(ctx, keys["stream"], 
10*time.Second).Err())
+               require.NoError(t, rdb0.JSONSet(ctx, keys["json"], "$", `{"a": 
1, "b": "hello"}`).Err())
                // check source data existence
-               for _, typ := range []string{"string", "list", "hash", "set", 
"zset", "bitmap", "sortint", "stream"} {
+               for _, typ := range []string{"string", "list", "hash", "set", 
"zset", "bitmap", "sortint", "stream", "json"} {
                        require.EqualValues(t, 1, rdb0.Exists(ctx, 
keys[typ]).Val())
                }
                // get source data
@@ -653,6 +654,8 @@ func TestSlotMigrateDataType(t *testing.T) {
                require.EqualValues(t, 19, streamInfo.EntriesAdded)
                require.EqualValues(t, "0-0", streamInfo.MaxDeletedEntryID)
                require.EqualValues(t, 19, streamInfo.Length)
+               // type json
+               require.Equal(t, `{"a":1,"b":"hello"}`, rdb1.JSONGet(ctx, 
keys["json"]).Val())
                // topology is changed on source server
                for _, typ := range []string{"string", "list", "hash", "set", 
"zset", "bitmap", "sortint", "stream"} {
                        require.ErrorContains(t, rdb0.Exists(ctx, 
keys[typ]).Err(), "MOVED")

Reply via email to