KUDU-2274 [itest] stress test for replica replacement This is a stress test for the automatic replica replacement in Kudu.
Parameters of the test are configurable via run-time gflags, so it's possible to run it in a 'standalone' mode as a sort of endurance test. This test reproduces the race described in KUDU-2274: it triggers DCHECK() assertions added in 194fd8b169f29aafbd78a47709ac51d2e8354a1a before the relevant fixes for KUDU-2274 were checked in. Change-Id: I036a882f7e9132a5c26013227c50c0699b59ed6e Reviewed-on: http://gerrit.cloudera.org:8080/9255 Tested-by: Kudu Jenkins Reviewed-by: Mike Percy <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a6658539 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a6658539 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a6658539 Branch: refs/heads/master Commit: a66585398e3873a3b8a38f6a3a1becb644979a50 Parents: a964b0e Author: Alexey Serbin <[email protected]> Authored: Tue Feb 13 16:45:32 2018 -0800 Committer: Alexey Serbin <[email protected]> Committed: Wed Feb 21 02:14:04 2018 +0000 ---------------------------------------------------------------------- src/kudu/integration-tests/CMakeLists.txt | 1 + src/kudu/integration-tests/cluster_verifier.cc | 4 +- src/kudu/integration-tests/cluster_verifier.h | 5 +- .../raft_consensus_stress-itest.cc | 299 +++++++++++++++++++ 4 files changed, 305 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/a6658539/src/kudu/integration-tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt index af5dae9..c3922eb 100644 --- a/src/kudu/integration-tests/CMakeLists.txt +++ b/src/kudu/integration-tests/CMakeLists.txt @@ -91,6 +91,7 @@ ADD_KUDU_TEST(raft_config_change-itest) ADD_KUDU_TEST(raft_consensus_election-itest) 
ADD_KUDU_TEST(raft_consensus_failure_detector-imc-itest) ADD_KUDU_TEST(raft_consensus_nonvoter-itest) +ADD_KUDU_TEST(raft_consensus_stress-itest) ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true) ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port") ADD_KUDU_TEST(security-faults-itest) http://git-wip-us.apache.org/repos/asf/kudu/blob/a6658539/src/kudu/integration-tests/cluster_verifier.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/cluster_verifier.cc b/src/kudu/integration-tests/cluster_verifier.cc index b903a81..22cba09 100644 --- a/src/kudu/integration-tests/cluster_verifier.cc +++ b/src/kudu/integration-tests/cluster_verifier.cc @@ -75,7 +75,7 @@ void ClusterVerifier::CheckCluster() { Status s; double sleep_time = 0.1; while (MonoTime::Now() < deadline) { - s = DoKsck(); + s = RunKsck(); if (s.ok()) { break; } @@ -97,7 +97,7 @@ void ClusterVerifier::CheckCluster() { }); } -Status ClusterVerifier::DoKsck() { +Status ClusterVerifier::RunKsck() { vector<string> hp_strs; for (const auto& hp : cluster_->master_rpc_addrs()) { hp_strs.emplace_back(hp.ToString()); http://git-wip-us.apache.org/repos/asf/kudu/blob/a6658539/src/kudu/integration-tests/cluster_verifier.h ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/cluster_verifier.h b/src/kudu/integration-tests/cluster_verifier.h index 56eaffa..533a707 100644 --- a/src/kudu/integration-tests/cluster_verifier.h +++ b/src/kudu/integration-tests/cluster_verifier.h @@ -76,9 +76,10 @@ class ClusterVerifier { int expected_row_count, const MonoDelta& timeout); - private: - Status DoKsck(); + // Run the ksck utility against the cluster. + Status RunKsck(); + private: // Implementation for CheckRowCount -- returns a Status instead of firing // gtest assertions. 
Status DoCheckRowCount(const std::string& table_name, http://git-wip-us.apache.org/repos/asf/kudu/blob/a6658539/src/kudu/integration-tests/raft_consensus_stress-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/raft_consensus_stress-itest.cc b/src/kudu/integration-tests/raft_consensus_stress-itest.cc new file mode 100644 index 0000000..7857930 --- /dev/null +++ b/src/kudu/integration-tests/raft_consensus_stress-itest.cc @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <atomic> +#include <cstdint> +#include <cstdlib> +#include <ostream> +#include <string> +#include <thread> +#include <vector> + +#include <gflags/gflags.h> +#include <gflags/gflags_declare.h> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/integration-tests/cluster_itest_util.h" +#include "kudu/integration-tests/cluster_verifier.h" +#include "kudu/integration-tests/raft_consensus-itest-base.h" +#include "kudu/integration-tests/test_workload.h" +#include "kudu/mini-cluster/external_mini_cluster.h" +#include "kudu/util/monotime.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +// Binaries built in the address- and thread-sanitizer build configurations +// run much slower than binaries built in debug and release configurations. +// The parameters are adjusted accordingly to avoid test flakiness.
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) +constexpr int kBuildCfgFactor = 2; +#else +constexpr int kBuildCfgFactor = 1; +#endif + +DEFINE_bool(test_raft_prepare_replacement_before_eviction, true, + "When enabled, failed replicas will only be evicted after a " + "replacement has been prepared for them."); +DEFINE_double(test_tablet_copy_early_session_timeout_prob, + 0.25 / kBuildCfgFactor, + "The probability that a tablet copy session will time out early, " + "resulting in a tablet copy failure."); +DEFINE_int32(test_follower_unavailable_considered_failed_sec, + 1 * kBuildCfgFactor, + "Seconds that a leader tablet replica is unable to successfully " + "heartbeat to a follower after which the follower is considered " + "to be failed and evicted from the config."); +DEFINE_int32(test_heartbeat_interval_ms, + 250 * kBuildCfgFactor, + "Interval at which the TS heartbeats to the master."); +DEFINE_int32(test_max_ksck_failures, 30, + "Maximum number of ksck failures in a row to tolerate before " + "considering the test as failed."); +// GLOG_FATAL: 3 +// GLOG_ERROR: 2 +// GLOG_WARNING: 1 +// GLOG_INFO: 0 +DEFINE_int32(test_minloglevel, google::GLOG_ERROR, + "Logging level for masters and tablet servers under test."); +DEFINE_int32(test_num_iterations, + 10 / kBuildCfgFactor, + "Number of iterations, repeating the test scenario."); +DEFINE_int32(test_num_replicas_per_server, + 20 / kBuildCfgFactor, + "Number of tablets per server to create."); +DEFINE_int32(test_raft_heartbeat_interval_ms, + 100 * kBuildCfgFactor, + "The Raft heartbeat interval for tablet servers under the test."); +DEFINE_int32(test_replication_factor, 3, + "The replication factor of the test table."); + +DECLARE_int32(num_replicas); +DECLARE_int32(num_tablet_servers); + +using kudu::cluster::ExternalTabletServer; +using kudu::itest::StartElection; +using kudu::itest::TServerDetails; +using std::string; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace
tserver { + +class RaftConsensusStressITest : public RaftConsensusITestBase { +}; + +// Test scenario to verify the behavior of the system when tablet replicas fail +// and are replaced again and again. With a high enough number of iterations, at +// some point all replacement replicas are placed on top of previously +// tombstoned ones. +TEST_F(RaftConsensusStressITest, RemoveReplaceInCycle) { + if (!AllowSlowTests()) { + LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; + return; + } + + const bool is_343_scheme = FLAGS_test_raft_prepare_replacement_before_eviction; + const int kReplicaUnavailableSec = FLAGS_test_follower_unavailable_considered_failed_sec; + const int kReplicationFactor = FLAGS_test_replication_factor; + const int kNumTabletServers = 2 * kReplicationFactor; + const int kNumReplicasPerServer = FLAGS_test_num_replicas_per_server; + const int kNumTablets = kNumTabletServers * kNumReplicasPerServer / kReplicationFactor; + const MonoDelta kTimeout = MonoDelta::FromSeconds(60 * kBuildCfgFactor); + const MonoDelta kShortTimeout = MonoDelta::FromSeconds(1 * kBuildCfgFactor); + + // Translate the replicas-per-server parameter into the catalog manager's + // --max_create_tablets_per_ts flag. Current logic in the catalog manager + // does not take into account the replication factor, that's why the + // additional division by kReplicationFactor. + const int kMaxCreateTabletsPerTs = kNumTablets / kReplicationFactor; + + // This test scenario induces a lot of faults/errors and it runs multiple + // iterations. Tablet servers and masters are too chatty in this case, logging + // a lot of information. With --minloglevel=2 (the --test_minloglevel default) + // only error and fatal messages from tablet servers and masters are logged.
+ const vector<string> kMasterFlags = { + Substitute("--minloglevel=$0", FLAGS_test_minloglevel), + Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme), + Substitute("--max_create_tablets_per_ts=$0", kMaxCreateTabletsPerTs), + }; + const vector<string> kTserverFlags = { + Substitute("--minloglevel=$0", FLAGS_test_minloglevel), + Substitute("--tablet_copy_early_session_timeout_prob=$0", + FLAGS_test_tablet_copy_early_session_timeout_prob), + Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme), + Substitute("--follower_unavailable_considered_failed_sec=$0", + kReplicaUnavailableSec), + Substitute("--consensus_rpc_timeout_ms=$0", 1000 * kReplicaUnavailableSec), + Substitute("--heartbeat_interval_ms=$0", FLAGS_test_heartbeat_interval_ms), + Substitute("--raft_heartbeat_interval_ms=$0", FLAGS_test_raft_heartbeat_interval_ms), + }; + + FLAGS_num_replicas = kReplicationFactor; + FLAGS_num_tablet_servers = kNumTabletServers; + NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); + + TestWorkload workload(cluster_.get()); + workload.set_table_name("RemoveReplaceInCycle"); + // Keep half of the total available 'location space' for the replacement + // replicas spawned by the scenario below. + workload.set_num_tablets(kNumTablets); + workload.set_num_replicas(kReplicationFactor); + workload.set_num_write_threads(1); + workload.set_write_timeout_millis(kTimeout.ToMilliseconds()); + workload.set_timeout_allowed(true); + // TODO(KUDU-1188): start using at least one read thread once leader leases + // are implemented. Without leader leases, keeping a reader + // thread leads to intermittent failures due to the CHECK() + // assertion in test_workload.cc:243 with messages like: + // + // Check failed: row_count >= expected_row_count (31049 vs.
31050) + // + workload.set_num_read_threads(0); + workload.set_client_default_admin_operation_timeout_millis(kTimeout.ToMilliseconds()); + workload.Setup(); + workload.Start(); + while (workload.rows_inserted() < 100L * kNumTablets) { + SleepFor(MonoDelta::FromMilliseconds(10)); + } + workload.StopAndJoin(); + + std::atomic<bool> is_running(true); + std::atomic<bool> do_elections(true); + std::atomic<bool> do_pauses(true); + std::thread pause_and_resume_thread([&] { + // Select a random tablet server and pause it for some time to make the system + // spawn the replacement replica elsewhere. + int prev_ts_idx = -1; + while (is_running) { + const int ts_idx = rand() % cluster_->num_tablet_servers(); + if (ts_idx == prev_ts_idx) { + continue; + } + ExternalTabletServer* ext_ts = cluster_->tablet_server(ts_idx); + TServerDetails* ts = tablet_servers_[ext_ts->uuid()]; + vector<string> tablet_ids; + Status s = ListRunningTabletIds(ts, kShortTimeout, &tablet_ids); + if (s.IsNetworkError() || tablet_ids.empty()) { + continue; + } + CHECK_OK(s); + prev_ts_idx = ts_idx; + if (do_pauses) { + CHECK_OK(ext_ts->Pause()); + SleepFor(MonoDelta::FromSeconds(3 * kReplicaUnavailableSec)); + CHECK_OK(ext_ts->Resume()); + } + SleepFor(MonoDelta::FromMilliseconds(250)); + } + }); + std::thread election_thread([&] { + while (is_running) { + if (do_elections) { + const auto ts_idx = rand() % cluster_->num_tablet_servers(); + ExternalTabletServer* ext_ts = cluster_->tablet_server(ts_idx); + TServerDetails* ts = tablet_servers_[ext_ts->uuid()]; + vector<string> tablet_ids; + Status s = ListRunningTabletIds(ts, kShortTimeout, &tablet_ids); + if (s.IsNetworkError() || s.IsTimedOut() || tablet_ids.empty()) { + continue; + } + CHECK_OK(s); + const auto tablet_idx = rand() % tablet_ids.size(); + // Best effort attempt: ignoring the result of StartElection() call.
+ StartElection(ts, tablet_ids[tablet_idx], kShortTimeout); + } + SleepFor(kShortTimeout); + } + }); + SCOPED_CLEANUP({ + is_running = false; + pause_and_resume_thread.join(); + election_thread.join(); + }); + + auto ksck_failures_in_a_row = 0; + int64_t rows_inserted = workload.rows_inserted(); + for (auto iteration = 0; iteration < FLAGS_test_num_iterations; ) { + workload.Start(); + while (workload.rows_inserted() < rows_inserted + 10) { + SleepFor(MonoDelta::FromMilliseconds(1)); + } + workload.StopAndJoin(); + rows_inserted = workload.rows_inserted(); + + ClusterVerifier v(cluster_.get()); + // Set shorter timeouts for the verification to abort earlier + // and signal the actor threads to stop messing with the tablets. + v.SetOperationsTimeout(kShortTimeout); + v.SetVerificationTimeout(kShortTimeout); + + const auto& s = v.RunKsck(); + if (!s.ok()) { + do_elections = false; + do_pauses = false; + if (!s.IsTimedOut()) { + ++ksck_failures_in_a_row; + } + if (ksck_failures_in_a_row > FLAGS_test_max_ksck_failures) { + break; + } + continue; + } + ksck_failures_in_a_row = 0; + do_elections = true; + do_pauses = true; + + SleepFor(MonoDelta::FromSeconds(1)); + LOG(INFO) << "completed iteration " << iteration; + ++iteration; + } + is_running = false; + + NO_FATALS(cluster_->AssertNoCrashes()); + + ClusterVerifier v(cluster_.get()); + v.SetOperationsTimeout(kTimeout); + v.SetVerificationTimeout(kTimeout); + if (ksck_failures_in_a_row > FLAGS_test_max_ksck_failures) { + // Suspecting a Raft consensus failure while running ksck with shorter + // timeout (see above). Run an extra round of ksck with the regular timeout + // to check whether replicas really haven't converged and, if so, just bail + // right at this point.
+ const auto& s = v.RunKsck(); + if (!s.ok()) { + FAIL() << Substitute("$0: tablet replicas haven't converged", s.ToString()); + } + } + + NO_FATALS(v.CheckCluster()); + // Using ClusterVerifier::AT_LEAST because the TestWorkload instance was run + // with the 'timeout_allowed' option enabled. In that case, the actual + // number of inserted rows may be greater than what the workload reports via + // its rows_inserted() method. + NO_FATALS(v.CheckRowCount(workload.table_name(), + ClusterVerifier::AT_LEAST, + workload.rows_inserted())); +} + +} // namespace tserver +} // namespace kudu
