This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 019faed9 Fallback to use the redis command migration type if the 
target don't support the ApplyBatch command (#2117)
019faed9 is described below

commit 019faed9201ea7cc07bb748f0dba9aa645da692c
Author: Myth <[email protected]>
AuthorDate: Wed Feb 28 23:29:42 2024 +0800

    Fallback to use the redis command migration type if the target don't 
support the ApplyBatch command (#2117)
---
 src/cluster/slot_migrate.cc                        | 47 ++++++++++++++++++++--
 src/cluster/slot_migrate.h                         |  2 +
 .../integration/slotmigrate/slotmigrate_test.go    | 44 ++++++++++++++++++++
 3 files changed, 89 insertions(+), 4 deletions(-)

diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index eba2f901..884c8ace 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -290,24 +290,36 @@ Status SlotMigrator::startMigration() {
     return s.Prefixed(errFailedToSetImportStatus);
   }
 
+  migration_type_ = srv_->GetConfig()->migrate_type;
+
+  // If the APPLYBATCH command is not supported on the destination,
+  // we will fall back to the redis-command migration type.
+  if (migration_type_ == MigrationType::kRawKeyValue) {
+    bool supported = GET_OR_RET(supportedApplyBatchCommandOnDstNode(*dst_fd_));
+    if (!supported) {
+      LOG(INFO) << "APPLYBATCH command is not supported, use redis command for 
migration";
+      migration_type_ = MigrationType::kRedisCommand;
+    }
+  }
+
   LOG(INFO) << "[migrate] Start migrating slot " << migrating_slot_ << ", 
connect destination fd " << *dst_fd_;
 
   return Status::OK();
 }
 
 Status SlotMigrator::sendSnapshot() {
-  if (srv_->GetConfig()->migrate_type == MigrationType::kRedisCommand) {
+  if (migration_type_ == MigrationType::kRedisCommand) {
     return sendSnapshotByCmd();
-  } else if (srv_->GetConfig()->migrate_type == MigrationType::kRawKeyValue) {
+  } else if (migration_type_ == MigrationType::kRawKeyValue) {
     return sendSnapshotByRawKV();
   }
   return {Status::NotOK, errUnsupportedMigrationType};
 }
 
 Status SlotMigrator::syncWAL() {
-  if (srv_->GetConfig()->migrate_type == MigrationType::kRedisCommand) {
+  if (migration_type_ == MigrationType::kRedisCommand) {
     return syncWALByCmd();
-  } else if (srv_->GetConfig()->migrate_type == MigrationType::kRawKeyValue) {
+  } else if (migration_type_ == MigrationType::kRawKeyValue) {
     return syncWALByRawKV();
   }
   return {Status::NotOK, errUnsupportedMigrationType};
@@ -485,6 +497,33 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, 
int status) {
   return Status::OK();
 }
 
+StatusOr<bool> SlotMigrator::supportedApplyBatchCommandOnDstNode(int sock_fd) {
+  std::string cmd = redis::ArrayOfBulkStrings({"command", "info", 
"applybatch"});
+  auto s = util::SockSend(sock_fd, cmd);
+  if (!s.IsOK()) {
+    return s.Prefixed("failed to send command info to the destination node");
+  }
+
+  UniqueEvbuf evbuf;
+  if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
+    return Status::FromErrno("read response error");
+  }
+
+  UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
+  if (!line) {
+    return Status::FromErrno("read empty response");
+  }
+
+  if (line[0] == '*') {
+    line = UniqueEvbufReadln(evbuf.get(), EVBUFFER_EOL_LF);
+    if (line && line[0] == '*') {
+      return true;
+    }
+  }
+
+  return false;
+}
+
 Status SlotMigrator::checkSingleResponse(int sock_fd) { return 
checkMultipleResponses(sock_fd, 1); }
 
 // Commands  |  Response            |  Instance
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 8fefdbc9..e1faf404 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -119,6 +119,7 @@ class SlotMigrator : public redis::Database {
 
   Status authOnDstNode(int sock_fd, const std::string &password);
   Status setImportStatusOnDstNode(int sock_fd, int status);
+  static StatusOr<bool> supportedApplyBatchCommandOnDstNode(int sock_fd);
 
   Status sendSnapshotByCmd();
   Status syncWALByCmd();
@@ -187,6 +188,7 @@ class SlotMigrator : public redis::Database {
   int dst_port_ = -1;
   UniqueFD dst_fd_;
 
+  MigrationType migration_type_ = MigrationType::kRedisCommand;
   std::atomic<int16_t> forbidden_slot_ = -1;
   std::atomic<int16_t> migrating_slot_ = -1;
   int16_t migrate_failed_slot_ = -1;
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go 
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index 6901f5f2..d782e64d 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -1116,6 +1116,50 @@ func TestSlotMigrateDataType(t *testing.T) {
        })
 }
 
+func TestSlotMigrateTypeFallback(t *testing.T) {
+       ctx := context.Background()
+
+       srv0 := util.StartServer(t, map[string]string{
+               "cluster-enabled": "yes",
+               "migrate-type":    "raw-key-value",
+       })
+
+       defer srv0.Close()
+       rdb0 := srv0.NewClient()
+       defer func() { require.NoError(t, rdb0.Close()) }()
+       id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, rdb0.Do(ctx, "clusterx", "setnodeid", id0).Err())
+
+       srv1 := util.StartServer(t, map[string]string{
+               "cluster-enabled": "yes",
+               "rename-command":  "APPLYBATCH APPLYBATCH_RENAMED",
+       })
+       defer srv1.Close()
+       rdb1 := srv1.NewClient()
+       defer func() { require.NoError(t, rdb1.Close()) }()
+       id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, rdb1.Do(ctx, "clusterx", "setnodeid", id1).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", id0, 
srv0.Host(), srv0.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d master -", id1, srv1.Host(), 
srv1.Port())
+       require.NoError(t, rdb0.Do(ctx, "clusterx", "setnodes", clusterNodes, 
"1").Err())
+       require.NoError(t, rdb1.Do(ctx, "clusterx", "setnodes", clusterNodes, 
"1").Err())
+
+       t.Run("MIGRATE - Fall back to redis-command migration type when the 
destination does not support APPLYBATCH", func(t *testing.T) {
+               info, err := rdb1.Do(ctx, "command", "info", 
"applybatch").Slice()
+               require.NoError(t, err)
+               require.Len(t, info, 1)
+               require.Nil(t, info[0])
+               testSlot += 1
+               key := util.SlotTable[testSlot]
+               value := "value"
+               require.NoError(t, rdb0.Set(ctx, key, value, 0).Err())
+               require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", 
testSlot, id1).Val())
+               waitForMigrateState(t, rdb0, testSlot, 
SlotMigrationStateSuccess)
+               require.Equal(t, value, rdb1.Get(ctx, key).Val())
+       })
+}
+
 func waitForMigrateState(t testing.TB, client *redis.Client, slot int, state 
SlotMigrationState) {
        waitForMigrateStateInDuration(t, client, slot, state, 5*time.Second)
 }

Reply via email to