Copilot commented on code in PR #3295:
URL: https://github.com/apache/kvrocks/pull/3295#discussion_r2617794622


##########
tests/gocase/integration/failover/failover_test.go:
##########
@@ -0,0 +1,925 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package failover
+
+import (
+       "context"
+       "fmt"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/redis/go-redis/v9"
+       "github.com/stretchr/testify/require"
+)
+
+// testNameWrapper wraps testing.TB to sanitize test names for MkdirTemp
+// This is needed because subtest names contain "/" which causes MkdirTemp to 
fail
+type testNameWrapper struct {
+       testing.TB
+       sanitizedName string
+}
+
+func (w *testNameWrapper) Name() string {
+       return w.sanitizedName
+}
+
+// sanitizeTestName replaces path separators in test names to avoid issues 
with MkdirTemp
+func sanitizeTestName(tb testing.TB) testing.TB {
+       sanitizedName := strings.ReplaceAll(tb.Name(), "/", "_")
+       return &testNameWrapper{TB: tb, sanitizedName: sanitizedName}
+}
+
+// startServerWithSanitizedName starts a server with a sanitized test name
+func startServerWithSanitizedName(t testing.TB, configs map[string]string) 
*util.KvrocksServer {
+       return util.StartServer(sanitizeTestName(t), configs)
+}
+
+type FailoverState string
+
+const (
+       FailoverStateNone       FailoverState = "none"
+       FailoverStateStarted    FailoverState = "started"
+       FailoverStateCheckSlave FailoverState = "check_slave"
+       FailoverStatePauseWrite FailoverState = "pause_write"
+       FailoverStateWaitSync   FailoverState = "wait_sync"
+       FailoverStateSwitching  FailoverState = "switching"
+       FailoverStateSuccess    FailoverState = "success"
+       FailoverStateFailed     FailoverState = "failed"
+)
+
+func TestFailoverBasicFlow(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       // Wait for replication to establish
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Basic failover flow", func(t *testing.T) {
+               // Write some data
+               require.NoError(t, masterClient.Set(ctx, "key1", "value1", 
0).Err())
+               require.NoError(t, masterClient.Set(ctx, "key2", "value2", 
0).Err())
+
+               // Start failover
+               result := masterClient.Do(ctx, "clusterx", "failover", slaveID)
+               if result.Err() != nil {
+                       t.Logf("FAILOVER command error: %v", result.Err())
+               }
+               require.NoError(t, result.Err(), "FAILOVER command should 
succeed")
+               require.Equal(t, "OK", result.Val())
+
+               // Wait for failover to complete
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+
+               // Verify slots are migrated (MOVED response)
+               require.ErrorContains(t, masterClient.Set(ctx, "key1", 
"newvalue", 0).Err(), "MOVED")
+               require.ErrorContains(t, masterClient.Get(ctx, "key1").Err(), 
"MOVED")
+
+               // Verify data is accessible on new master (slave)
+               require.Equal(t, "value1", slaveClient.Get(ctx, "key1").Val())
+               require.Equal(t, "value2", slaveClient.Get(ctx, "key2").Val())
+       })
+
+       t.Run("FAILOVER - Failover with custom timeout", func(t *testing.T) {
+               // Reset failover state by updating topology
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Start failover with custom timeout
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "5000").Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+       })
+}
+
+func TestFailoverFailureCases(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       t.Run("FAILOVER - Failover to non-existent node", func(t *testing.T) {
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383", 
masterID, master.Host(), master.Port())
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+               nonExistentID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx99"
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", nonExistentID).Val())
+               waitForFailoverState(t, masterClient, FailoverStateFailed, 
5*time.Second)
+       })
+
+       t.Run("FAILOVER - Failover to non-slave node", func(t *testing.T) {
+               slave := startServerWithSanitizedName(t, 
map[string]string{"cluster-enabled": "yes"})
+               defer func() { slave.Close() }()
+               slaveClient := slave.NewClient()
+               defer func() { require.NoError(t, slaveClient.Close()) }()
+               slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+               // Set slave as master (not slave)
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", 
masterID, master.Host(), master.Port())
+               clusterNodes += fmt.Sprintf("%s %s %d master -", slaveID, 
slave.Host(), slave.Port())
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val())
+               waitForFailoverState(t, masterClient, FailoverStateFailed, 
5*time.Second)
+       })
+
+       t.Run("FAILOVER - Invalid timeout value", func(t *testing.T) {
+               slave := startServerWithSanitizedName(t, 
map[string]string{"cluster-enabled": "yes"})
+               defer func() { slave.Close() }()
+               slaveClient := slave.NewClient()
+               defer func() { require.NoError(t, slaveClient.Close()) }()
+               slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", 
masterID, master.Host(), master.Port())
+               clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, 
slave.Host(), slave.Port(), masterID)
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "3").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "3").Err())
+
+               // Negative timeout should return error
+               require.Error(t, masterClient.Do(ctx, "clusterx", "failover", 
slaveID, "-1").Err())
+       })
+
+       t.Run("FAILOVER - Different timeout values", func(t *testing.T) {
+               slave := startServerWithSanitizedName(t, 
map[string]string{"cluster-enabled": "yes"})
+               defer func() { slave.Close() }()
+               slaveClient := slave.NewClient()
+               defer func() { require.NoError(t, slaveClient.Close()) }()
+               slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", 
masterID, master.Host(), master.Port())
+               clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, 
slave.Host(), slave.Port(), masterID)
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "4").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "4").Err())
+
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Test with timeout = 0 when slave is already synced (lag=0)
+               // When lag=0, failover should succeed because no waiting is 
needed
+               // But if slave has lag, it will fail
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "0").Val())
+               // Wait for either success or failed state
+               require.Eventually(t, func() bool {
+                       info := masterClient.ClusterInfo(ctx).Val()
+                       return strings.Contains(info, 
"cluster_failover_state:success") ||
+                               strings.Contains(info, 
"cluster_failover_state:failed")
+               }, 5*time.Second, 100*time.Millisecond)
+
+               // Check final state - can be success (if lag=0) or failed (if 
lag>0)
+               finalInfo := masterClient.ClusterInfo(ctx).Val()
+               if strings.Contains(finalInfo, 
"cluster_failover_state:success") {
+                       t.Logf("timeout=0 with lag=0: failover succeeded as 
expected")
+               } else if strings.Contains(finalInfo, 
"cluster_failover_state:failed") {
+                       t.Logf("timeout=0: failover failed (slave may have 
lag)")
+               }
+
+               // Reset for next test
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "5").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "5").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Test with timeout = 0 when slave has lag
+               // Create lag by writing data to master without waiting for sync
+               for i := 0; i < 100; i++ {
+                       require.NoError(t, masterClient.Set(ctx, 
fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i), 0).Err())
+               }
+               // Don't wait for sync, start failover immediately to create 
lag scenario
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "0").Val())
+               // With lag, timeout=0 should fail
+               waitForFailoverState(t, masterClient, FailoverStateFailed, 
5*time.Second)
+
+               // Reset and test with small timeout
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "6").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "6").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "100").Val())
+               // Small timeout may fail, but should start
+               time.Sleep(200 * time.Millisecond)
+               info := masterClient.ClusterInfo(ctx).Val()
+               require.True(t, strings.Contains(info, 
"cluster_failover_state:failed") ||
+                       strings.Contains(info, 
"cluster_failover_state:success") ||
+                       strings.Contains(info, 
"cluster_failover_state:wait_sync") ||
+                       strings.Contains(info, 
"cluster_failover_state:switching"))
+
+               // Reset and test with large timeout
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "7").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "7").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "10000").Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
15*time.Second)
+       })
+}
+
+func TestFailoverConcurrency(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Cannot start failover when one is in progress", 
func(t *testing.T) {
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val())
+
+               // Try to start another failover immediately - should return 
error
+               // Wait a bit to ensure first failover has started
+               time.Sleep(100 * time.Millisecond)
+               result := masterClient.Do(ctx, "clusterx", "failover", slaveID)
+               // The second call may return OK but won't start a new failover
+               // We verify the first one completes successfully
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+               _ = result // Use result to avoid unused variable

Review Comment:
   The variable 'result' is assigned but its value is never actually inspected; the only "use" is the `_ = result` assignment, and the comment "Use result to avoid unused variable" confirms this is intentional dead code. Consider removing the variable entirely, or using it to verify the expected behavior (e.g., checking whether the second failover attempt returns an error or OK).
   ```suggestion
                // The second call may return OK but won't start a new failover,
                // or it may return an error indicating a failover is already 
in progress.
                val, err := result.Result()
                if err != nil {
                        require.Contains(t, err.Error(), "failover already in 
progress")
                } else {
                        require.Equal(t, "OK", val)
                }
                // We verify the first one completes successfully
                waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
   ```



##########
src/cluster/cluster_failover.cc:
##########
@@ -0,0 +1,269 @@
+#include "cluster_failover.h"
+
+#include <unistd.h>
+
+#include "cluster/cluster.h"
+#include "common/io_util.h"
+#include "common/time_util.h"
+#include "logging.h"
+#include "server/redis_reply.h"
+#include "server/server.h"
+
+ClusterFailover::ClusterFailover(Server *srv) : srv_(srv) {
+  t_ = std::thread([this]() { loop(); });
+}
+
+ClusterFailover::~ClusterFailover() {
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    stop_thread_ = true;
+    cv_.notify_all();
+  }
+  if (t_.joinable()) t_.join();
+}
+
+Status ClusterFailover::Run(std::string slave_node_id, int timeout_ms) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  if (state_ != FailoverState::kNone && state_ != FailoverState::kFailed) {
+    return {Status::NotOK, "Failover is already in progress"};
+  }
+
+  slave_node_id_ = std::move(slave_node_id);
+  timeout_ms_ = timeout_ms;
+  state_ = FailoverState::kStarted;
+  failover_job_triggered_ = true;
+  cv_.notify_all();
+  return Status::OK();
+}
+
+void ClusterFailover::loop() {
+  while (true) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cv_.wait(lock, [this]() { return stop_thread_ || failover_job_triggered_; 
});
+
+    if (stop_thread_) return;
+
+    if (failover_job_triggered_) {
+      failover_job_triggered_ = false;
+      lock.unlock();
+      runFailoverProcess();
+    }
+  }
+}
+
+void ClusterFailover::runFailoverProcess() {
+  auto ip_port = srv_->cluster->GetNodeIPPort(slave_node_id_);
+  if (!ip_port.IsOK()) {
+    error("[Failover] slave node not found in cluster {}", slave_node_id_);
+    abortFailover("Slave node not found in cluster");
+    return;
+  }
+  node_ip_port_ = ip_port.GetValue().first + ":" + 
std::to_string(ip_port.GetValue().second);
+  node_ip_ = ip_port.GetValue().first;
+  node_port_ = ip_port.GetValue().second;
+  info("[Failover] slave node {} {} failover state: {}", slave_node_id_, 
node_ip_port_, static_cast<int>(state_.load()));
+  state_ = FailoverState::kCheck;
+
+  auto s = checkSlaveStatus();
+  if (!s.IsOK()) {
+    abortFailover(s.Msg());
+    return;
+  }
+
+  s = checkSlaveLag();
+  if (!s.IsOK()) {
+    abortFailover("Slave lag check failed: " + s.Msg());
+    return;
+  }
+
+  info("[Failover] slave node {} {} check slave status success, enter pause 
state", slave_node_id_, node_ip_port_);
+  start_time_ms_ = util::GetTimeStampMS();
+  // Enter Pause state (Stop writing)
+  state_ = FailoverState::kPause;
+  // Get current sequence
+  target_seq_ = srv_->storage->LatestSeqNumber();
+  info("[Failover] slave node {} {} target sequence {}", slave_node_id_, 
node_ip_port_, target_seq_);
+
+  state_ = FailoverState::kSyncWait;
+  s = waitReplicationSync();
+  if (!s.IsOK()) {
+    abortFailover(s.Msg());
+    return;
+  }
+  info("[Failover] slave node {} {} wait replication sync success, enter 
switch state, cost {} ms", slave_node_id_,
+       node_ip_port_, util::GetTimeStampMS() - start_time_ms_);
+
+  state_ = FailoverState::kSwitch;
+  s = sendTakeoverCmd();
+  if (!s.IsOK()) {
+    abortFailover(s.Msg());
+    return;
+  }
+
+  // Redirect slots
+  srv_->cluster->SetMySlotsMigrated(node_ip_port_);
+
+  state_ = FailoverState::kSuccess;
+  info("[Failover] success {} {}", slave_node_id_, node_ip_port_);
+}
+
+Status ClusterFailover::checkSlaveLag() {
+  auto start_offset_status = srv_->GetSlaveReplicationOffset(node_ip_port_);
+  if (!start_offset_status.IsOK()) {
+    return {Status::NotOK, "Failed to get slave offset: " + 
start_offset_status.Msg()};
+  }
+  uint64_t start_offset = *start_offset_status;
+  int64_t start_sampling_ms = util::GetTimeStampMS();
+
+  // Wait 3s or half of timeout, but at least a bit to measure speed
+  int64_t wait_time = std::max(100, std::min(3000, timeout_ms_ / 2));
+  std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
+
+  auto end_offset_status = srv_->GetSlaveReplicationOffset(node_ip_port_);
+  if (!end_offset_status.IsOK()) {
+    return {Status::NotOK, "Failed to get slave offset: " + 
end_offset_status.Msg()};
+  }
+  uint64_t end_offset = *end_offset_status;
+  int64_t end_sampling_ms = util::GetTimeStampMS();
+
+  double elapsed_sec = (end_sampling_ms - start_sampling_ms) / 1000.0;
+  if (elapsed_sec <= 0) elapsed_sec = 0.001;
+
+  uint64_t bytes = 0;
+  if (end_offset > start_offset) bytes = end_offset - start_offset;
+  double speed = bytes / elapsed_sec;
+
+  uint64_t master_seq = srv_->storage->LatestSeqNumber();
+  uint64_t lag = 0;
+  if (master_seq > end_offset) lag = master_seq - end_offset;
+
+  if (lag == 0) return Status::OK();
+
+  if (speed <= 0.1) {  // Basically 0
+    return {Status::NotOK, fmt::format("Slave is not replicating (lag: {})", 
lag)};
+  }
+
+  double required_sec = lag / speed;
+  int64_t required_ms = static_cast<int64_t>(required_sec * 1000);
+
+  int64_t elapsed_total = end_sampling_ms - start_sampling_ms;
+  int64_t remaining = timeout_ms_ - elapsed_total;
+
+  if (required_ms > remaining) {
+    return {Status::NotOK, fmt::format("Estimated catchup time {}ms > 
remaining time {}ms (lag: {}, speed: {:.2f}/s)",
+                                       required_ms, remaining, lag, speed)};
+  }
+
+  info("[Failover] check: lag={}, speed={:.2f}/s, estimated_time={}ms, 
remaining={}ms", lag, speed, required_ms,
+       remaining);
+  return Status::OK();
+}
+
+Status ClusterFailover::checkSlaveStatus() {
+  // We could try to connect, but GetSlaveReplicationOffset checks connection.
+  auto offset = srv_->GetSlaveReplicationOffset(node_ip_port_);
+  if (!offset.IsOK()) {
+    error("[Failover] slave node {} {} not connected or not syncing", 
slave_node_id_, node_ip_port_);
+    return {Status::NotOK, "Slave not connected or not syncing"};
+  }
+  info("[Failover] slave node {} {} is connected and syncing offset {}", 
slave_node_id_, node_ip_port_, offset.Msg());
+  return Status::OK();
+}
+
+Status ClusterFailover::waitReplicationSync() {
+  while (true) {
+    if (util::GetTimeStampMS() - start_time_ms_ > 
static_cast<u_int64_t>(timeout_ms_)) {

Review Comment:
   The type 'u_int64_t' is non-standard; the standard type is 'uint64_t' (without the extra underscore). While 'u_int64_t' may be defined on some systems (such as BSD), it is not portable. Use 'uint64_t' for consistency with the rest of the codebase.



##########
src/cluster/cluster_failover.cc:
##########
@@ -0,0 +1,269 @@
+#include "cluster_failover.h"
+
+#include <unistd.h>
+
+#include "cluster/cluster.h"
+#include "common/io_util.h"
+#include "common/time_util.h"
+#include "logging.h"
+#include "server/redis_reply.h"
+#include "server/server.h"
+
+ClusterFailover::ClusterFailover(Server *srv) : srv_(srv) {
+  t_ = std::thread([this]() { loop(); });
+}
+
+ClusterFailover::~ClusterFailover() {
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    stop_thread_ = true;
+    cv_.notify_all();
+  }
+  if (t_.joinable()) t_.join();
+}
+
+Status ClusterFailover::Run(std::string slave_node_id, int timeout_ms) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  if (state_ != FailoverState::kNone && state_ != FailoverState::kFailed) {
+    return {Status::NotOK, "Failover is already in progress"};
+  }
+
+  slave_node_id_ = std::move(slave_node_id);
+  timeout_ms_ = timeout_ms;
+  state_ = FailoverState::kStarted;
+  failover_job_triggered_ = true;
+  cv_.notify_all();
+  return Status::OK();
+}
+
+void ClusterFailover::loop() {
+  while (true) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cv_.wait(lock, [this]() { return stop_thread_ || failover_job_triggered_; 
});
+
+    if (stop_thread_) return;
+
+    if (failover_job_triggered_) {
+      failover_job_triggered_ = false;
+      lock.unlock();
+      runFailoverProcess();
+    }
+  }
+}
+
+void ClusterFailover::runFailoverProcess() {
+  auto ip_port = srv_->cluster->GetNodeIPPort(slave_node_id_);
+  if (!ip_port.IsOK()) {
+    error("[Failover] slave node not found in cluster {}", slave_node_id_);
+    abortFailover("Slave node not found in cluster");
+    return;
+  }
+  node_ip_port_ = ip_port.GetValue().first + ":" + 
std::to_string(ip_port.GetValue().second);
+  node_ip_ = ip_port.GetValue().first;
+  node_port_ = ip_port.GetValue().second;
+  info("[Failover] slave node {} {} failover state: {}", slave_node_id_, 
node_ip_port_, static_cast<int>(state_.load()));
+  state_ = FailoverState::kCheck;
+
+  auto s = checkSlaveStatus();
+  if (!s.IsOK()) {
+    abortFailover(s.Msg());
+    return;
+  }
+
+  s = checkSlaveLag();
+  if (!s.IsOK()) {
+    abortFailover("Slave lag check failed: " + s.Msg());
+    return;
+  }
+
+  info("[Failover] slave node {} {} check slave status success, enter pause 
state", slave_node_id_, node_ip_port_);
+  start_time_ms_ = util::GetTimeStampMS();
+  // Enter Pause state (Stop writing)
+  state_ = FailoverState::kPause;
+  // Get current sequence
+  target_seq_ = srv_->storage->LatestSeqNumber();
+  info("[Failover] slave node {} {} target sequence {}", slave_node_id_, 
node_ip_port_, target_seq_);
+
+  state_ = FailoverState::kSyncWait;
+  s = waitReplicationSync();
+  if (!s.IsOK()) {
+    abortFailover(s.Msg());
+    return;
+  }
+  info("[Failover] slave node {} {} wait replication sync success, enter 
switch state, cost {} ms", slave_node_id_,
+       node_ip_port_, util::GetTimeStampMS() - start_time_ms_);
+
+  state_ = FailoverState::kSwitch;
+  s = sendTakeoverCmd();
+  if (!s.IsOK()) {
+    abortFailover(s.Msg());
+    return;
+  }
+
+  // Redirect slots
+  srv_->cluster->SetMySlotsMigrated(node_ip_port_);
+
+  state_ = FailoverState::kSuccess;
+  info("[Failover] success {} {}", slave_node_id_, node_ip_port_);
+}
+
+Status ClusterFailover::checkSlaveLag() {
+  auto start_offset_status = srv_->GetSlaveReplicationOffset(node_ip_port_);
+  if (!start_offset_status.IsOK()) {
+    return {Status::NotOK, "Failed to get slave offset: " + 
start_offset_status.Msg()};
+  }
+  uint64_t start_offset = *start_offset_status;
+  int64_t start_sampling_ms = util::GetTimeStampMS();
+
+  // Wait 3s or half of timeout, but at least a bit to measure speed
+  int64_t wait_time = std::max(100, std::min(3000, timeout_ms_ / 2));
+  std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
+
+  auto end_offset_status = srv_->GetSlaveReplicationOffset(node_ip_port_);
+  if (!end_offset_status.IsOK()) {
+    return {Status::NotOK, "Failed to get slave offset: " + 
end_offset_status.Msg()};
+  }
+  uint64_t end_offset = *end_offset_status;
+  int64_t end_sampling_ms = util::GetTimeStampMS();
+
+  double elapsed_sec = (end_sampling_ms - start_sampling_ms) / 1000.0;
+  if (elapsed_sec <= 0) elapsed_sec = 0.001;
+
+  uint64_t bytes = 0;
+  if (end_offset > start_offset) bytes = end_offset - start_offset;
+  double speed = bytes / elapsed_sec;
+
+  uint64_t master_seq = srv_->storage->LatestSeqNumber();
+  uint64_t lag = 0;
+  if (master_seq > end_offset) lag = master_seq - end_offset;
+
+  if (lag == 0) return Status::OK();
+
+  if (speed <= 0.1) {  // Basically 0
+    return {Status::NotOK, fmt::format("Slave is not replicating (lag: {})", 
lag)};
+  }
+
+  double required_sec = lag / speed;
+  int64_t required_ms = static_cast<int64_t>(required_sec * 1000);
+
+  int64_t elapsed_total = end_sampling_ms - start_sampling_ms;
+  int64_t remaining = timeout_ms_ - elapsed_total;
+
+  if (required_ms > remaining) {
+    return {Status::NotOK, fmt::format("Estimated catchup time {}ms > 
remaining time {}ms (lag: {}, speed: {:.2f}/s)",
+                                       required_ms, remaining, lag, speed)};
+  }
+
+  info("[Failover] check: lag={}, speed={:.2f}/s, estimated_time={}ms, 
remaining={}ms", lag, speed, required_ms,
+       remaining);
+  return Status::OK();
+}
+
+Status ClusterFailover::checkSlaveStatus() {
+  // We could try to connect, but GetSlaveReplicationOffset checks connection.
+  auto offset = srv_->GetSlaveReplicationOffset(node_ip_port_);
+  if (!offset.IsOK()) {
+    error("[Failover] slave node {} {} not connected or not syncing", 
slave_node_id_, node_ip_port_);
+    return {Status::NotOK, "Slave not connected or not syncing"};
+  }
+  info("[Failover] slave node {} {} is connected and syncing offset {}", 
slave_node_id_, node_ip_port_, offset.Msg());

Review Comment:
   The log message uses offset.Msg(), which is intended for error messages rather than for displaying the actual offset value — for a StatusOr, Msg() returns the error message. To log the actual offset value, dereference the result with *offset instead.
   ```suggestion
     info("[Failover] slave node {} {} is connected and syncing offset {}", 
slave_node_id_, node_ip_port_, *offset);
   ```



##########
tests/gocase/integration/failover/failover_test.go:
##########
@@ -0,0 +1,925 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package failover
+
+import (
+       "context"
+       "fmt"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/redis/go-redis/v9"
+       "github.com/stretchr/testify/require"
+)
+
// testNameWrapper wraps testing.TB to sanitize test names for MkdirTemp.
// This is needed because subtest names contain "/" which causes MkdirTemp to fail.
type testNameWrapper struct {
	testing.TB
	// sanitizedName is the test name with path separators replaced;
	// it is what Name() reports instead of the embedded TB's name.
	sanitizedName string
}

// Name overrides the embedded TB's Name to return the sanitized test name.
func (w *testNameWrapper) Name() string {
	return w.sanitizedName
}
+
+// sanitizeTestName replaces path separators in test names to avoid issues 
with MkdirTemp
+func sanitizeTestName(tb testing.TB) testing.TB {
+       sanitizedName := strings.ReplaceAll(tb.Name(), "/", "_")
+       return &testNameWrapper{TB: tb, sanitizedName: sanitizedName}
+}
+
// startServerWithSanitizedName starts a server with a sanitized test name,
// so that subtests (whose names contain "/") can still create temporary
// directories for the server's working dir.
func startServerWithSanitizedName(t testing.TB, configs map[string]string) *util.KvrocksServer {
	return util.StartServer(sanitizeTestName(t), configs)
}
+
// FailoverState is the value of the "cluster_failover_state" field reported
// by CLUSTER INFO while a manual failover is in progress.
type FailoverState string

const (
	FailoverStateNone       FailoverState = "none"
	FailoverStateStarted    FailoverState = "started"
	FailoverStateCheckSlave FailoverState = "check_slave"
	FailoverStatePauseWrite FailoverState = "pause_write"
	FailoverStateWaitSync   FailoverState = "wait_sync"
	FailoverStateSwitching  FailoverState = "switching"
	FailoverStateSuccess    FailoverState = "success" // terminal: failover completed
	FailoverStateFailed     FailoverState = "failed"  // terminal: failover aborted
)
+
+func TestFailoverBasicFlow(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       // Wait for replication to establish
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Basic failover flow", func(t *testing.T) {
+               // Write some data
+               require.NoError(t, masterClient.Set(ctx, "key1", "value1", 
0).Err())
+               require.NoError(t, masterClient.Set(ctx, "key2", "value2", 
0).Err())
+
+               // Start failover
+               result := masterClient.Do(ctx, "clusterx", "failover", slaveID)
+               if result.Err() != nil {
+                       t.Logf("FAILOVER command error: %v", result.Err())
+               }
+               require.NoError(t, result.Err(), "FAILOVER command should 
succeed")
+               require.Equal(t, "OK", result.Val())
+
+               // Wait for failover to complete
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+
+               // Verify slots are migrated (MOVED response)
+               require.ErrorContains(t, masterClient.Set(ctx, "key1", 
"newvalue", 0).Err(), "MOVED")
+               require.ErrorContains(t, masterClient.Get(ctx, "key1").Err(), 
"MOVED")
+
+               // Verify data is accessible on new master (slave)
+               require.Equal(t, "value1", slaveClient.Get(ctx, "key1").Val())
+               require.Equal(t, "value2", slaveClient.Get(ctx, "key2").Val())
+       })
+
+       t.Run("FAILOVER - Failover with custom timeout", func(t *testing.T) {
+               // Reset failover state by updating topology
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Start failover with custom timeout
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "5000").Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+       })
+}
+
+func TestFailoverFailureCases(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       t.Run("FAILOVER - Failover to non-existent node", func(t *testing.T) {
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383", 
masterID, master.Host(), master.Port())
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+               nonExistentID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx99"
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", nonExistentID).Val())
+               waitForFailoverState(t, masterClient, FailoverStateFailed, 
5*time.Second)
+       })
+
+       t.Run("FAILOVER - Failover to non-slave node", func(t *testing.T) {
+               slave := startServerWithSanitizedName(t, 
map[string]string{"cluster-enabled": "yes"})
+               defer func() { slave.Close() }()
+               slaveClient := slave.NewClient()
+               defer func() { require.NoError(t, slaveClient.Close()) }()
+               slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+               // Set slave as master (not slave)
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", 
masterID, master.Host(), master.Port())
+               clusterNodes += fmt.Sprintf("%s %s %d master -", slaveID, 
slave.Host(), slave.Port())
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val())
+               waitForFailoverState(t, masterClient, FailoverStateFailed, 
5*time.Second)
+       })
+
+       t.Run("FAILOVER - Invalid timeout value", func(t *testing.T) {
+               slave := startServerWithSanitizedName(t, 
map[string]string{"cluster-enabled": "yes"})
+               defer func() { slave.Close() }()
+               slaveClient := slave.NewClient()
+               defer func() { require.NoError(t, slaveClient.Close()) }()
+               slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", 
masterID, master.Host(), master.Port())
+               clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, 
slave.Host(), slave.Port(), masterID)
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "3").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "3").Err())
+
+               // Negative timeout should return error
+               require.Error(t, masterClient.Do(ctx, "clusterx", "failover", 
slaveID, "-1").Err())
+       })
+
+       t.Run("FAILOVER - Different timeout values", func(t *testing.T) {
+               slave := startServerWithSanitizedName(t, 
map[string]string{"cluster-enabled": "yes"})
+               defer func() { slave.Close() }()
+               slaveClient := slave.NewClient()
+               defer func() { require.NoError(t, slaveClient.Close()) }()
+               slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", 
masterID, master.Host(), master.Port())
+               clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, 
slave.Host(), slave.Port(), masterID)
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "4").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "4").Err())
+
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Test with timeout = 0 when slave is already synced (lag=0)
+               // When lag=0, failover should succeed because no waiting is 
needed
+               // But if slave has lag, it will fail
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "0").Val())
+               // Wait for either success or failed state
+               require.Eventually(t, func() bool {
+                       info := masterClient.ClusterInfo(ctx).Val()
+                       return strings.Contains(info, 
"cluster_failover_state:success") ||
+                               strings.Contains(info, 
"cluster_failover_state:failed")
+               }, 5*time.Second, 100*time.Millisecond)
+
+               // Check final state - can be success (if lag=0) or failed (if 
lag>0)
+               finalInfo := masterClient.ClusterInfo(ctx).Val()
+               if strings.Contains(finalInfo, 
"cluster_failover_state:success") {
+                       t.Logf("timeout=0 with lag=0: failover succeeded as 
expected")
+               } else if strings.Contains(finalInfo, 
"cluster_failover_state:failed") {
+                       t.Logf("timeout=0: failover failed (slave may have 
lag)")
+               }
+
+               // Reset for next test
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "5").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "5").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Test with timeout = 0 when slave has lag
+               // Create lag by writing data to master without waiting for sync
+               for i := 0; i < 100; i++ {
+                       require.NoError(t, masterClient.Set(ctx, 
fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i), 0).Err())
+               }
+               // Don't wait for sync, start failover immediately to create 
lag scenario
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "0").Val())
+               // With lag, timeout=0 should fail
+               waitForFailoverState(t, masterClient, FailoverStateFailed, 
5*time.Second)
+
+               // Reset and test with small timeout
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "6").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "6").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "100").Val())
+               // Small timeout may fail, but should start
+               time.Sleep(200 * time.Millisecond)
+               info := masterClient.ClusterInfo(ctx).Val()
+               require.True(t, strings.Contains(info, 
"cluster_failover_state:failed") ||
+                       strings.Contains(info, 
"cluster_failover_state:success") ||
+                       strings.Contains(info, 
"cluster_failover_state:wait_sync") ||
+                       strings.Contains(info, 
"cluster_failover_state:switching"))
+
+               // Reset and test with large timeout
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "7").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "7").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "10000").Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
15*time.Second)
+       })
+}
+
+func TestFailoverConcurrency(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Cannot start failover when one is in progress", 
func(t *testing.T) {
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val())
+
+               // Try to start another failover immediately - should return 
error
+               // Wait a bit to ensure first failover has started
+               time.Sleep(100 * time.Millisecond)
+               result := masterClient.Do(ctx, "clusterx", "failover", slaveID)
+               // The second call may return OK but won't start a new failover
+               // We verify the first one completes successfully
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+               _ = result // Use result to avoid unused variable
+       })
+
+       t.Run("FAILOVER - Can restart after failure", func(t *testing.T) {
+               // Reset state
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Start a failover with very short timeout
+               // If slave is synced (lag=0), it may succeed; if slave has 
lag, it will fail
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "1").Val())
+               // Accept either success or failed state
+               require.Eventually(t, func() bool {
+                       info := masterClient.ClusterInfo(ctx).Val()
+                       return strings.Contains(info, 
"cluster_failover_state:success") ||
+                               strings.Contains(info, 
"cluster_failover_state:failed")
+               }, 5*time.Second, 100*time.Millisecond)
+
+               // Can restart after failure
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "3").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "3").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "10000").Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
15*time.Second)
+       })
+}
+
+func TestFailoverWriteBlocking(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Write requests blocked during failover", func(t 
*testing.T) {
+               // Write initial data
+               require.NoError(t, masterClient.Set(ctx, "testkey", 
"testvalue", 0).Err())
+
+               // Start failover with long timeout to observe blocking
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "10000").Val())
+
+               // Try to write during failover - should return TRYAGAIN in 
blocking states
+               // Poll for blocking state (pause_write, wait_sync, or 
switching)
+               var writeBlocked bool
+               for i := 0; i < 50; i++ {
+                       time.Sleep(50 * time.Millisecond)
+                       err := masterClient.Set(ctx, "testkey", "newvalue", 
0).Err()
+                       if err != nil && (strings.Contains(err.Error(), 
"TRYAGAIN") || strings.Contains(err.Error(), "Failover in progress")) {
+                               writeBlocked = true
+                               break
+                       }
+                       // Check if failover already completed
+                       info := masterClient.ClusterInfo(ctx).Val()
+                       if strings.Contains(info, 
"cluster_failover_state:success") {
+                               break
+                       }
+               }
+               // At least one write should have been blocked, or failover 
completed very quickly
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
15*time.Second)
+
+               // After success, writes should return MOVED
+               require.ErrorContains(t, masterClient.Set(ctx, "testkey", 
"newvalue2", 0).Err(), "MOVED")
+               _ = writeBlocked // May be false if failover was very fast

Review Comment:
   The variable 'writeBlocked' is assigned but never used in a meaningful way. 
It's set to true when a write is blocked during failover, but the comment 
acknowledges it may be false if failover was very fast. This appears to be dead 
code that should either be removed or used in an assertion to verify the 
expected behavior.



##########
tests/gocase/integration/failover/failover_test.go:
##########
@@ -0,0 +1,925 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package failover
+
+import (
+       "context"
+       "fmt"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/redis/go-redis/v9"
+       "github.com/stretchr/testify/require"
+)
+
// testNameWrapper wraps testing.TB to sanitize test names for MkdirTemp.
// This is needed because subtest names contain "/" which causes MkdirTemp to fail.
type testNameWrapper struct {
	testing.TB
	// sanitizedName is the test name with path separators replaced;
	// it is what Name() reports instead of the embedded TB's name.
	sanitizedName string
}

// Name overrides the embedded TB's Name to return the sanitized test name.
func (w *testNameWrapper) Name() string {
	return w.sanitizedName
}
+
+// sanitizeTestName replaces path separators in test names to avoid issues 
with MkdirTemp
+func sanitizeTestName(tb testing.TB) testing.TB {
+       sanitizedName := strings.ReplaceAll(tb.Name(), "/", "_")
+       return &testNameWrapper{TB: tb, sanitizedName: sanitizedName}
+}
+
// startServerWithSanitizedName starts a server with a sanitized test name,
// so that subtests (whose names contain "/") can still create temporary
// directories for the server's working dir.
func startServerWithSanitizedName(t testing.TB, configs map[string]string) *util.KvrocksServer {
	return util.StartServer(sanitizeTestName(t), configs)
}
+
// FailoverState is the value of the "cluster_failover_state" field reported
// by CLUSTER INFO while a manual failover is in progress.
type FailoverState string

const (
	FailoverStateNone       FailoverState = "none"
	FailoverStateStarted    FailoverState = "started"
	FailoverStateCheckSlave FailoverState = "check_slave"
	FailoverStatePauseWrite FailoverState = "pause_write"
	FailoverStateWaitSync   FailoverState = "wait_sync"
	FailoverStateSwitching  FailoverState = "switching"
	FailoverStateSuccess    FailoverState = "success" // terminal: failover completed
	FailoverStateFailed     FailoverState = "failed"  // terminal: failover aborted
)
+
+func TestFailoverBasicFlow(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       // Wait for replication to establish
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Basic failover flow", func(t *testing.T) {
+               // Write some data
+               require.NoError(t, masterClient.Set(ctx, "key1", "value1", 
0).Err())
+               require.NoError(t, masterClient.Set(ctx, "key2", "value2", 
0).Err())
+
+               // Start failover
+               result := masterClient.Do(ctx, "clusterx", "failover", slaveID)
+               if result.Err() != nil {
+                       t.Logf("FAILOVER command error: %v", result.Err())
+               }
+               require.NoError(t, result.Err(), "FAILOVER command should 
succeed")
+               require.Equal(t, "OK", result.Val())
+
+               // Wait for failover to complete
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+
+               // Verify slots are migrated (MOVED response)
+               require.ErrorContains(t, masterClient.Set(ctx, "key1", 
"newvalue", 0).Err(), "MOVED")
+               require.ErrorContains(t, masterClient.Get(ctx, "key1").Err(), 
"MOVED")
+
+               // Verify data is accessible on new master (slave)
+               require.Equal(t, "value1", slaveClient.Get(ctx, "key1").Val())
+               require.Equal(t, "value2", slaveClient.Get(ctx, "key2").Val())
+       })
+
+       t.Run("FAILOVER - Failover with custom timeout", func(t *testing.T) {
+               // Reset failover state by updating topology
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Start failover with custom timeout
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "5000").Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+       })
+}
+
+func TestFailoverFailureCases(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       t.Run("FAILOVER - Failover to non-existent node", func(t *testing.T) {
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383", 
masterID, master.Host(), master.Port())
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+               nonExistentID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx99"
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", nonExistentID).Val())
+               waitForFailoverState(t, masterClient, FailoverStateFailed, 
5*time.Second)
+       })
+
+       t.Run("FAILOVER - Failover to non-slave node", func(t *testing.T) {
+               slave := startServerWithSanitizedName(t, 
map[string]string{"cluster-enabled": "yes"})
+               defer func() { slave.Close() }()
+               slaveClient := slave.NewClient()
+               defer func() { require.NoError(t, slaveClient.Close()) }()
+               slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+               // Set slave as master (not slave)
+               clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", 
masterID, master.Host(), master.Port())
+               clusterNodes += fmt.Sprintf("%s %s %d master -", slaveID, 
slave.Host(), slave.Port())
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val())
+               waitForFailoverState(t, masterClient, FailoverStateFailed, 
5*time.Second)
+       })
+
	t.Run("FAILOVER - Invalid timeout value", func(t *testing.T) {
		slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"})
		defer func() { slave.Close() }()
		slaveClient := slave.NewClient()
		defer func() { require.NoError(t, slaveClient.Close()) }()
		slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err())

		// Standard master/replica topology for this subtest.
		clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port())
		clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID)
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err())
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err())

		// A negative timeout is rejected synchronously with an error.
		require.Error(t, masterClient.Do(ctx, "clusterx", "failover", slaveID, "-1").Err())
	})
+
	t.Run("FAILOVER - Different timeout values", func(t *testing.T) {
		slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"})
		defer func() { slave.Close() }()
		slaveClient := slave.NewClient()
		defer func() { require.NoError(t, slaveClient.Close()) }()
		slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err())

		clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port())
		clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID)
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "4").Err())
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "4").Err())

		// Wait for the replica to connect before issuing the failover.
		require.Eventually(t, func() bool {
			info := masterClient.Info(ctx, "replication").Val()
			return strings.Contains(info, "connected_slaves:1")
		}, 10*time.Second, 100*time.Millisecond)

		// Scenario 1: timeout=0 with an (ideally) synced replica. With lag=0
		// no waiting is needed and the failover should succeed; with residual
		// lag it fails instead. Both terminal states are acceptable here.
		require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "0").Val())
		// Wait for a terminal state (success or failed).
		require.Eventually(t, func() bool {
			info := masterClient.ClusterInfo(ctx).Val()
			return strings.Contains(info, "cluster_failover_state:success") ||
				strings.Contains(info, "cluster_failover_state:failed")
		}, 5*time.Second, 100*time.Millisecond)

		// Log which terminal state was reached, for debugging only.
		finalInfo := masterClient.ClusterInfo(ctx).Val()
		if strings.Contains(finalInfo, "cluster_failover_state:success") {
			t.Logf("timeout=0 with lag=0: failover succeeded as expected")
		} else if strings.Contains(finalInfo, "cluster_failover_state:failed") {
			t.Logf("timeout=0: failover failed (slave may have lag)")
		}

		// Re-apply the topology (new version) to reset the failover state.
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "5").Err())
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "5").Err())
		require.Eventually(t, func() bool {
			info := masterClient.Info(ctx, "replication").Val()
			return strings.Contains(info, "connected_slaves:1")
		}, 10*time.Second, 100*time.Millisecond)

		// Scenario 2: timeout=0 with replication lag. Write a burst of keys
		// and start the failover immediately, without waiting for the replica
		// to catch up, to create a lag scenario.
		for i := 0; i < 100; i++ {
			require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i), 0).Err())
		}
		require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "0").Val())
		// With lag present, timeout=0 must fail.
		waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second)

		// Scenario 3: small (100ms) timeout after resetting the topology.
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "6").Err())
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "6").Err())
		require.Eventually(t, func() bool {
			info := masterClient.Info(ctx, "replication").Val()
			return strings.Contains(info, "connected_slaves:1")
		}, 10*time.Second, 100*time.Millisecond)

		require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "100").Val())
		// A small timeout may fail, but the failover must at least have
		// started: any known state is acceptable shortly afterwards.
		time.Sleep(200 * time.Millisecond)
		info := masterClient.ClusterInfo(ctx).Val()
		require.True(t, strings.Contains(info, "cluster_failover_state:failed") ||
			strings.Contains(info, "cluster_failover_state:success") ||
			strings.Contains(info, "cluster_failover_state:wait_sync") ||
			strings.Contains(info, "cluster_failover_state:switching"))

		// Scenario 4: large (10s) timeout, which is expected to succeed.
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "7").Err())
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "7").Err())
		require.Eventually(t, func() bool {
			info := masterClient.Info(ctx, "replication").Val()
			return strings.Contains(info, "connected_slaves:1")
		}, 10*time.Second, 100*time.Millisecond)

		require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val())
		waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second)
	})
+}
+
+func TestFailoverConcurrency(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Cannot start failover when one is in progress", 
func(t *testing.T) {
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val())
+
+               // Try to start another failover immediately - should return 
error
+               // Wait a bit to ensure first failover has started
+               time.Sleep(100 * time.Millisecond)
+               result := masterClient.Do(ctx, "clusterx", "failover", slaveID)
+               // The second call may return OK but won't start a new failover
+               // We verify the first one completes successfully
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+               _ = result // Use result to avoid unused variable
+       })
+
+       t.Run("FAILOVER - Can restart after failure", func(t *testing.T) {
+               // Reset state
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Start a failover with very short timeout
+               // If slave is synced (lag=0), it may succeed; if slave has 
lag, it will fail
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "1").Val())
+               // Accept either success or failed state
+               require.Eventually(t, func() bool {
+                       info := masterClient.ClusterInfo(ctx).Val()
+                       return strings.Contains(info, 
"cluster_failover_state:success") ||
+                               strings.Contains(info, 
"cluster_failover_state:failed")
+               }, 5*time.Second, 100*time.Millisecond)
+
+               // Can restart after failure
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "3").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "3").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "10000").Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
15*time.Second)
+       })
+}
+
+func TestFailoverWriteBlocking(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Write requests blocked during failover", func(t 
*testing.T) {
+               // Write initial data
+               require.NoError(t, masterClient.Set(ctx, "testkey", 
"testvalue", 0).Err())
+
+               // Start failover with long timeout to observe blocking
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "10000").Val())
+
+               // Try to write during failover - should return TRYAGAIN in 
blocking states
+               // Poll for blocking state (pause_write, wait_sync, or 
switching)
+               var writeBlocked bool
+               for i := 0; i < 50; i++ {
+                       time.Sleep(50 * time.Millisecond)
+                       err := masterClient.Set(ctx, "testkey", "newvalue", 
0).Err()
+                       if err != nil && (strings.Contains(err.Error(), 
"TRYAGAIN") || strings.Contains(err.Error(), "Failover in progress")) {
+                               writeBlocked = true
+                               break
+                       }
+                       // Check if failover already completed
+                       info := masterClient.ClusterInfo(ctx).Val()
+                       if strings.Contains(info, 
"cluster_failover_state:success") {
+                               break
+                       }
+               }
+               // At least one write should have been blocked, or failover 
completed very quickly
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
15*time.Second)
+
+               // After success, writes should return MOVED
+               require.ErrorContains(t, masterClient.Set(ctx, "testkey", 
"newvalue2", 0).Err(), "MOVED")
+               _ = writeBlocked // May be false if failover was very fast
+       })
+
+       t.Run("FAILOVER - Read requests not blocked during failover", func(t 
*testing.T) {
+               // Reset state
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               require.NoError(t, masterClient.Set(ctx, "readkey", 
"readvalue", 0).Err())
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "10000").Val())
+
+               // Reads should work during failover (not blocked)
+               // Try reading multiple times during failover
+               for i := 0; i < 10; i++ {
+                       time.Sleep(100 * time.Millisecond)
+                       val := masterClient.Get(ctx, "readkey").Val()
+                       // Value should be accessible (may return empty if 
already moved, but shouldn't error with TRYAGAIN)
+                       _ = val
+                       // Check if failover completed
+                       info := masterClient.ClusterInfo(ctx).Val()
+                       if strings.Contains(info, 
"cluster_failover_state:success") {
+                               break
+                       }
+               }
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
15*time.Second)
+       })
+}
+
+func TestFailoverWithAuth(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{
+               "cluster-enabled": "yes",
+               "requirepass":     "password123",
+       })
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       masterClient = redis.NewClient(&redis.Options{
+               Addr:     master.HostPort(),
+               Password: "password123",
+       })
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := startServerWithSanitizedName(t, map[string]string{
+               "cluster-enabled": "yes",
+               "requirepass":     "password123",
+               "masterauth":      "password123",
+       })
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       slaveClient = redis.NewClient(&redis.Options{
+               Addr:     slave.HostPort(),
+               Password: "password123",
+       })
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Failover with authentication", func(t *testing.T) {
+               require.NoError(t, masterClient.Set(ctx, "authkey", 
"authvalue", 0).Err())
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+
+               // Verify data on new master
+               require.Equal(t, "authvalue", slaveClient.Get(ctx, 
"authkey").Val())
+       })
+}
+
+func TestFailoverStateQuery(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Query failover state via CLUSTER INFO", func(t 
*testing.T) {
+               // Initial state should be none
+               info := masterClient.ClusterInfo(ctx).Val()
+               require.Contains(t, info, "cluster_failover_state:none")
+
+               // Start failover
+               result := masterClient.Do(ctx, "clusterx", "failover", slaveID)
+               if result.Err() != nil {
+                       t.Logf("FAILOVER command error: %v", result.Err())
+               }
+               require.NoError(t, result.Err(), "FAILOVER command should 
succeed")
+               require.Equal(t, "OK", result.Val())
+
+               // Wait for success
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+
+               // Verify state is success
+               info = masterClient.ClusterInfo(ctx).Val()
+               require.Contains(t, info, "cluster_failover_state:success")
+       })
+
+       t.Run("FAILOVER - All state transitions", func(t *testing.T) {
+               // Reset state
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Initial state: none
+               info := masterClient.ClusterInfo(ctx).Val()
+               require.Contains(t, info, "cluster_failover_state:none")
+
+               // Start failover
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val())
+
+               // We may catch intermediate states, but they're very fast
+               // The important thing is we transition through them and end at 
success
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+
+               // Verify final state
+               info = masterClient.ClusterInfo(ctx).Val()
+               require.Contains(t, info, "cluster_failover_state:success")
+       })
+}
+
+func TestFailoverTakeoverCommand(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - TAKEOVER command on slave", func(t *testing.T) {
+               // Slave should accept TAKEOVER command
+               require.Equal(t, "OK", slaveClient.Do(ctx, "clusterx", 
"takeover").Val())
+
+               // Verify imported slots are set
+               // After takeover, slave should be able to serve the slots
+               require.NoError(t, masterClient.Set(ctx, "takeoverkey", 
"takeovervalue", 0).Err())
+
+               // Start failover to test the full flow
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+       })
+}
+
+func TestFailoverDataConsistency(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Data consistency after failover", func(t *testing.T) {
+               // Write various types of data
+               require.NoError(t, masterClient.Set(ctx, "string_key", 
"string_value", 0).Err())
+               require.NoError(t, masterClient.LPush(ctx, "list_key", "item1", 
"item2", "item3").Err())
+               require.NoError(t, masterClient.HSet(ctx, "hash_key", "field1", 
"value1", "field2", "value2").Err())
+               require.NoError(t, masterClient.SAdd(ctx, "set_key", "member1", 
"member2").Err())
+               require.NoError(t, masterClient.ZAdd(ctx, "zset_key", 
redis.Z{Score: 1.0, Member: "member1"}).Err())
+
+               // Start failover with longer timeout to ensure data sync
+               result := masterClient.Do(ctx, "clusterx", "failover", slaveID, 
"10000")
+               if result.Err() != nil {
+                       t.Logf("FAILOVER command error: %v", result.Err())
+               }
+               require.NoError(t, result.Err(), "FAILOVER command should 
succeed")
+               require.Equal(t, "OK", result.Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
15*time.Second)
+
+               // Verify all data is on new master
+               require.Equal(t, "string_value", slaveClient.Get(ctx, 
"string_key").Val())
+               require.EqualValues(t, []string{"item3", "item2", "item1"}, 
slaveClient.LRange(ctx, "list_key", 0, -1).Val())
+               require.Equal(t, map[string]string{"field1": "value1", 
"field2": "value2"}, slaveClient.HGetAll(ctx, "hash_key").Val())
+               require.EqualValues(t, []string{"member1", "member2"}, 
slaveClient.SMembers(ctx, "set_key").Val())
+               require.EqualValues(t, []redis.Z{{Score: 1.0, Member: 
"member1"}}, slaveClient.ZRangeWithScores(ctx, "zset_key", 0, -1).Val())
+       })
+}
+
+func TestFailoverStateReset(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - State reset after SETNODES", func(t *testing.T) {
+               // Start and complete failover
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+
+               // Update topology (simulating controller update)
+               newClusterNodes := fmt.Sprintf("%s %s %d slave %s\n", masterID, 
master.Host(), master.Port(), slaveID)
+               newClusterNodes += fmt.Sprintf("%s %s %d master - 0-16383", 
slaveID, slave.Host(), slave.Port())
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
newClusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
newClusterNodes, "2").Err())
+
+               // State should be reset to none
+               // After SETNODES, the original master becomes slave, and 
original slave becomes master
+               // Wait for replication relationship to be re-established
+               // The new master (slaveClient) should have a slave connection
+               require.Eventually(t, func() bool {
+                       info := slaveClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 20*time.Second, 200*time.Millisecond)
+
+               // Verify failover state is reset to none on the new master
+               info := slaveClient.ClusterInfo(ctx).Val()
+               require.Contains(t, info, "cluster_failover_state:none")
+       })
+}
+
+// Helper functions
+
+func waitForFailoverState(t testing.TB, client *redis.Client, state 
FailoverState, timeout time.Duration) {
+       var lastInfo string
+       require.Eventually(t, func() bool {
+               info := client.ClusterInfo(context.Background()).Val()
+               if info != lastInfo && strings.Contains(info, 
"cluster_failover_state:") {
+                       // Log state changes for debugging
+                       t.Logf("Failover state: %s", info)
+                       lastInfo = info
+               }
+               return strings.Contains(info, 
fmt.Sprintf("cluster_failover_state:%s", state))
+       }, timeout, 100*time.Millisecond)
+}
+
// requireFailoverState asserts that CLUSTER INFO currently reports the given
// failover state.
// NOTE(review): currently unused in this file — remove it or wire it into a
// test (also flagged in code review).
func requireFailoverState(t testing.TB, client *redis.Client, state FailoverState) {
	info := client.ClusterInfo(context.Background()).Val()
	require.Contains(t, info, fmt.Sprintf("cluster_failover_state:%s", state))
}

Review Comment:
   The function 'requireFailoverState' is defined but never called anywhere in
the test file. Since it is dead code, either remove it or, if it is genuinely
intended for future use, keep it and add a TODO comment explaining when and
how it will be used.
   ```suggestion
   
   ```



##########
tests/gocase/integration/failover/failover_test.go:
##########
@@ -0,0 +1,925 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package failover
+
+import (
+       "context"
+       "fmt"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/redis/go-redis/v9"
+       "github.com/stretchr/testify/require"
+)
+
// testNameWrapper decorates a testing.TB so that Name() yields a value that is
// safe to pass to os.MkdirTemp: subtest names contain "/", which makes
// MkdirTemp fail when the name is used as part of a directory pattern.
type testNameWrapper struct {
	testing.TB
	sanitizedName string
}

// Name returns the sanitized test name instead of the embedded TB's name.
func (w *testNameWrapper) Name() string {
	return w.sanitizedName
}

// sanitizeTestName returns tb wrapped so that every "/" in its reported name
// is replaced with "_", avoiding MkdirTemp failures for subtests.
func sanitizeTestName(tb testing.TB) testing.TB {
	return &testNameWrapper{
		TB:            tb,
		sanitizedName: strings.ReplaceAll(tb.Name(), "/", "_"),
	}
}
+
// startServerWithSanitizedName starts a kvrocks server using a test name with
// "/" replaced by "_", so that util.StartServer's temporary-directory creation
// cannot fail on subtest names.
func startServerWithSanitizedName(t testing.TB, configs map[string]string) *util.KvrocksServer {
	return util.StartServer(sanitizeTestName(t), configs)
}
+
// FailoverState is the value of the cluster_failover_state field in
// CLUSTER INFO output; the tests in this file match these strings verbatim.
type FailoverState string

// Failover states reported by the server during a CLUSTERX FAILOVER.
const (
	FailoverStateNone       FailoverState = "none"
	FailoverStateStarted    FailoverState = "started"
	FailoverStateCheckSlave FailoverState = "check_slave"
	FailoverStatePauseWrite FailoverState = "pause_write"
	FailoverStateWaitSync   FailoverState = "wait_sync"
	FailoverStateSwitching  FailoverState = "switching"
	FailoverStateSuccess    FailoverState = "success"
	FailoverStateFailed     FailoverState = "failed"
)
+
+func TestFailoverBasicFlow(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       // Wait for replication to establish
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Basic failover flow", func(t *testing.T) {
+               // Write some data
+               require.NoError(t, masterClient.Set(ctx, "key1", "value1", 
0).Err())
+               require.NoError(t, masterClient.Set(ctx, "key2", "value2", 
0).Err())
+
+               // Start failover
+               result := masterClient.Do(ctx, "clusterx", "failover", slaveID)
+               if result.Err() != nil {
+                       t.Logf("FAILOVER command error: %v", result.Err())
+               }
+               require.NoError(t, result.Err(), "FAILOVER command should 
succeed")
+               require.Equal(t, "OK", result.Val())
+
+               // Wait for failover to complete
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+
+               // Verify slots are migrated (MOVED response)
+               require.ErrorContains(t, masterClient.Set(ctx, "key1", 
"newvalue", 0).Err(), "MOVED")
+               require.ErrorContains(t, masterClient.Get(ctx, "key1").Err(), 
"MOVED")
+
+               // Verify data is accessible on new master (slave)
+               require.Equal(t, "value1", slaveClient.Get(ctx, "key1").Val())
+               require.Equal(t, "value2", slaveClient.Get(ctx, "key2").Val())
+       })
+
+       t.Run("FAILOVER - Failover with custom timeout", func(t *testing.T) {
+               // Reset failover state by updating topology
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Start failover with custom timeout
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "5000").Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+       })
+}
+
// TestFailoverFailureCases exercises the failure paths of CLUSTERX FAILOVER:
// targeting a non-existent node, targeting a node that is not a slave,
// rejecting an invalid (negative) timeout, and the behavior of timeout values
// 0 / small / large. Each subtest bumps the topology version via SETNODES to
// reset replication and failover state.
func TestFailoverFailureCases(t *testing.T) {
	ctx := context.Background()

	master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
	defer func() { master.Close() }()
	masterClient := master.NewClient()
	defer func() { require.NoError(t, masterClient.Close()) }()
	masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
	require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err())

	t.Run("FAILOVER - Failover to non-existent node", func(t *testing.T) {
		clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383", masterID, master.Host(), master.Port())
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())

		// The command itself is accepted; the failure surfaces asynchronously
		// in cluster_failover_state.
		nonExistentID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx99"
		require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", nonExistentID).Val())
		waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second)
	})

	t.Run("FAILOVER - Failover to non-slave node", func(t *testing.T) {
		slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"})
		defer func() { slave.Close() }()
		slaveClient := slave.NewClient()
		defer func() { require.NoError(t, slaveClient.Close()) }()
		slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err())

		// Set slave as master (not slave)
		clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port())
		clusterNodes += fmt.Sprintf("%s %s %d master -", slaveID, slave.Host(), slave.Port())
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())

		// Failover toward a non-slave node must end in the failed state.
		require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val())
		waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second)
	})

	t.Run("FAILOVER - Invalid timeout value", func(t *testing.T) {
		slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"})
		defer func() { slave.Close() }()
		slaveClient := slave.NewClient()
		defer func() { require.NoError(t, slaveClient.Close()) }()
		slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err())

		clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port())
		clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID)
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err())
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err())

		// Negative timeout should return error
		require.Error(t, masterClient.Do(ctx, "clusterx", "failover", slaveID, "-1").Err())
	})

	t.Run("FAILOVER - Different timeout values", func(t *testing.T) {
		slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"})
		defer func() { slave.Close() }()
		slaveClient := slave.NewClient()
		defer func() { require.NoError(t, slaveClient.Close()) }()
		slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err())

		clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port())
		clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID)
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "4").Err())
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "4").Err())

		require.Eventually(t, func() bool {
			info := masterClient.Info(ctx, "replication").Val()
			return strings.Contains(info, "connected_slaves:1")
		}, 10*time.Second, 100*time.Millisecond)

		// Test with timeout = 0 when slave is already synced (lag=0)
		// When lag=0, failover should succeed because no waiting is needed
		// But if slave has lag, it will fail
		require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "0").Val())
		// Wait for either success or failed state
		require.Eventually(t, func() bool {
			info := masterClient.ClusterInfo(ctx).Val()
			return strings.Contains(info, "cluster_failover_state:success") ||
				strings.Contains(info, "cluster_failover_state:failed")
		}, 5*time.Second, 100*time.Millisecond)

		// Check final state - can be success (if lag=0) or failed (if lag>0)
		finalInfo := masterClient.ClusterInfo(ctx).Val()
		if strings.Contains(finalInfo, "cluster_failover_state:success") {
			t.Logf("timeout=0 with lag=0: failover succeeded as expected")
		} else if strings.Contains(finalInfo, "cluster_failover_state:failed") {
			t.Logf("timeout=0: failover failed (slave may have lag)")
		}

		// Reset for next test
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "5").Err())
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "5").Err())
		require.Eventually(t, func() bool {
			info := masterClient.Info(ctx, "replication").Val()
			return strings.Contains(info, "connected_slaves:1")
		}, 10*time.Second, 100*time.Millisecond)

		// Test with timeout = 0 when slave has lag
		// Create lag by writing data to master without waiting for sync
		for i := 0; i < 100; i++ {
			require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i), 0).Err())
		}
		// Don't wait for sync, start failover immediately to create lag scenario
		require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "0").Val())
		// With lag, timeout=0 should fail
		waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second)

		// Reset and test with small timeout
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "6").Err())
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "6").Err())
		require.Eventually(t, func() bool {
			info := masterClient.Info(ctx, "replication").Val()
			return strings.Contains(info, "connected_slaves:1")
		}, 10*time.Second, 100*time.Millisecond)

		require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "100").Val())
		// Small timeout may fail, but should start; any of these states is an
		// acceptable observation shortly after issuing the command.
		time.Sleep(200 * time.Millisecond)
		info := masterClient.ClusterInfo(ctx).Val()
		require.True(t, strings.Contains(info, "cluster_failover_state:failed") ||
			strings.Contains(info, "cluster_failover_state:success") ||
			strings.Contains(info, "cluster_failover_state:wait_sync") ||
			strings.Contains(info, "cluster_failover_state:switching"))

		// Reset and test with large timeout
		require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "7").Err())
		require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "7").Err())
		require.Eventually(t, func() bool {
			info := masterClient.Info(ctx, "replication").Val()
			return strings.Contains(info, "connected_slaves:1")
		}, 10*time.Second, 100*time.Millisecond)

		// Large timeout gives the slave ample time to sync, so this must succeed.
		require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val())
		waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second)
	})
}
+
+func TestFailoverConcurrency(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Cannot start failover when one is in progress", 
func(t *testing.T) {
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val())
+
+               // Try to start another failover immediately - should return 
error
+               // Wait a bit to ensure first failover has started
+               time.Sleep(100 * time.Millisecond)
+               result := masterClient.Do(ctx, "clusterx", "failover", slaveID)
+               // The second call may return OK but won't start a new failover
+               // We verify the first one completes successfully
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
10*time.Second)
+               _ = result // Use result to avoid unused variable
+       })
+
+       t.Run("FAILOVER - Can restart after failure", func(t *testing.T) {
+               // Reset state
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               // Start a failover with very short timeout
+               // If slave is synced (lag=0), it may succeed; if slave has 
lag, it will fail
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "1").Val())
+               // Accept either success or failed state
+               require.Eventually(t, func() bool {
+                       info := masterClient.ClusterInfo(ctx).Val()
+                       return strings.Contains(info, 
"cluster_failover_state:success") ||
+                               strings.Contains(info, 
"cluster_failover_state:failed")
+               }, 5*time.Second, 100*time.Millisecond)
+
+               // Can restart after failure
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "3").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "3").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "10000").Val())
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
15*time.Second)
+       })
+}
+
+func TestFailoverWriteBlocking(t *testing.T) {
+       ctx := context.Background()
+
+       master := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { master.Close() }()
+       masterClient := master.NewClient()
+       defer func() { require.NoError(t, masterClient.Close()) }()
+       masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", 
masterID).Err())
+
+       slave := util.StartServer(t, map[string]string{"cluster-enabled": 
"yes"})
+       defer func() { slave.Close() }()
+       slaveClient := slave.NewClient()
+       defer func() { require.NoError(t, slaveClient.Close()) }()
+       slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", 
slaveID).Err())
+
+       clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, 
master.Host(), master.Port())
+       clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), 
slave.Port(), masterID)
+       require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+       require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "1").Err())
+
+       require.Eventually(t, func() bool {
+               info := masterClient.Info(ctx, "replication").Val()
+               return strings.Contains(info, "connected_slaves:1")
+       }, 10*time.Second, 100*time.Millisecond)
+
+       t.Run("FAILOVER - Write requests blocked during failover", func(t 
*testing.T) {
+               // Write initial data
+               require.NoError(t, masterClient.Set(ctx, "testkey", 
"testvalue", 0).Err())
+
+               // Start failover with long timeout to observe blocking
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "10000").Val())
+
+               // Try to write during failover - should return TRYAGAIN in 
blocking states
+               // Poll for blocking state (pause_write, wait_sync, or 
switching)
+               var writeBlocked bool
+               for i := 0; i < 50; i++ {
+                       time.Sleep(50 * time.Millisecond)
+                       err := masterClient.Set(ctx, "testkey", "newvalue", 
0).Err()
+                       if err != nil && (strings.Contains(err.Error(), 
"TRYAGAIN") || strings.Contains(err.Error(), "Failover in progress")) {
+                               writeBlocked = true
+                               break
+                       }
+                       // Check if failover already completed
+                       info := masterClient.ClusterInfo(ctx).Val()
+                       if strings.Contains(info, 
"cluster_failover_state:success") {
+                               break
+                       }
+               }
+               // At least one write should have been blocked, or failover 
completed very quickly
+               waitForFailoverState(t, masterClient, FailoverStateSuccess, 
15*time.Second)
+
+               // After success, writes should return MOVED
+               require.ErrorContains(t, masterClient.Set(ctx, "testkey", 
"newvalue2", 0).Err(), "MOVED")
+               _ = writeBlocked // May be false if failover was very fast
+       })
+
+       t.Run("FAILOVER - Read requests not blocked during failover", func(t 
*testing.T) {
+               // Reset state
+               require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", 
clusterNodes, "2").Err())
+               require.Eventually(t, func() bool {
+                       info := masterClient.Info(ctx, "replication").Val()
+                       return strings.Contains(info, "connected_slaves:1")
+               }, 10*time.Second, 100*time.Millisecond)
+
+               require.NoError(t, masterClient.Set(ctx, "readkey", 
"readvalue", 0).Err())
+
+               require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID, "10000").Val())
+
+               // Reads should work during failover (not blocked)
+               // Try reading multiple times during failover
+               for i := 0; i < 10; i++ {
+                       time.Sleep(100 * time.Millisecond)
+                       val := masterClient.Get(ctx, "readkey").Val()
+                       // Value should be accessible (may return empty if 
already moved, but shouldn't error with TRYAGAIN)
+                       _ = val

Review Comment:
   The variable 'val' is assigned the result of Get() but never used. This 
appears to be dead code. If the intention is to verify that reads work during 
failover, consider adding an assertion to check the returned value or at 
minimum add a comment explaining why the value is intentionally ignored.
   ```suggestion
                        require.True(t, val == "readvalue" || val == "", 
"expected value to be 'readvalue' or '', got: %q", val)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to