Repository: kudu Updated Branches: refs/heads/master 7b048b8db -> a81d80a79
[tools] extra integration tests for the rebalancer This patch adds more integration tests for the rebalancer, providing coverage for the following scenarios: * a new tablet server added post-rebalancing, and the rebalancer is run again * DDL operations are run concurrently with the rebalancing * two rebalancers are running concurrently * a tablet server goes down during rebalancing * a tablet server is added during rebalancing Change-Id: I78b3dcea71ed303f6ecd199604b2385796d05da8 Reviewed-on: http://gerrit.cloudera.org:8080/10540 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <a...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a81d80a7 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a81d80a7 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a81d80a7 Branch: refs/heads/master Commit: a81d80a7987cd66c4d0e697058ecd817d2707a9e Parents: 7b048b8 Author: Alexey Serbin <aser...@cloudera.com> Authored: Tue May 29 12:42:43 2018 -0700 Committer: Alexey Serbin <aser...@cloudera.com> Committed: Wed Jul 4 01:24:17 2018 +0000 ---------------------------------------------------------------------- src/kudu/tools/CMakeLists.txt | 3 +- src/kudu/tools/kudu-admin-test.cc | 654 ++++++++++++++++++++++++++++++--- 2 files changed, 610 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/a81d80a7/src/kudu/tools/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt index 97d8bdb..e367d64 100644 --- a/src/kudu/tools/CMakeLists.txt +++ b/src/kudu/tools/CMakeLists.txt @@ -164,7 +164,8 @@ set(KUDU_TEST_LINK_LIBS ADD_KUDU_TEST(diagnostics_log_parser-test) ADD_KUDU_TEST(ksck-test) ADD_KUDU_TEST(ksck_remote-test PROCESSORS 3) -ADD_KUDU_TEST(kudu-admin-test PROCESSORS 3) +ADD_KUDU_TEST(kudu-admin-test + 
NUM_SHARDS 4 PROCESSORS 3) ADD_KUDU_TEST_DEPENDENCIES(kudu-admin-test kudu) ADD_KUDU_TEST(kudu-tool-test http://git-wip-us.apache.org/repos/asf/kudu/blob/a81d80a7/src/kudu/tools/kudu-admin-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc index 1868c90..856f1f4 100644 --- a/src/kudu/tools/kudu-admin-test.cc +++ b/src/kudu/tools/kudu-admin-test.cc @@ -16,6 +16,7 @@ // under the License. #include <algorithm> +#include <atomic> #include <cstdint> #include <cstdio> #include <deque> @@ -23,6 +24,7 @@ #include <memory> #include <ostream> #include <string> +#include <thread> #include <unordered_map> #include <unordered_set> #include <utility> @@ -46,6 +48,7 @@ #include "kudu/gutil/map-util.h" #include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/strings/util.h" #include "kudu/integration-tests/cluster_itest_util.h" #include "kudu/integration-tests/cluster_verifier.h" #include "kudu/integration-tests/test_workload.h" @@ -55,10 +58,13 @@ #include "kudu/tablet/metadata.pb.h" #include "kudu/tools/tool_test_util.h" #include "kudu/tserver/tablet_server-test-base.h" +#include "kudu/util/countdown_latch.h" #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/net/sockaddr.h" #include "kudu/util/pb_util.h" +#include "kudu/util/random.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" @@ -68,7 +74,10 @@ DECLARE_int32(num_tablet_servers); using kudu::client::KuduClient; using kudu::client::KuduClientBuilder; +using kudu::client::KuduColumnSchema; using kudu::client::KuduSchema; +using kudu::client::KuduSchemaBuilder; +using kudu::client::KuduTableAlterer; using kudu::client::KuduTableCreator; using kudu::client::sp::shared_ptr; using kudu::cluster::ExternalTabletServer; @@ -92,16 +101,22 @@ using 
kudu::itest::WaitUntilTabletInState; using kudu::itest::WaitUntilTabletRunning; using kudu::master::VOTER_REPLICA; using kudu::pb_util::SecureDebugString; +using std::atomic; using std::back_inserter; using std::copy; using std::deque; +using std::ostringstream; using std::string; +using std::thread; using std::unique_ptr; using std::vector; using strings::Split; using strings::Substitute; namespace kudu { + +class Schema; + namespace tools { class AdminCliTest : public tserver::TabletServerIntegrationTestBase { @@ -1357,6 +1372,55 @@ TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) { ASSERT_STR_MATCHES(err, err_msg_pattern); } +// Create unbalanced tables: tservers [tserver_idx_from, tserver_num) are down +// while the tables are created and loaded, and are restarted afterwards. +static Status CreateUnbalancedTables( + cluster::ExternalMiniCluster* cluster, + client::KuduClient* client, + const Schema& table_schema, + const string& table_name_pattern, + int num_tables, + int rep_factor, + int tserver_idx_from, + int tserver_num, + int tserver_unresponsive_ms) { + // Keep running only some tablet servers and shut down the rest. + for (auto i = tserver_idx_from; i < tserver_num; ++i) { + cluster->tablet_server(i)->Shutdown(); + } + + // Wait for the catalog manager to understand that not all tablet servers + // are available. + SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4)); + + // Create tables with their tablet replicas landing only on the tablet servers + // which are up and running.
+ KuduSchema client_schema(client::KuduSchemaFromSchema(table_schema)); + for (auto i = 0; i < num_tables; ++i) { + const string table_name = Substitute(table_name_pattern, i); + unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator()); + RETURN_NOT_OK(table_creator->table_name(table_name) + .schema(&client_schema) + .add_hash_partitions({ "key" }, 3) + .num_replicas(rep_factor) + .Create()); + RETURN_NOT_OK(RunKuduTool({ + "perf", + "loadgen", + cluster->master()->bound_rpc_addr().ToString(), + Substitute("--table_name=$0", table_name), + Substitute("--table_num_replicas=$0", rep_factor), + "--string_fixed=unbalanced_tables_test", + })); + } + + for (auto i = tserver_idx_from; i < tserver_num; ++i) { + RETURN_NOT_OK(cluster->tablet_server(i)->Restart()); + } + + return Status::OK(); +} + // A test to verify that rebalancing works for both 3-4-3 and 3-2-3 replica // management schemes. During replica movement, a light workload is run against // every table being rebalanced. This test covers different replication factors. @@ -1394,33 +1458,9 @@ TEST_P(RebalanceParamTest, Rebalance) { FLAGS_num_replicas = kRepFactor; NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags)); - // Keep running only (kRepFactor + 1) tablet servers and shut down the rest. - for (auto i = kRepFactor + 1; i < kNumTservers; ++i) { - cluster_->tablet_server(i)->Shutdown(); - } - - // Wait for the catalog manager to understand that only (kRepFactor + 1) - // tablet servers are available. - SleepFor(MonoDelta::FromMilliseconds(5 * kTserverUnresponsiveMs / 4)); - - // Create few tables with their tablet replicas landing only on those - // (kRepFactor + 1) running tablet servers.
- KuduSchema client_schema(client::KuduSchemaFromSchema(schema_)); - for (auto i = 0; i < kNumTables; ++i) { - const string table_name = Substitute(table_name_pattern, i); - unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); - ASSERT_OK(table_creator->table_name(table_name) - .schema(&client_schema) - .add_hash_partitions({ "key" }, 3) - .num_replicas(kRepFactor) - .Create()); - ASSERT_OK(RunKuduTool({ - "perf", - "loadgen", - cluster_->master()->bound_rpc_addr().ToString(), - Substitute("--table_name=$0", table_name), - })); - } + ASSERT_OK(CreateUnbalancedTables( + cluster_.get(), client_.get(), schema_, table_name_pattern, kNumTables, + kRepFactor, kRepFactor + 1, kNumTservers, kTserverUnresponsiveMs)); // Workloads aren't run for 3-2-3 replica movement with RF = 1 because // the tablet is unavailable during the move until the target voter replica @@ -1449,21 +1489,20 @@ TEST_P(RebalanceParamTest, Rebalance) { } } - for (auto i = (kRepFactor + 1); i < kNumTservers; ++i) { - ASSERT_OK(cluster_->tablet_server(i)->Restart()); - } + const vector<string> tool_args = { + "cluster", + "rebalance", + cluster_->master()->bound_rpc_addr().ToString(), + "--move_single_replicas", + }; { string out; string err; - const Status s = RunKuduTool({ - "cluster", - "rebalance", - cluster_->master()->bound_rpc_addr().ToString(), - "--move_single_replicas", - }, &out, &err); + const Status s = RunKuduTool(tool_args, &out, &err); ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err; - ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced"); + ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced") + << "stderr: " << err; } // Next run should report the cluster as balanced and no replica movement @@ -1471,15 +1510,11 @@ TEST_P(RebalanceParamTest, Rebalance) { { string out; string err; - const Status s = RunKuduTool({ - "cluster", - "rebalance", - cluster_->master()->bound_rpc_addr().ToString(), - "--move_single_replicas", - }, &out,
&err); + const Status s = RunKuduTool(tool_args, &out, &err); ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err; ASSERT_STR_CONTAINS(out, - "rebalancing is complete: cluster is balanced (moved 0 replicas)"); + "rebalancing is complete: cluster is balanced (moved 0 replicas)") + << "stderr: " << err; } for (auto& workload : workloads) { @@ -1487,9 +1522,536 @@ TEST_P(RebalanceParamTest, Rebalance) { } NO_FATALS(cluster_->AssertNoCrashes()); + NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster()); - ClusterVerifier v(cluster_.get()); - NO_FATALS(v.CheckCluster()); + // Now add a new tablet server into the cluster and make sure the rebalancer + // will re-distribute replicas. + ASSERT_OK(cluster_->AddTabletServer()); + { + string out; + string err; + const Status s = RunKuduTool(tool_args, &out, &err); + ASSERT_TRUE(s.ok()) << s.ToString() << ":" << err; + ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced") + << "stderr: " << err; + // The cluster is un-balanced, so many replicas should have been moved. + ASSERT_STR_NOT_CONTAINS(out, "(moved 0 replicas)"); + } + + NO_FATALS(cluster_->AssertNoCrashes()); + NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster()); +} + +// Common base for the rebalancer-related tests below.
+class RebalancingTest : + public tserver::TabletServerIntegrationTestBase, + public ::testing::WithParamInterface<Kudu1097> { + public: + explicit RebalancingTest(int num_tables = 10, + int rep_factor = 3, + int num_tservers = 8, + int tserver_unresponsive_ms = 3000, + const string& table_name_pattern = "rebalance_test_table_$0") + : TabletServerIntegrationTestBase(), + is_343_scheme_(GetParam() == Kudu1097::Enable), + num_tables_(num_tables), + rep_factor_(rep_factor), + num_tservers_(num_tservers), + tserver_unresponsive_ms_(tserver_unresponsive_ms), + table_name_pattern_(table_name_pattern) { + master_flags_ = { + Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_), + Substitute("--tserver_unresponsive_timeout_ms=$0", tserver_unresponsive_ms_), + }; + tserver_flags_ = { + Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme_), + }; + } + + protected: + void Prepare(const vector<string>& extra_tserver_flags = {}, + const vector<string>& extra_master_flags = {}) { + copy(extra_tserver_flags.begin(), extra_tserver_flags.end(), + back_inserter(tserver_flags_)); + copy(extra_master_flags.begin(), extra_master_flags.end(), + back_inserter(master_flags_)); + + FLAGS_num_tablet_servers = num_tservers_; + FLAGS_num_replicas = rep_factor_; + NO_FATALS(BuildAndStart(tserver_flags_, master_flags_)); + + ASSERT_OK(CreateUnbalancedTables( + cluster_.get(), client_.get(), schema_, table_name_pattern_, + num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_, + tserver_unresponsive_ms_)); + } + + // When the rebalancer starts moving replicas, ksck detects corruption + // (that's why RuntimeError), seeing affected tables as non-healthy + // with data state of corresponding tablets as TABLET_DATA_COPYING. If using + // this method, it's a good idea to inject some latency into tablet copying + // to be able to spot the TABLET_DATA_COPYING state, see the + // '--tablet_copy_download_file_inject_latency_ms' flag for tservers.
+ bool IsRebalancingInProgress() { + string out; + const auto s = RunKuduTool({ + "cluster", + "ksck", + cluster_->master()->bound_rpc_addr().ToString(), + }, &out); + if (s.IsRuntimeError() && + out.find("Data state: TABLET_DATA_COPYING") != string::npos) { + return true; + } + return false; + } + + const bool is_343_scheme_; + const int num_tables_; + const int rep_factor_; + const int num_tservers_; + const int tserver_unresponsive_ms_; + const string table_name_pattern_; + vector<string> tserver_flags_; + vector<string> master_flags_; +}; + +// Make sure the rebalancer is able to do its job if running concurrently +// with DDL activity on the cluster. +class DDLDuringRebalancingTest : public RebalancingTest { + public: + DDLDuringRebalancingTest() + : RebalancingTest(20 /* num_tables */) { + } +}; +INSTANTIATE_TEST_CASE_P(, DDLDuringRebalancingTest, + ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)); +TEST_P(DDLDuringRebalancingTest, TablesCreatedAndDeletedDuringRebalancing) { + if (!AllowSlowTests()) { + LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; + return; + } + + NO_FATALS(Prepare()); + + // The latch that controls the lifecycle of the concurrent DDL activity.
+ CountDownLatch run_latch(1); + + thread creator([&]() { + KuduSchema client_schema(client::KuduSchemaFromSchema(schema_)); + for (auto idx = 0; ; ++idx) { + if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) { + break; + } + const string table_name = Substitute("rebalancer_extra_table_$0", idx); + unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); + CHECK_OK(table_creator->table_name(table_name) + .schema(&client_schema) + .add_hash_partitions({ "key" }, 3) + .num_replicas(rep_factor_) + .Create()); + } + }); + auto creator_cleanup = MakeScopedCleanup([&]() { + run_latch.CountDown(); + creator.join(); + }); + + thread deleter([&]() { + for (auto idx = 0; idx < num_tables_; ++idx) { + if (run_latch.WaitFor(MonoDelta::FromMilliseconds(500))) { + break; + } + CHECK_OK(client_->DeleteTable(Substitute(table_name_pattern_, idx))); + } + }); + auto deleter_cleanup = MakeScopedCleanup([&]() { + run_latch.CountDown(); + deleter.join(); + }); + + thread alterer([&]() { + const string kTableName = "rebalancer_dynamic_table"; + const string kNewTableName = "rebalancer_dynamic_table_new_name"; + while (true) { + // Create table. + { + KuduSchema schema; + KuduSchemaBuilder builder; + builder.AddColumn("key")->Type(KuduColumnSchema::INT64)-> + NotNull()-> + PrimaryKey(); + builder.AddColumn("a")->Type(KuduColumnSchema::INT64); + CHECK_OK(builder.Build(&schema)); + unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator()); + CHECK_OK(table_creator->table_name(kTableName) + .schema(&schema) + .set_range_partition_columns({}) + .num_replicas(rep_factor_) + .Create()); + } + if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) { + break; + } + + // Drop a column. + { + unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName)); + alt->DropColumn("a"); + CHECK_OK(alt->Alter()); + } + if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) { + break; + } + + // Add back the column with a different type.
+ { + unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName)); + alt->AddColumn("a")->Type(KuduColumnSchema::STRING); + CHECK_OK(alt->Alter()); + } + if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) { + break; + } + + // Rename the table. + { + unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName)); + alt->RenameTo(kNewTableName); + CHECK_OK(alt->Alter()); + } + if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) { + break; + } + + // Drop the renamed table. + CHECK_OK(client_->DeleteTable(kNewTableName)); + if (run_latch.WaitFor(MonoDelta::FromMilliseconds(100))) { + break; + } + } + }); + auto alterer_cleanup = MakeScopedCleanup([&]() { + run_latch.CountDown(); + alterer.join(); + }); + + thread timer([&]() { + SleepFor(MonoDelta::FromSeconds(30)); + run_latch.CountDown(); + }); + auto timer_cleanup = MakeScopedCleanup([&]() { + timer.join(); + }); + + const vector<string> tool_args = { + "cluster", + "rebalance", + cluster_->master()->bound_rpc_addr().ToString(), + "--move_single_replicas", + }; + + // Run the rebalancer concurrently with the DDL operations. The second run + // of the rebalancer (the second run starts after joining the timer thread) + // is necessary to balance the cluster after the DDL activity stops: that's + // the easiest way to make sure the rebalancer will take into account + // all DDL changes that happened. + // + // The signal to terminate the DDL activity (done via run_latch.CountDown()) + // is sent from a separate timer thread instead of doing SleepFor() after + // the first run of the rebalancer followed by run_latch.CountDown(). + // That's to avoid dependency on the rebalancer behavior if it spots on-going + // DDL activity and continues running over and over again. + for (auto i = 0; i < 2; ++i) { + if (i == 1) { + timer.join(); + timer_cleanup.cancel(); + + // Wait for all the DDL activity to complete.
+ alterer.join(); + alterer_cleanup.cancel(); + + deleter.join(); + deleter_cleanup.cancel(); + + creator.join(); + creator_cleanup.cancel(); + } + + string out; + string err; + const auto s = RunKuduTool(tool_args, &out, &err); + ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err; + ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced") + << "stderr: " << err; + } + + // Next (3rd) run should report the cluster as balanced and + // no replica movement should be attempted. + { + string out; + string err; + const auto s = RunKuduTool(tool_args, &out, &err); + ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err; + ASSERT_STR_CONTAINS(out, + "rebalancing is complete: cluster is balanced (moved 0 replicas)") + << "stderr: " << err; + } + + NO_FATALS(cluster_->AssertNoCrashes()); + NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster()); +} + +// Make sure it's safe to run multiple rebalancers concurrently. The rebalancers +// might report errors, but they should not get stuck and the cluster should +// remain in good shape (i.e. no crashes, no data inconsistencies). Re-running a +// single rebalancer session again should bring the cluster to a balanced state.
+class ConcurrentRebalancersTest : public RebalancingTest { + public: + ConcurrentRebalancersTest() + : RebalancingTest(10 /* num_tables */) { + } +}; +INSTANTIATE_TEST_CASE_P(, ConcurrentRebalancersTest, + ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)); +TEST_P(ConcurrentRebalancersTest, TwoConcurrentRebalancers) { + if (!AllowSlowTests()) { + LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; + return; + } + + NO_FATALS(Prepare()); + + const vector<string> tool_args = { + "cluster", + "rebalance", + cluster_->master()->bound_rpc_addr().ToString(), + }; + + const auto runner_func = [&]() { + string err; + const auto s = RunKuduTool(tool_args, nullptr, &err); + + ostringstream os; + os << "rebalancer status: " << s.ToString(); + // One might expect a bad status returned: e.g., due to some race + // the rebalancer might not be able to make progress for more than + // --max_staleness_interval_sec, etc. + if (!s.ok()) { + os << " : " << err; + } + LOG(INFO) << os.str(); + + // Should not exit on a signal: not expecting SIGSEGV, SIGABRT, etc. + return !MatchPattern(err, "*kudu: process exited on signal*"); + }; + + CountDownLatch start_synchronizer(1); + vector<thread> concurrent_runners; + for (auto i = 0; i < 5; ++i) { + concurrent_runners.emplace_back([&]() { + start_synchronizer.Wait(); + CHECK(runner_func()); + }); + } + + // Run rebalancers concurrently and wait for their completion. + start_synchronizer.CountDown(); + for (auto& runner : concurrent_runners) { + runner.join(); + } + + NO_FATALS(cluster_->AssertNoCrashes()); + NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster()); + + { + string out; + string err; + // TODO(aserbin): sometimes, when replica movement fails because of + // concurrent rebalancers or other reasons, the REPLACE attribute is left + // in replica's Raft config.
In such cases, rebalancing fails because + // it cannot make progress due to the semantics of the ChangeConfig() + // method, returning error + // 'Invalid argument: must modify a field when calling MODIFY_PEER' + // in attempt to set REPLACE attribute again. + const auto s = RunKuduTool(tool_args, &out, &err); + ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err; + ASSERT_STR_CONTAINS(out, "rebalancing is complete: cluster is balanced") + << "stderr: " << err; + } + + // Next run should report the cluster as balanced and no replica movement + // should be attempted: at least one run of the rebalancer prior to this + // should succeed, so next run is about running the tool against already + // balanced cluster. + { + string out; + string err; + const auto s = RunKuduTool(tool_args, &out, &err); + ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err; + ASSERT_STR_CONTAINS(out, + "rebalancing is complete: cluster is balanced (moved 0 replicas)") + << "stderr: " << err; + } + + NO_FATALS(cluster_->AssertNoCrashes()); + NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster()); +} + +// The rebalancer should stop and exit upon detecting a tablet server that +// went down. That's a simple and effective way of preventing concurrent replica +// movement by the rebalancer and the automatic re-replication (the catalog +// manager tries to move replicas from the unreachable tablet server).
+class TserverGoesDownDuringRebalancingTest : public RebalancingTest { + public: + TserverGoesDownDuringRebalancingTest() : + RebalancingTest(5 /* num_tables */) { + } +}; +INSTANTIATE_TEST_CASE_P(, TserverGoesDownDuringRebalancingTest, + ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)); +TEST_P(TserverGoesDownDuringRebalancingTest, TserverDown) { + if (!AllowSlowTests()) { + LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; + return; + } + + const vector<string> kTserverExtraFlags = { + // Slow down tablet copy to make rebalancing steps run longer + // and become observable via tablet data states output by ksck. + "--tablet_copy_download_file_inject_latency_ms=1500", + + "--follower_unavailable_considered_failed_sec=30", + }; + NO_FATALS(Prepare(kTserverExtraFlags)); + + // Pre-condition: 'kudu cluster ksck' should be happy with the cluster state + // shortly after initial setup. + ASSERT_EVENTUALLY([&]() { + string err; + const auto s = RunKuduTool({ + "cluster", + "ksck", + cluster_->master()->bound_rpc_addr().ToString(), + }, nullptr, &err); + ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err; + }); + + Random r(SeedRandom()); + const uint32_t shutdown_tserver_idx = r.Next() % num_tservers_; + + atomic<bool> run(true); + // The thread that shuts down the selected tablet server. + thread stopper([&]() { + while (run && !IsRebalancingInProgress()) { + SleepFor(MonoDelta::FromMilliseconds(10)); + } + + // All right, it's time to stop the selected tablet server. + cluster_->tablet_server(shutdown_tserver_idx)->Shutdown(); + }); + auto stopper_cleanup = MakeScopedCleanup([&]() { + run = false; + stopper.join(); + }); + + { + string out; + string err; + const auto s = RunKuduTool({ + "cluster", + "rebalance", + cluster_->master()->bound_rpc_addr().ToString(), + // Limiting the number of replicas to move. This is to make the rebalancer + // run longer, making sure the rebalancing is in progress when the tablet + // server goes down.
+ "--max_moves_per_server=1", + }, &out, &err); + ASSERT_TRUE(s.IsRuntimeError()) << s.ToString() << ": " << err; + ASSERT_STR_MATCHES( + err, "Illegal state: tablet server .* \\(.*\\): " + "unacceptable health status UNAVAILABLE"); + + // The rebalancer tool should not crash. + ASSERT_STR_NOT_CONTAINS(err, "kudu: process exited on signal"); + } + + run = false; + stopper.join(); + stopper_cleanup.cancel(); + + ASSERT_OK(cluster_->tablet_server(shutdown_tserver_idx)->Restart()); + NO_FATALS(cluster_->AssertNoCrashes()); +} + +// The rebalancer should continue working and complete rebalancing successfully +// if a new tablet server is added while the cluster is being rebalanced. +class TserverAddedDuringRebalancingTest : public RebalancingTest { + public: + TserverAddedDuringRebalancingTest() + : RebalancingTest(10 /* num_tables */) { + } +}; +INSTANTIATE_TEST_CASE_P(, TserverAddedDuringRebalancingTest, + ::testing::Values(Kudu1097::Disable, Kudu1097::Enable)); +TEST_P(TserverAddedDuringRebalancingTest, TserverStarts) { + if (!AllowSlowTests()) { + LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; + return; + } + + const vector<string> kTserverExtraFlags = { + // Slow down tablet copy to make rebalancing steps run longer + // and become observable via tablet data states output by ksck.
+ "--tablet_copy_download_file_inject_latency_ms=1500", + + "--follower_unavailable_considered_failed_sec=30", + }; + NO_FATALS(Prepare(kTserverExtraFlags)); + + const vector<string> tool_args = { + "cluster", + "rebalance", + cluster_->master()->bound_rpc_addr().ToString(), + }; + + atomic<bool> run(true); + thread runner([&]() { + while (run) { + string err; + const auto s = RunKuduTool(tool_args, nullptr, &err); + CHECK(s.ok()) << s.ToString() << ": " << err; + } + }); + auto runner_cleanup = MakeScopedCleanup([&]() { + run = false; + runner.join(); + }); + + while (!IsRebalancingInProgress()) { + SleepFor(MonoDelta::FromMilliseconds(10)); + } + + // It's time to sneak in and add a new tablet server. + ASSERT_OK(cluster_->AddTabletServer()); + run = false; + runner.join(); + runner_cleanup.cancel(); + + // The rebalancer should not fail, and eventually, after a new tablet server + // is added, the cluster should become balanced. + ASSERT_EVENTUALLY([&]() { + string out; + string err; + const auto s = RunKuduTool(tool_args, &out, &err); + ASSERT_TRUE(s.ok()) << s.ToString() << ": " << err; + ASSERT_STR_CONTAINS(out, + "rebalancing is complete: cluster is balanced (moved 0 replicas)") + << "stderr: " << err; + }); + + NO_FATALS(cluster_->AssertNoCrashes()); + NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster()); } } // namespace tools