This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 019faed9 Fallback to use the redis command migration type if the
target don't support the ApplyBatch command (#2117)
019faed9 is described below
commit 019faed9201ea7cc07bb748f0dba9aa645da692c
Author: Myth <[email protected]>
AuthorDate: Wed Feb 28 23:29:42 2024 +0800
Fallback to use the redis command migration type if the target don't
support the ApplyBatch command (#2117)
---
src/cluster/slot_migrate.cc | 47 ++++++++++++++++++++--
src/cluster/slot_migrate.h | 2 +
.../integration/slotmigrate/slotmigrate_test.go | 44 ++++++++++++++++++++
3 files changed, 89 insertions(+), 4 deletions(-)
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index eba2f901..884c8ace 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -290,24 +290,36 @@ Status SlotMigrator::startMigration() {
return s.Prefixed(errFailedToSetImportStatus);
}
+ migration_type_ = srv_->GetConfig()->migrate_type;
+
+ // If the APPLYBATCH command is not supported on the destination,
+ // we will fall back to the redis-command migration type.
+ if (migration_type_ == MigrationType::kRawKeyValue) {
+ bool supported = GET_OR_RET(supportedApplyBatchCommandOnDstNode(*dst_fd_));
+ if (!supported) {
+ LOG(INFO) << "APPLYBATCH command is not supported, use redis command for
migration";
+ migration_type_ = MigrationType::kRedisCommand;
+ }
+ }
+
LOG(INFO) << "[migrate] Start migrating slot " << migrating_slot_ << ",
connect destination fd " << *dst_fd_;
return Status::OK();
}
Status SlotMigrator::sendSnapshot() {
- if (srv_->GetConfig()->migrate_type == MigrationType::kRedisCommand) {
+ if (migration_type_ == MigrationType::kRedisCommand) {
return sendSnapshotByCmd();
- } else if (srv_->GetConfig()->migrate_type == MigrationType::kRawKeyValue) {
+ } else if (migration_type_ == MigrationType::kRawKeyValue) {
return sendSnapshotByRawKV();
}
return {Status::NotOK, errUnsupportedMigrationType};
}
Status SlotMigrator::syncWAL() {
- if (srv_->GetConfig()->migrate_type == MigrationType::kRedisCommand) {
+ if (migration_type_ == MigrationType::kRedisCommand) {
return syncWALByCmd();
- } else if (srv_->GetConfig()->migrate_type == MigrationType::kRawKeyValue) {
+ } else if (migration_type_ == MigrationType::kRawKeyValue) {
return syncWALByRawKV();
}
return {Status::NotOK, errUnsupportedMigrationType};
@@ -485,6 +497,33 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd,
int status) {
return Status::OK();
}
+StatusOr<bool> SlotMigrator::supportedApplyBatchCommandOnDstNode(int sock_fd) {
+ std::string cmd = redis::ArrayOfBulkStrings({"command", "info",
"applybatch"});
+ auto s = util::SockSend(sock_fd, cmd);
+ if (!s.IsOK()) {
+ return s.Prefixed("failed to send command info to the destination node");
+ }
+
+ UniqueEvbuf evbuf;
+ if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
+ return Status::FromErrno("read response error");
+ }
+
+ UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
+ if (!line) {
+ return Status::FromErrno("read empty response");
+ }
+
+ if (line[0] == '*') {
+ line = UniqueEvbufReadln(evbuf.get(), EVBUFFER_EOL_LF);
+ if (line && line[0] == '*') {
+ return true;
+ }
+ }
+
+ return false;
+}
+
Status SlotMigrator::checkSingleResponse(int sock_fd) { return
checkMultipleResponses(sock_fd, 1); }
// Commands | Response | Instance
diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h
index 8fefdbc9..e1faf404 100644
--- a/src/cluster/slot_migrate.h
+++ b/src/cluster/slot_migrate.h
@@ -119,6 +119,7 @@ class SlotMigrator : public redis::Database {
Status authOnDstNode(int sock_fd, const std::string &password);
Status setImportStatusOnDstNode(int sock_fd, int status);
+ static StatusOr<bool> supportedApplyBatchCommandOnDstNode(int sock_fd);
Status sendSnapshotByCmd();
Status syncWALByCmd();
@@ -187,6 +188,7 @@ class SlotMigrator : public redis::Database {
int dst_port_ = -1;
UniqueFD dst_fd_;
+ MigrationType migration_type_ = MigrationType::kRedisCommand;
std::atomic<int16_t> forbidden_slot_ = -1;
std::atomic<int16_t> migrating_slot_ = -1;
int16_t migrate_failed_slot_ = -1;
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index 6901f5f2..d782e64d 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -1116,6 +1116,50 @@ func TestSlotMigrateDataType(t *testing.T) {
})
}
+func TestSlotMigrateTypeFallback(t *testing.T) {
+ ctx := context.Background()
+
+ srv0 := util.StartServer(t, map[string]string{
+ "cluster-enabled": "yes",
+ "migrate-type": "raw-key-value",
+ })
+
+ defer srv0.Close()
+ rdb0 := srv0.NewClient()
+ defer func() { require.NoError(t, rdb0.Close()) }()
+ id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "setnodeid", id0).Err())
+
+ srv1 := util.StartServer(t, map[string]string{
+ "cluster-enabled": "yes",
+ "rename-command": "APPLYBATCH APPLYBATCH_RENAMED",
+ })
+ defer srv1.Close()
+ rdb1 := srv1.NewClient()
+ defer func() { require.NoError(t, rdb1.Close()) }()
+ id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "setnodeid", id1).Err())
+
+ clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", id0,
srv0.Host(), srv0.Port())
+ clusterNodes += fmt.Sprintf("%s %s %d master -", id1, srv1.Host(),
srv1.Port())
+ require.NoError(t, rdb0.Do(ctx, "clusterx", "setnodes", clusterNodes,
"1").Err())
+ require.NoError(t, rdb1.Do(ctx, "clusterx", "setnodes", clusterNodes,
"1").Err())
+
+ t.Run("MIGRATE - Fall back to redis-command migration type when the
destination does not support APPLYBATCH", func(t *testing.T) {
+ info, err := rdb1.Do(ctx, "command", "info",
"applybatch").Slice()
+ require.NoError(t, err)
+ require.Len(t, info, 1)
+ require.Nil(t, info[0])
+ testSlot += 1
+ key := util.SlotTable[testSlot]
+ value := "value"
+ require.NoError(t, rdb0.Set(ctx, key, value, 0).Err())
+ require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate",
testSlot, id1).Val())
+ waitForMigrateState(t, rdb0, testSlot,
SlotMigrationStateSuccess)
+ require.Equal(t, value, rdb1.Get(ctx, key).Val())
+ })
+}
+
func waitForMigrateState(t testing.TB, client *redis.Client, slot int, state
SlotMigrationState) {
waitForMigrateStateInDuration(t, client, slot, state, 5*time.Second)
}