Copilot commented on code in PR #3295: URL: https://github.com/apache/kvrocks/pull/3295#discussion_r2617794622
########## tests/gocase/integration/failover/failover_test.go: ########## @@ -0,0 +1,925 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package failover + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/apache/kvrocks/tests/gocase/util" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" +) + +// testNameWrapper wraps testing.TB to sanitize test names for MkdirTemp +// This is needed because subtest names contain "/" which causes MkdirTemp to fail +type testNameWrapper struct { + testing.TB + sanitizedName string +} + +func (w *testNameWrapper) Name() string { + return w.sanitizedName +} + +// sanitizeTestName replaces path separators in test names to avoid issues with MkdirTemp +func sanitizeTestName(tb testing.TB) testing.TB { + sanitizedName := strings.ReplaceAll(tb.Name(), "/", "_") + return &testNameWrapper{TB: tb, sanitizedName: sanitizedName} +} + +// startServerWithSanitizedName starts a server with a sanitized test name +func startServerWithSanitizedName(t testing.TB, configs map[string]string) *util.KvrocksServer { + return util.StartServer(sanitizeTestName(t), configs) +} + +type FailoverState string + +const ( + FailoverStateNone FailoverState = "none" + FailoverStateStarted FailoverState = "started" + FailoverStateCheckSlave FailoverState = "check_slave" + FailoverStatePauseWrite FailoverState = "pause_write" + FailoverStateWaitSync FailoverState = "wait_sync" + FailoverStateSwitching FailoverState = "switching" + FailoverStateSuccess FailoverState = "success" + FailoverStateFailed FailoverState = "failed" +) + +func TestFailoverBasicFlow(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + // Wait 
for replication to establish + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + t.Run("FAILOVER - Basic failover flow", func(t *testing.T) { + // Write some data + require.NoError(t, masterClient.Set(ctx, "key1", "value1", 0).Err()) + require.NoError(t, masterClient.Set(ctx, "key2", "value2", 0).Err()) + + // Start failover + result := masterClient.Do(ctx, "clusterx", "failover", slaveID) + if result.Err() != nil { + t.Logf("FAILOVER command error: %v", result.Err()) + } + require.NoError(t, result.Err(), "FAILOVER command should succeed") + require.Equal(t, "OK", result.Val()) + + // Wait for failover to complete + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + + // Verify slots are migrated (MOVED response) + require.ErrorContains(t, masterClient.Set(ctx, "key1", "newvalue", 0).Err(), "MOVED") + require.ErrorContains(t, masterClient.Get(ctx, "key1").Err(), "MOVED") + + // Verify data is accessible on new master (slave) + require.Equal(t, "value1", slaveClient.Get(ctx, "key1").Val()) + require.Equal(t, "value2", slaveClient.Get(ctx, "key2").Val()) + }) + + t.Run("FAILOVER - Failover with custom timeout", func(t *testing.T) { + // Reset failover state by updating topology + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Start failover with custom timeout + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "5000").Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + }) +} + +func TestFailoverFailureCases(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + t.Run("FAILOVER - Failover to non-existent node", func(t *testing.T) { + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383", masterID, master.Host(), master.Port()) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + nonExistentID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx99" + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", nonExistentID).Val()) + waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second) + }) + + t.Run("FAILOVER - Failover to non-slave node", func(t *testing.T) { + slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + // Set slave as master (not slave) + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d master -", slaveID, slave.Host(), slave.Port()) + 
require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second) + }) + + t.Run("FAILOVER - Invalid timeout value", func(t *testing.T) { + slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err()) + + // Negative timeout should return error + require.Error(t, masterClient.Do(ctx, "clusterx", "failover", slaveID, "-1").Err()) + }) + + t.Run("FAILOVER - Different timeout values", func(t *testing.T) { + slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "4").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "4").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test with timeout = 0 when slave is already synced (lag=0) + // When lag=0, failover should succeed because no waiting is needed + // But if slave has lag, it will fail + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "0").Val()) + // Wait for either success or failed state + require.Eventually(t, func() bool { + info := masterClient.ClusterInfo(ctx).Val() + return strings.Contains(info, "cluster_failover_state:success") || + strings.Contains(info, "cluster_failover_state:failed") + }, 5*time.Second, 100*time.Millisecond) + + // Check final state - can be success (if lag=0) or failed (if lag>0) + finalInfo := masterClient.ClusterInfo(ctx).Val() + if strings.Contains(finalInfo, "cluster_failover_state:success") { + t.Logf("timeout=0 with lag=0: failover succeeded as expected") + } else if strings.Contains(finalInfo, "cluster_failover_state:failed") { + t.Logf("timeout=0: failover failed (slave may have lag)") + } + + // Reset for next test + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "5").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "5").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, 
"replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test with timeout = 0 when slave has lag + // Create lag by writing data to master without waiting for sync + for i := 0; i < 100; i++ { + require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i), 0).Err()) + } + // Don't wait for sync, start failover immediately to create lag scenario + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "0").Val()) + // With lag, timeout=0 should fail + waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second) + + // Reset and test with small timeout + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "6").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "6").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "100").Val()) + // Small timeout may fail, but should start + time.Sleep(200 * time.Millisecond) + info := masterClient.ClusterInfo(ctx).Val() + require.True(t, strings.Contains(info, "cluster_failover_state:failed") || + strings.Contains(info, "cluster_failover_state:success") || + strings.Contains(info, "cluster_failover_state:wait_sync") || + strings.Contains(info, "cluster_failover_state:switching")) + + // Reset and test with large timeout + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "7").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "7").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second) + }) +} + +func TestFailoverConcurrency(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + t.Run("FAILOVER - Cannot start failover when 
one is in progress", func(t *testing.T) { + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + + // Try to start another failover immediately - should return error + // Wait a bit to ensure first failover has started + time.Sleep(100 * time.Millisecond) + result := masterClient.Do(ctx, "clusterx", "failover", slaveID) + // The second call may return OK but won't start a new failover + // We verify the first one completes successfully + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + _ = result // Use result to avoid unused variable Review Comment: The variable 'result' is assigned but its value is never used except to avoid an "unused variable" error. The comment "Use result to avoid unused variable" confirms this is intentional dead code. Consider removing this variable entirely or using it to verify expected behavior (e.g., checking if the second failover attempt returns an error or OK). ```suggestion // The second call may return OK but won't start a new failover, // or it may return an error indicating a failover is already in progress. val, err := result.Result() if err != nil { require.Contains(t, err.Error(), "failover already in progress") } else { require.Equal(t, "OK", val) } // We verify the first one completes successfully waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) ``` ########## src/cluster/cluster_failover.cc: ########## @@ -0,0 +1,269 @@ +#include "cluster_failover.h" + +#include <unistd.h> + +#include "cluster/cluster.h" +#include "common/io_util.h" +#include "common/time_util.h" +#include "logging.h" +#include "server/redis_reply.h" +#include "server/server.h" + +ClusterFailover::ClusterFailover(Server *srv) : srv_(srv) { + t_ = std::thread([this]() { loop(); }); +} + +ClusterFailover::~ClusterFailover() { + { + std::lock_guard<std::mutex> lock(mutex_); + stop_thread_ = true; + cv_.notify_all(); + } + if (t_.joinable()) t_.join(); +} + +Status ClusterFailover::Run(std::string slave_node_id, int timeout_ms) { + std::lock_guard<std::mutex> lock(mutex_); + if (state_ != FailoverState::kNone && state_ != FailoverState::kFailed) { + return {Status::NotOK, "Failover is already in progress"}; + } + + slave_node_id_ = std::move(slave_node_id); + timeout_ms_ = timeout_ms; + state_ = FailoverState::kStarted; + failover_job_triggered_ = true; + cv_.notify_all(); + return Status::OK(); +} + +void ClusterFailover::loop() { + while (true) { + std::unique_lock<std::mutex> lock(mutex_); + cv_.wait(lock, [this]() { return stop_thread_ || failover_job_triggered_; }); + + if (stop_thread_) return; + + if (failover_job_triggered_) { + failover_job_triggered_ = false; + lock.unlock(); + runFailoverProcess(); + } + } +} + +void ClusterFailover::runFailoverProcess() { + auto ip_port = srv_->cluster->GetNodeIPPort(slave_node_id_); + if (!ip_port.IsOK()) { + error("[Failover] slave node not found in cluster {}", slave_node_id_); + abortFailover("Slave node not found in cluster"); + return; + } + node_ip_port_ = ip_port.GetValue().first + ":" + std::to_string(ip_port.GetValue().second); + node_ip_ = ip_port.GetValue().first; + node_port_ = ip_port.GetValue().second; + info("[Failover] slave node {} {} failover state: {}", slave_node_id_, node_ip_port_, static_cast<int>(state_.load())); + state_ = FailoverState::kCheck; + + auto s = checkSlaveStatus(); + if (!s.IsOK()) { + abortFailover(s.Msg()); + return; + } + + s = checkSlaveLag(); + if (!s.IsOK()) { + abortFailover("Slave lag check failed: " + 
s.Msg()); + return; + } + + info("[Failover] slave node {} {} check slave status success, enter pause state", slave_node_id_, node_ip_port_); + start_time_ms_ = util::GetTimeStampMS(); + // Enter Pause state (Stop writing) + state_ = FailoverState::kPause; + // Get current sequence + target_seq_ = srv_->storage->LatestSeqNumber(); + info("[Failover] slave node {} {} target sequence {}", slave_node_id_, node_ip_port_, target_seq_); + + state_ = FailoverState::kSyncWait; + s = waitReplicationSync(); + if (!s.IsOK()) { + abortFailover(s.Msg()); + return; + } + info("[Failover] slave node {} {} wait replication sync success, enter switch state, cost {} ms", slave_node_id_, + node_ip_port_, util::GetTimeStampMS() - start_time_ms_); + + state_ = FailoverState::kSwitch; + s = sendTakeoverCmd(); + if (!s.IsOK()) { + abortFailover(s.Msg()); + return; + } + + // Redirect slots + srv_->cluster->SetMySlotsMigrated(node_ip_port_); + + state_ = FailoverState::kSuccess; + info("[Failover] success {} {}", slave_node_id_, node_ip_port_); +} + +Status ClusterFailover::checkSlaveLag() { + auto start_offset_status = srv_->GetSlaveReplicationOffset(node_ip_port_); + if (!start_offset_status.IsOK()) { + return {Status::NotOK, "Failed to get slave offset: " + start_offset_status.Msg()}; + } + uint64_t start_offset = *start_offset_status; + int64_t start_sampling_ms = util::GetTimeStampMS(); + + // Wait 3s or half of timeout, but at least a bit to measure speed + int64_t wait_time = std::max(100, std::min(3000, timeout_ms_ / 2)); + std::this_thread::sleep_for(std::chrono::milliseconds(wait_time)); + + auto end_offset_status = srv_->GetSlaveReplicationOffset(node_ip_port_); + if (!end_offset_status.IsOK()) { + return {Status::NotOK, "Failed to get slave offset: " + end_offset_status.Msg()}; + } + uint64_t end_offset = *end_offset_status; + int64_t end_sampling_ms = util::GetTimeStampMS(); + + double elapsed_sec = (end_sampling_ms - start_sampling_ms) / 1000.0; + if (elapsed_sec <= 0) elapsed_sec = 0.001; + + uint64_t bytes = 0; + if (end_offset > start_offset) bytes = end_offset - start_offset; + double speed = bytes / elapsed_sec; + + uint64_t master_seq = srv_->storage->LatestSeqNumber(); + uint64_t lag = 0; + if (master_seq > end_offset) lag = master_seq - end_offset; + + if (lag == 0) return Status::OK(); + + if (speed <= 0.1) { // Basically 0 + return {Status::NotOK, fmt::format("Slave is not replicating (lag: {})", lag)}; + } + + double required_sec = lag / speed; + int64_t required_ms = static_cast<int64_t>(required_sec * 1000); + + int64_t elapsed_total = end_sampling_ms - start_sampling_ms; + int64_t remaining = timeout_ms_ - elapsed_total; + + if (required_ms > remaining) { + return {Status::NotOK, fmt::format("Estimated catchup time {}ms > remaining time {}ms (lag: {}, speed: {:.2f}/s)", + required_ms, remaining, lag, speed)}; + } + + info("[Failover] check: lag={}, speed={:.2f}/s, estimated_time={}ms, remaining={}ms", lag, speed, required_ms, + remaining); + return Status::OK(); +} + +Status ClusterFailover::checkSlaveStatus() { + // We could try to connect, but GetSlaveReplicationOffset checks connection. 
+ auto offset = srv_->GetSlaveReplicationOffset(node_ip_port_); + if (!offset.IsOK()) { + error("[Failover] slave node {} {} not connected or not syncing", slave_node_id_, node_ip_port_); + return {Status::NotOK, "Slave not connected or not syncing"}; + } + info("[Failover] slave node {} {} is connected and syncing offset {}", slave_node_id_, node_ip_port_, offset.Msg()); + return Status::OK(); +} + +Status ClusterFailover::waitReplicationSync() { + while (true) { + if (util::GetTimeStampMS() - start_time_ms_ > static_cast<u_int64_t>(timeout_ms_)) {
Review Comment: Using 'u_int64_t' is non-standard. The standard type is 'uint64_t' (without underscore). While u_int64_t may be defined on some systems (like BSD), it's not portable. Use uint64_t for consistency with the rest of the codebase.
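A minimal fix along these lines should suffice (a sketch; it assumes uint64_t is already available through the file's existing includes, as its other uint64_t uses suggest):
```suggestion
    if (util::GetTimeStampMS() - start_time_ms_ > static_cast<uint64_t>(timeout_ms_)) {
```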
########## src/cluster/cluster_failover.cc: ##########
+Status ClusterFailover::checkSlaveStatus() { + // We could try to connect, but GetSlaveReplicationOffset checks connection. + auto offset = srv_->GetSlaveReplicationOffset(node_ip_port_); + if (!offset.IsOK()) { + error("[Failover] slave node {} {} not connected or not syncing", slave_node_id_, node_ip_port_); + return {Status::NotOK, "Slave not connected or not syncing"}; + } + info("[Failover] slave node {} {} is connected and syncing offset {}", slave_node_id_, node_ip_port_, offset.Msg());
Review Comment: The log message uses offset.Msg() which is intended for error messages, not for displaying the actual offset value. The StatusOr type returns Msg() for error messages. To log the actual offset value, use *offset instead.
```suggestion
  info("[Failover] slave node {} {} is connected and syncing offset {}", slave_node_id_, node_ip_port_, *offset);
```
########## tests/gocase/integration/failover/failover_test.go: ##########
+ t.Run("FAILOVER - Can restart after failure", func(t *testing.T) { + // Reset state + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Start a failover with very short timeout + // If slave is synced (lag=0), it may succeed; if slave has lag, it will fail + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "1").Val()) + // Accept either success or failed state + require.Eventually(t, func() bool { + info := masterClient.ClusterInfo(ctx).Val() + return strings.Contains(info, "cluster_failover_state:success") || + strings.Contains(info, "cluster_failover_state:failed") + }, 5*time.Second, 100*time.Millisecond) + + // Can restart after failure + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second) + }) +} + +func TestFailoverWriteBlocking(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + t.Run("FAILOVER - Write requests blocked during failover", func(t *testing.T) { + // Write initial data + require.NoError(t, masterClient.Set(ctx, "testkey", "testvalue", 0).Err()) + + // Start failover with long timeout to
observe blocking + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val()) + + // Try to write during failover - should return TRYAGAIN in blocking states + // Poll for blocking state (pause_write, wait_sync, or switching) + var writeBlocked bool + for i := 0; i < 50; i++ { + time.Sleep(50 * time.Millisecond) + err := masterClient.Set(ctx, "testkey", "newvalue", 0).Err() + if err != nil && (strings.Contains(err.Error(), "TRYAGAIN") || strings.Contains(err.Error(), "Failover in progress")) { + writeBlocked = true + break + } + // Check if failover already completed + info := masterClient.ClusterInfo(ctx).Val() + if strings.Contains(info, "cluster_failover_state:success") { + break + } + } + // At least one write should have been blocked, or failover completed very quickly + waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second) + + // After success, writes should return MOVED + require.ErrorContains(t, masterClient.Set(ctx, "testkey", "newvalue2", 0).Err(), "MOVED") + _ = writeBlocked // May be false if failover was very fast
Review Comment: The variable 'writeBlocked' is assigned but never used in a meaningful way. It's set to true when a write is blocked during failover, but the comment acknowledges it may be false if failover was very fast. This appears to be dead code that should either be removed or used in an assertion to verify the expected behavior.
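If the check is worth keeping, one option is to assert on it directly (a sketch; note it assumes the failover stays in a write-blocking state long enough for the polling loop above to observe at least one rejected write, so it could be flaky when failover completes very quickly):
```suggestion
		require.True(t, writeBlocked, "expected at least one write to be rejected while failover was in progress")
```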
"cluster_failover_state:success") { + break + } + } + // At least one write should have been blocked, or failover completed very quickly + waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second) + + // After success, writes should return MOVED + require.ErrorContains(t, masterClient.Set(ctx, "testkey", "newvalue2", 0).Err(), "MOVED") + _ = writeBlocked // May be false if failover was very fast + }) + + t.Run("FAILOVER - Read requests not blocked during failover", func(t *testing.T) { + // Reset state + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + require.NoError(t, masterClient.Set(ctx, "readkey", "readvalue", 0).Err()) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val()) + + // Reads should work during failover (not blocked) + // Try reading multiple times during failover + for i := 0; i < 10; i++ { + time.Sleep(100 * time.Millisecond) + val := masterClient.Get(ctx, "readkey").Val() + // Value should be accessible (may return empty if already moved, but shouldn't error with TRYAGAIN) + _ = val + // Check if failover completed + info := masterClient.ClusterInfo(ctx).Val() + if strings.Contains(info, "cluster_failover_state:success") { + break + } + } + waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second) + }) +} + +func TestFailoverWithAuth(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "requirepass": "password123", + }) + defer func() { master.Close() }() + masterClient := master.NewClient() + masterClient = redis.NewClient(&redis.Options{ + Addr: master.HostPort(), + Password: "password123", + }) + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := startServerWithSanitizedName(t, map[string]string{ + "cluster-enabled": "yes", + "requirepass": "password123", + "masterauth": "password123", + }) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + slaveClient = redis.NewClient(&redis.Options{ + Addr: slave.HostPort(), + Password: "password123", + }) + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + t.Run("FAILOVER - Failover with authentication", func(t *testing.T) { + require.NoError(t, masterClient.Set(ctx, "authkey", "authvalue", 0).Err()) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", 
"failover", slaveID).Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + + // Verify data on new master + require.Equal(t, "authvalue", slaveClient.Get(ctx, "authkey").Val()) + }) +} + +func TestFailoverStateQuery(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + t.Run("FAILOVER - Query failover state via CLUSTER INFO", func(t *testing.T) { + // Initial state should be none + info := masterClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:none") + + // Start failover + result := masterClient.Do(ctx, "clusterx", "failover", slaveID) + if result.Err() != nil { + t.Logf("FAILOVER command error: %v", result.Err()) + } + require.NoError(t, result.Err(), "FAILOVER command should succeed") + require.Equal(t, "OK", result.Val()) + + // Wait for success + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + + // Verify state is success + info = masterClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:success") + }) + + t.Run("FAILOVER - All state transitions", func(t *testing.T) { + // Reset state + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Initial state: none + info := masterClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:none") + + // Start failover + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + + // We may catch intermediate states, but they're very fast + // The important thing is we transition through them and end at success + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + + // Verify final state + info = masterClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:success") + }) +} + +func TestFailoverTakeoverCommand(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { 
master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + t.Run("FAILOVER - TAKEOVER command on slave", func(t *testing.T) { + // Slave should accept TAKEOVER command + require.Equal(t, "OK", slaveClient.Do(ctx, "clusterx", "takeover").Val()) + + // Verify imported slots are set + // After takeover, slave should be able to serve the slots + require.NoError(t, masterClient.Set(ctx, "takeoverkey", "takeovervalue", 0).Err()) + + // Start failover to test the full flow + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + }) +} + +func TestFailoverDataConsistency(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + t.Run("FAILOVER - Data consistency after 
failover", func(t *testing.T) { + // Write various types of data + require.NoError(t, masterClient.Set(ctx, "string_key", "string_value", 0).Err()) + require.NoError(t, masterClient.LPush(ctx, "list_key", "item1", "item2", "item3").Err()) + require.NoError(t, masterClient.HSet(ctx, "hash_key", "field1", "value1", "field2", "value2").Err()) + require.NoError(t, masterClient.SAdd(ctx, "set_key", "member1", "member2").Err()) + require.NoError(t, masterClient.ZAdd(ctx, "zset_key", redis.Z{Score: 1.0, Member: "member1"}).Err()) + + // Start failover with longer timeout to ensure data sync + result := masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000") + if result.Err() != nil { + t.Logf("FAILOVER command error: %v", result.Err()) + } + require.NoError(t, result.Err(), "FAILOVER command should succeed") + require.Equal(t, "OK", result.Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second) + + // Verify all data is on new master + require.Equal(t, "string_value", slaveClient.Get(ctx, "string_key").Val()) + require.EqualValues(t, []string{"item3", "item2", "item1"}, slaveClient.LRange(ctx, "list_key", 0, -1).Val()) + require.Equal(t, map[string]string{"field1": "value1", "field2": "value2"}, slaveClient.HGetAll(ctx, "hash_key").Val()) + require.EqualValues(t, []string{"member1", "member2"}, slaveClient.SMembers(ctx, "set_key").Val()) + require.EqualValues(t, []redis.Z{{Score: 1.0, Member: "member1"}}, slaveClient.ZRangeWithScores(ctx, "zset_key", 0, -1).Val()) + }) +} + +func TestFailoverStateReset(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + t.Run("FAILOVER - State reset after SETNODES", func(t *testing.T) { + // Start and complete failover + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + + // Update topology (simulating controller update) + newClusterNodes := fmt.Sprintf("%s %s %d slave %s\n", masterID, master.Host(), master.Port(), slaveID) + newClusterNodes += fmt.Sprintf("%s %s %d master - 0-16383", slaveID, slave.Host(), slave.Port()) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", newClusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", 
"SETNODES", newClusterNodes, "2").Err()) + + // State should be reset to none + // After SETNODES, the original master becomes slave, and original slave becomes master + // Wait for replication relationship to be re-established + // The new master (slaveClient) should have a slave connection + require.Eventually(t, func() bool { + info := slaveClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 20*time.Second, 200*time.Millisecond) + + // Verify failover state is reset to none on the new master + info := slaveClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:none") + }) +} + +// Helper functions + +func waitForFailoverState(t testing.TB, client *redis.Client, state FailoverState, timeout time.Duration) { + var lastInfo string + require.Eventually(t, func() bool { + info := client.ClusterInfo(context.Background()).Val() + if info != lastInfo && strings.Contains(info, "cluster_failover_state:") { + // Log state changes for debugging + t.Logf("Failover state: %s", info) + lastInfo = info + } + return strings.Contains(info, fmt.Sprintf("cluster_failover_state:%s", state)) + }, timeout, 100*time.Millisecond) +} + +func requireFailoverState(t testing.TB, client *redis.Client, state FailoverState) { + info := client.ClusterInfo(context.Background()).Val() + require.Contains(t, info, fmt.Sprintf("cluster_failover_state:%s", state)) +} Review Comment: The function 'requireFailoverState' is defined but never called anywhere in the test file. This is dead code that should be removed unless it's intended for future use, in which case it should be marked with a TODO comment or removed until needed. ```suggestion ``` ########## tests/gocase/integration/failover/failover_test.go: ########## @@ -0,0 +1,925 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
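   If the helper is kept, one plausible use (a sketch, not code from this PR) is to replace the hand-written `require.Contains(t, info, "cluster_failover_state:none")` checks that several subtests already perform before starting a failover:

```go
// Hypothetical usage of the otherwise-dead helper: assert that no failover
// is in flight before kicking one off, instead of grepping CLUSTER INFO by hand.
requireFailoverState(t, masterClient, FailoverStateNone)
require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val())
```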
##########
tests/gocase/integration/failover/failover_test.go:
##########
@@ -0,0 +1,925 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package failover
+
+import (
+    "context"
+    "fmt"
+    "strings"
+    "testing"
+    "time"
+
+    "github.com/apache/kvrocks/tests/gocase/util"
+    "github.com/redis/go-redis/v9"
+    "github.com/stretchr/testify/require"
+)
+
+// testNameWrapper wraps testing.TB to sanitize test names for MkdirTemp
+// This is needed because subtest names contain "/" which causes MkdirTemp to fail
+type testNameWrapper struct {
+    testing.TB
+    sanitizedName string
+}
+
+func (w *testNameWrapper) Name() string {
+    return w.sanitizedName
+}
+
+// sanitizeTestName replaces path separators in test names to avoid issues with MkdirTemp
+func sanitizeTestName(tb testing.TB) testing.TB {
+    sanitizedName := strings.ReplaceAll(tb.Name(), "/", "_")
+    return &testNameWrapper{TB: tb, sanitizedName: sanitizedName}
+}
+
+// startServerWithSanitizedName starts a server with a sanitized test name
+func startServerWithSanitizedName(t testing.TB, configs map[string]string) *util.KvrocksServer {
+    return util.StartServer(sanitizeTestName(t), configs)
+}
+
+type FailoverState string
+
+const (
+    FailoverStateNone       FailoverState = "none"
+    FailoverStateStarted    FailoverState = "started"
+    FailoverStateCheckSlave FailoverState = "check_slave"
+    FailoverStatePauseWrite FailoverState = "pause_write"
+    FailoverStateWaitSync   FailoverState = "wait_sync"
+    FailoverStateSwitching  FailoverState = "switching"
+    FailoverStateSuccess    FailoverState = "success"
+    FailoverStateFailed     FailoverState = "failed"
+)
+
+func TestFailoverBasicFlow(t *testing.T) {
+    ctx := context.Background()
+
+    master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+    defer func() { master.Close() }()
+    masterClient := master.NewClient()
+    defer func() { require.NoError(t, masterClient.Close()) }()
+    masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+    require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err())
+
+    slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+    defer func() { slave.Close() }()
+    slaveClient := slave.NewClient()
+    defer func() { require.NoError(t, slaveClient.Close()) }()
+    slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+    require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err())
+
+    clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port())
+    clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID)
+    require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+    require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+    // Wait for replication to establish
+    require.Eventually(t, func() bool {
+        info := masterClient.Info(ctx, "replication").Val()
+        return strings.Contains(info, "connected_slaves:1")
+    }, 10*time.Second, 100*time.Millisecond)
+
+    t.Run("FAILOVER - Basic failover flow", func(t *testing.T) {
+        // Write some data
+        require.NoError(t, masterClient.Set(ctx, "key1", "value1", 0).Err())
+        require.NoError(t, masterClient.Set(ctx, "key2", "value2", 0).Err())
+
+        // Start failover
+        result := masterClient.Do(ctx, "clusterx", "failover", slaveID)
+        if result.Err() != nil {
+            t.Logf("FAILOVER command error: %v", result.Err())
+        }
+        require.NoError(t, result.Err(), "FAILOVER command should succeed")
+        require.Equal(t, "OK", result.Val())
+
+        // Wait for failover to complete
+        waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second)
+
+        // Verify slots are migrated (MOVED response)
+        require.ErrorContains(t, masterClient.Set(ctx, "key1", "newvalue", 0).Err(), "MOVED")
+        require.ErrorContains(t, masterClient.Get(ctx, "key1").Err(), "MOVED")
+
+        // Verify data is accessible on new master (slave)
+        require.Equal(t, "value1", slaveClient.Get(ctx, "key1").Val())
+        require.Equal(t, "value2", slaveClient.Get(ctx, "key2").Val())
+    })
+
+    t.Run("FAILOVER - Failover with custom timeout", func(t *testing.T) {
+        // Reset failover state by updating topology
+        require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+        require.Eventually(t, func() bool {
+            info := masterClient.Info(ctx, "replication").Val()
+            return strings.Contains(info, "connected_slaves:1")
+        }, 10*time.Second, 100*time.Millisecond)
+
+        // Start failover with custom timeout
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "5000").Val())
+        waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second)
+    })
+}
+
+func TestFailoverFailureCases(t *testing.T) {
+    ctx := context.Background()
+
+    master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+    defer func() { master.Close() }()
+    masterClient := master.NewClient()
+    defer func() { require.NoError(t, masterClient.Close()) }()
+    masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+    require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err())
+
+    t.Run("FAILOVER - Failover to non-existent node", func(t *testing.T) {
+        clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383", masterID, master.Host(), master.Port())
+        require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+        nonExistentID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx99"
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", nonExistentID).Val())
+        waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second)
+    })
+
+    t.Run("FAILOVER - Failover to non-slave node", func(t *testing.T) {
+        slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"})
+        defer func() { slave.Close() }()
+        slaveClient := slave.NewClient()
+        defer func() { require.NoError(t, slaveClient.Close()) }()
+        slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err())
+
+        // Set slave as master (not slave)
+        clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port())
+        clusterNodes += fmt.Sprintf("%s %s %d master -", slaveID, slave.Host(), slave.Port())
+        require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val())
+        waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second)
+    })
+
+    t.Run("FAILOVER - Invalid timeout value", func(t *testing.T) {
+        slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"})
+        defer func() { slave.Close() }()
+        slaveClient := slave.NewClient()
+        defer func() { require.NoError(t, slaveClient.Close()) }()
+        slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err())
+
+        clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port())
+        clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID)
+        require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err())
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err())
+
+        // Negative timeout should return error
+        require.Error(t, masterClient.Do(ctx, "clusterx", "failover", slaveID, "-1").Err())
+    })
+
+    t.Run("FAILOVER - Different timeout values", func(t *testing.T) {
+        slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"})
+        defer func() { slave.Close() }()
+        slaveClient := slave.NewClient()
+        defer func() { require.NoError(t, slaveClient.Close()) }()
+        slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err())
+
+        clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port())
+        clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID)
+        require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "4").Err())
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "4").Err())
+
+        require.Eventually(t, func() bool {
+            info := masterClient.Info(ctx, "replication").Val()
+            return strings.Contains(info, "connected_slaves:1")
+        }, 10*time.Second, 100*time.Millisecond)
+
+        // Test with timeout = 0 when slave is already synced (lag=0)
+        // When lag=0, failover should succeed because no waiting is needed
+        // But if slave has lag, it will fail
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "0").Val())
+        // Wait for either success or failed state
+        require.Eventually(t, func() bool {
+            info := masterClient.ClusterInfo(ctx).Val()
+            return strings.Contains(info, "cluster_failover_state:success") ||
+                strings.Contains(info, "cluster_failover_state:failed")
+        }, 5*time.Second, 100*time.Millisecond)
+
+        // Check final state - can be success (if lag=0) or failed (if lag>0)
+        finalInfo := masterClient.ClusterInfo(ctx).Val()
+        if strings.Contains(finalInfo, "cluster_failover_state:success") {
+            t.Logf("timeout=0 with lag=0: failover succeeded as expected")
+        } else if strings.Contains(finalInfo, "cluster_failover_state:failed") {
+            t.Logf("timeout=0: failover failed (slave may have lag)")
+        }
+
+        // Reset for next test
+        require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "5").Err())
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "5").Err())
+        require.Eventually(t, func() bool {
+            info := masterClient.Info(ctx, "replication").Val()
+            return strings.Contains(info, "connected_slaves:1")
+        }, 10*time.Second, 100*time.Millisecond)
+
+        // Test with timeout = 0 when slave has lag
+        // Create lag by writing data to master without waiting for sync
+        for i := 0; i < 100; i++ {
+            require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i), 0).Err())
+        }
+        // Don't wait for sync, start failover immediately to create lag scenario
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "0").Val())
+        // With lag, timeout=0 should fail
+        waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second)
+
+        // Reset and test with small timeout
+        require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "6").Err())
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "6").Err())
+        require.Eventually(t, func() bool {
+            info := masterClient.Info(ctx, "replication").Val()
+            return strings.Contains(info, "connected_slaves:1")
+        }, 10*time.Second, 100*time.Millisecond)
+
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "100").Val())
+        // Small timeout may fail, but should start
+        time.Sleep(200 * time.Millisecond)
+        info := masterClient.ClusterInfo(ctx).Val()
+        require.True(t, strings.Contains(info, "cluster_failover_state:failed") ||
+            strings.Contains(info, "cluster_failover_state:success") ||
+            strings.Contains(info, "cluster_failover_state:wait_sync") ||
+            strings.Contains(info, "cluster_failover_state:switching"))
+
+        // Reset and test with large timeout
+        require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "7").Err())
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "7").Err())
+        require.Eventually(t, func() bool {
+            info := masterClient.Info(ctx, "replication").Val()
+            return strings.Contains(info, "connected_slaves:1")
+        }, 10*time.Second, 100*time.Millisecond)
+
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val())
+        waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second)
+    })
+}
+
+func TestFailoverConcurrency(t *testing.T) {
+    ctx := context.Background()
+
+    master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+    defer func() { master.Close() }()
+    masterClient := master.NewClient()
+    defer func() { require.NoError(t, masterClient.Close()) }()
+    masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+    require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err())
+
+    slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+    defer func() { slave.Close() }()
+    slaveClient := slave.NewClient()
+    defer func() { require.NoError(t, slaveClient.Close()) }()
+    slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+    require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err())
+
+    clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port())
+    clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID)
+    require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+    require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+    require.Eventually(t, func() bool {
+        info := masterClient.Info(ctx, "replication").Val()
+        return strings.Contains(info, "connected_slaves:1")
+    }, 10*time.Second, 100*time.Millisecond)
+
+    t.Run("FAILOVER - Cannot start failover when one is in progress", func(t *testing.T) {
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val())
+
+        // Try to start another failover immediately - should return error
+        // Wait a bit to ensure first failover has started
+        time.Sleep(100 * time.Millisecond)
+        result := masterClient.Do(ctx, "clusterx", "failover", slaveID)
+        // The second call may return OK but won't start a new failover
+        // We verify the first one completes successfully
+        waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second)
+        _ = result // Use result to avoid unused variable
+    })
+
+    t.Run("FAILOVER - Can restart after failure", func(t *testing.T) {
+        // Reset state
+        require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+        require.Eventually(t, func() bool {
+            info := masterClient.Info(ctx, "replication").Val()
+            return strings.Contains(info, "connected_slaves:1")
+        }, 10*time.Second, 100*time.Millisecond)
+
+        // Start a failover with very short timeout
+        // If slave is synced (lag=0), it may succeed; if slave has lag, it will fail
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "1").Val())
+        // Accept either success or failed state
+        require.Eventually(t, func() bool {
+            info := masterClient.ClusterInfo(ctx).Val()
+            return strings.Contains(info, "cluster_failover_state:success") ||
+                strings.Contains(info, "cluster_failover_state:failed")
+        }, 5*time.Second, 100*time.Millisecond)
+
+        // Can restart after failure
+        require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err())
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err())
+        require.Eventually(t, func() bool {
+            info := masterClient.Info(ctx, "replication").Val()
+            return strings.Contains(info, "connected_slaves:1")
+        }, 10*time.Second, 100*time.Millisecond)
+
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val())
+        waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second)
+    })
+}
+
+func TestFailoverWriteBlocking(t *testing.T) {
+    ctx := context.Background()
+
+    master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+    defer func() { master.Close() }()
+    masterClient := master.NewClient()
+    defer func() { require.NoError(t, masterClient.Close()) }()
+    masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
+    require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err())
+
+    slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
+    defer func() { slave.Close() }()
+    slaveClient := slave.NewClient()
+    defer func() { require.NoError(t, slaveClient.Close()) }()
+    slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
+    require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err())
+
+    clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port())
+    clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID)
+    require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+    require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
+
+    require.Eventually(t, func() bool {
+        info := masterClient.Info(ctx, "replication").Val()
+        return strings.Contains(info, "connected_slaves:1")
+    }, 10*time.Second, 100*time.Millisecond)
+
+    t.Run("FAILOVER - Write requests blocked during failover", func(t *testing.T) {
+        // Write initial data
+        require.NoError(t, masterClient.Set(ctx, "testkey", "testvalue", 0).Err())
+
+        // Start failover with long timeout to observe blocking
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val())
+
+        // Try to write during failover - should return TRYAGAIN in blocking states
+        // Poll for blocking state (pause_write, wait_sync, or switching)
+        var writeBlocked bool
+        for i := 0; i < 50; i++ {
+            time.Sleep(50 * time.Millisecond)
+            err := masterClient.Set(ctx, "testkey", "newvalue", 0).Err()
+            if err != nil && (strings.Contains(err.Error(), "TRYAGAIN") || strings.Contains(err.Error(), "Failover in progress")) {
+                writeBlocked = true
+                break
+            }
+            // Check if failover already completed
+            info := masterClient.ClusterInfo(ctx).Val()
+            if strings.Contains(info, "cluster_failover_state:success") {
+                break
+            }
+        }
+        // At least one write should have been blocked, or failover completed very quickly
+        waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second)
+
+        // After success, writes should return MOVED
+        require.ErrorContains(t, masterClient.Set(ctx, "testkey", "newvalue2", 0).Err(), "MOVED")
+        _ = writeBlocked // May be false if failover was very fast
+    })
+
+    t.Run("FAILOVER - Read requests not blocked during failover", func(t *testing.T) {
+        // Reset state
+        require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+        require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err())
+        require.Eventually(t, func() bool {
+            info := masterClient.Info(ctx, "replication").Val()
+            return strings.Contains(info, "connected_slaves:1")
+        }, 10*time.Second, 100*time.Millisecond)
+
+        require.NoError(t, masterClient.Set(ctx, "readkey", "readvalue", 0).Err())
+
+        require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val())
+
+        // Reads should work during failover (not blocked)
+        // Try reading multiple times during failover
+        for i := 0; i < 10; i++ {
+            time.Sleep(100 * time.Millisecond)
+            val := masterClient.Get(ctx, "readkey").Val()
+            // Value should be accessible (may return empty if already moved, but shouldn't error with TRYAGAIN)
+            _ = val

Review Comment:
   The variable 'val' is assigned the result of Get() but never used. This appears to be dead code. If the intention is to verify that reads work during failover, consider adding an assertion on the returned value or, at minimum, a comment explaining why the value is intentionally ignored.

```suggestion
            require.True(t, val == "readvalue" || val == "", "expected value to be 'readvalue' or '', got: %q", val)
```
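   If the property under test is specifically that reads are never rejected while writes are paused, an alternative sketch (an assumption about the intent, mirroring the TRYAGAIN check in the write-blocking subtest) asserts on the error instead of the value:

```go
// Hypothetical alternative: a read may miss the key once the slot has
// moved (redis.Nil), but it must never be rejected with TRYAGAIN.
if err := masterClient.Get(ctx, "readkey").Err(); err != nil && err != redis.Nil {
    require.NotContains(t, err.Error(), "TRYAGAIN")
}
```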
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]