[rebalancer] location-aware rebalancer (part 9/n) Updated the reporting functionality of the rebalancer tool to output information on placement policy violations and other details relevant to location-aware clusters.
Added one simple integration test as well. Change-Id: I8407e9f8cf6b41a6aeb075372d852125d9739e08 Reviewed-on: http://gerrit.cloudera.org:8080/11862 Tested-by: Kudu Jenkins Reviewed-by: Will Berkeley <wdberke...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8e9345a7 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8e9345a7 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8e9345a7 Branch: refs/heads/master Commit: 8e9345a79849ed3e96a85dc5240e0a4e709b2055 Parents: e172df4 Author: Alexey Serbin <aser...@cloudera.com> Authored: Fri Oct 26 18:25:24 2018 -0700 Committer: Alexey Serbin <aser...@cloudera.com> Committed: Tue Nov 6 21:59:21 2018 +0000 ---------------------------------------------------------------------- src/kudu/tools/placement_policy_util-test.cc | 38 +-- src/kudu/tools/placement_policy_util.cc | 2 +- src/kudu/tools/placement_policy_util.h | 1 - src/kudu/tools/rebalancer.cc | 332 +++++++++++++++------- src/kudu/tools/rebalancer.h | 15 + src/kudu/tools/rebalancer_tool-test.cc | 208 ++++++++++++-- 6 files changed, 456 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/placement_policy_util-test.cc b/src/kudu/tools/placement_policy_util-test.cc index 7d48cbe..cccd611 100644 --- a/src/kudu/tools/placement_policy_util-test.cc +++ b/src/kudu/tools/placement_policy_util-test.cc @@ -139,19 +139,19 @@ void ClusterConfigToClusterPlacementInfo(const TestClusterConfig& tcc, *tpi = std::move(result_tpi); } -// TODO(aserbin): is it needed at all? 
bool operator==(const PlacementPolicyViolationInfo& lhs, const PlacementPolicyViolationInfo& rhs) { return lhs.tablet_id == rhs.tablet_id && lhs.majority_location == rhs.majority_location && lhs.replicas_num_at_majority_location == - rhs.replicas_num_at_majority_location && - lhs.replication_factor == rhs.replication_factor; + rhs.replicas_num_at_majority_location; } ostream& operator<<(ostream& s, const PlacementPolicyViolationInfo& info) { s << "{tablet_id: " << info.tablet_id - << ", location: " << info.majority_location << "}"; + << ", location: " << info.majority_location + << ", replicas_num_at_majority_location: " + << info.replicas_num_at_majority_location << "}"; return s; } @@ -327,7 +327,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) { { "D", {} }, { "E", {} }, }, - { { "t0", "L0" }, }, + { { "t0", "L0", 3 }, }, { { "t0", "C" }, } }, @@ -345,7 +345,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) { { "B", { "t0", } }, { "C", { "t0", } }, }, - { { "t0", "L0" }, }, + { { "t0", "L0", 2 }, }, {}, }, @@ -364,7 +364,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsSimple) { { "C", { "t0", } }, { "D", {} }, }, - { { "t0", "L0" }, }, + { { "t0", "L0", 2 }, }, { { "t0", "B" }, } }, }; @@ -390,7 +390,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) { { "D", { "t1", "x1", } }, { "E", { "t1", } }, { "F", { "t1", } }, }, - { { "t0", "L0" }, { "t1", "L1" }, }, + { { "t0", "L0", 3 }, { "t1", "L1", 2 }, }, { { "t0", "B" }, { "t1", "E" } } }, @@ -410,7 +410,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsMixed) { { "D", { "t1", "t2", } }, { "E", { "t1", "t3", } }, { "F", { "t1", "t2", "t3", } }, }, - { { "t0", "L0" }, { "t1", "L1" }, }, + { { "t0", "L0", 3 }, { "t1", "L1", 2 }, }, { { "t0", "B" }, { "t1", "E" } } }, }; @@ -441,7 +441,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) { { "E", { "t0", } }, { "F", { "t0", } }, }, - { { "t0", "L0" }, }, + { { "t0", "L0", 3 }, }, {}, 
}, // One RF=7 tablet with the distribution of its replica placement violating @@ -467,7 +467,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) { { "G", { "t0", } }, { "H", { "t0", } }, }, - { { "t0", "L0" }, }, + { { "t0", "L0", 4 }, }, {}, }, { @@ -485,7 +485,7 @@ TEST_F(ClusterLocationTest, NoCandidateMovesToFixPolicyViolations) { { "D", { "t0", } }, { "E", { "t0", } }, { "F", { "t0", } }, }, - { { "t0", "L0" }, }, + { { "t0", "L0", 3 }, }, {} }, }; @@ -525,7 +525,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) { { "C", { "t1", } }, { "D", { "t1", } }, }, - { { "t0", "L0" }, { "t1", "L0" }, }, + { { "t0", "L0", 2 }, { "t1", "L0", 4 }, }, {} }, { @@ -541,7 +541,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) { { "A", { "t0", } }, { "B", { "t0", } }, { "D", { "t1", } }, { "E", { "t1", } }, }, - { { "t0", "L0" }, { "t1", "L1" }, }, + { { "t0", "L0", 2 }, { "t1", "L1", 2 }, }, {} }, { @@ -558,7 +558,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) { { "A", { "t0", "t1", } }, { "B", { "t0", "t1", } }, { "D", { "t1", } }, { "E", { "t1", } }, }, - { { "t0", "L0" }, { "t1", "L1" }, }, + { { "t0", "L0", 2 }, { "t1", "L1", 2 }, }, {} }, { @@ -574,7 +574,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) { { "A", { "t0", } }, { "B", { "t0", } }, { "C", { "t1", } }, { "D", { "t1", } }, }, - { { "t0", "L0" }, { "t1", "L1" }, }, + { { "t0", "L0", 2 }, { "t1", "L1", 2 }, }, {} }, { @@ -592,7 +592,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRFEdgeCases) { { "D", { "t0", } }, { "F", { "t0", } }, }, - { { "t0", "L0" }, }, + { { "t0", "L0", 2 }, }, {} }, }; @@ -616,7 +616,7 @@ TEST_F(ClusterLocationTest, PlacementPolicyViolationsEvenRF) { { "D", { "t0", } }, { "F", { "t0", } }, { "H", { "t0", } }, }, - { { "t0", "L0" }, }, + { { "t0", "L0", 3 }, }, { { "t0", "B" }, } }, { @@ -635,7 +635,7 @@ TEST_F(ClusterLocationTest, 
PlacementPolicyViolationsEvenRF) { { "G", { "t0", } }, { "H", { "t0", } }, }, - { { "t0", "L1" }, }, + { { "t0", "L1", 4 }, }, { { "t0", "D" }, } }, }; http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/placement_policy_util.cc b/src/kudu/tools/placement_policy_util.cc index be1b502..f5ab790 100644 --- a/src/kudu/tools/placement_policy_util.cc +++ b/src/kudu/tools/placement_policy_util.cc @@ -333,7 +333,7 @@ Status DetectPlacementPolicyViolations( tablet_id, max_replicas_num, rep_factor, max_replicas_location); } if (is_policy_violated) { - info.push_back({ tablet_id, max_replicas_location }); + info.push_back({ tablet_id, max_replicas_location, max_replicas_num }); } } http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/placement_policy_util.h ---------------------------------------------------------------------- diff --git a/src/kudu/tools/placement_policy_util.h b/src/kudu/tools/placement_policy_util.h index e54848d..2938d17 100644 --- a/src/kudu/tools/placement_policy_util.h +++ b/src/kudu/tools/placement_policy_util.h @@ -86,7 +86,6 @@ Status BuildTabletsPlacementInfo(const ClusterRawInfo& raw_info, struct PlacementPolicyViolationInfo { std::string tablet_id; std::string majority_location; - int replication_factor; int replicas_num_at_majority_location; }; http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc index 4d2d769..46f21f6 100644 --- a/src/kudu/tools/rebalancer.cc +++ b/src/kudu/tools/rebalancer.cc @@ -57,6 +57,7 @@ using kudu::client::KuduClient; using kudu::client::KuduClientBuilder; using std::accumulate; using std::endl; +using std::back_inserter; using std::inserter; using std::ostream; using std::map; @@ 
-69,6 +70,7 @@ using std::shared_ptr; using std::sort; using std::string; using std::to_string; +using std::transform; using std::unordered_map; using std::unordered_set; using std::vector; @@ -105,7 +107,7 @@ Rebalancer::Rebalancer(const Config& config) : config_(config) { } -Status Rebalancer::PrintStats(std::ostream& out) { +Status Rebalancer::PrintStats(ostream& out) { // First, report on the current balance state of the cluster. RETURN_NOT_OK(RefreshKsckResults()); const KsckResults& results = ksck_->results(); @@ -116,103 +118,49 @@ Status Rebalancer::PrintStats(std::ostream& out) { ClusterInfo ci; RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci)); - // Per-server replica distribution stats. - { - out << "Per-server replica distribution summary:" << endl; - DataTable summary({"Statistic", "Value"}); - - const auto& servers_load_info = ci.balance.servers_by_total_replica_count; - if (servers_load_info.empty()) { - summary.AddRow({ "N/A", "N/A" }); - } else { - const int64_t total_replica_count = accumulate( - servers_load_info.begin(), servers_load_info.end(), 0L, - [](int64_t sum, const pair<int32_t, string>& elem) { - return sum + elem.first; - }); - - const auto min_replica_count = servers_load_info.begin()->first; - const auto max_replica_count = servers_load_info.rbegin()->first; - const double avg_replica_count = - 1.0 * total_replica_count / servers_load_info.size(); - - summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) }); - summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) }); - summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) }); - } - RETURN_NOT_OK(summary.PrintTo(out)); - out << endl; - - if (config_.output_replica_distribution_details) { - const auto& tserver_summaries = results.tserver_summaries; - unordered_map<string, string> tserver_endpoints; - for (const auto& summary : tserver_summaries) { - tserver_endpoints.emplace(summary.uuid, summary.address); - } - - out 
<< "Per-server replica distribution details:" << endl; - DataTable servers_info({ "UUID", "Address", "Replica Count" }); - for (const auto& elem : servers_load_info) { - const auto& id = elem.second; - servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) }); - } - RETURN_NOT_OK(servers_info.PrintTo(out)); - out << endl; - } + const auto& ts_id_by_location = ci.locality.servers_by_location; + if (ts_id_by_location.empty()) { + // Nothing to report about: there are no tablet servers reported. + out << "an empty cluster" << endl; + return Status::OK(); } - // Per-table replica distribution stats. - { - out << "Per-table replica distribution summary:" << endl; - DataTable summary({ "Replica Skew", "Value" }); - const auto& table_skew_info = ci.balance.table_info_by_skew; - if (table_skew_info.empty()) { - summary.AddRow({ "N/A", "N/A" }); - } else { - const auto min_table_skew = table_skew_info.begin()->first; - const auto max_table_skew = table_skew_info.rbegin()->first; - const int64_t sum_table_skew = accumulate( - table_skew_info.begin(), table_skew_info.end(), 0L, - [](int64_t sum, const pair<int32_t, TableBalanceInfo>& elem) { - return sum + elem.first; - }); - double avg_table_skew = 1.0 * sum_table_skew / table_skew_info.size(); - - summary.AddRow({ "Minimum", to_string(min_table_skew) }); - summary.AddRow({ "Maximum", to_string(max_table_skew) }); - summary.AddRow({ "Average", to_string(avg_table_skew) }); - } - RETURN_NOT_OK(summary.PrintTo(out)); - out << endl; + if (ts_id_by_location.size() == 1) { + // That's about printing information about the whole cluster. 
+ return PrintLocationBalanceStats(ts_id_by_location.begin()->first, + raw_info, ci, out); + } - if (config_.output_replica_distribution_details) { - const auto& table_summaries = results.table_summaries; - unordered_map<string, const KsckTableSummary*> table_info; - for (const auto& summary : table_summaries) { - table_info.emplace(summary.id, &summary); - } - out << "Per-table replica distribution details:" << endl; - DataTable skew( - { "Table Id", "Replica Count", "Replica Skew", "Table Name" }); - for (const auto& elem : table_skew_info) { - const auto& table_id = elem.second.table_id; - const auto it = table_info.find(table_id); - const auto* table_summary = - (it == table_info.end()) ? nullptr : it->second; - const auto& table_name = table_summary ? table_summary->name : ""; - const auto total_replica_count = table_summary - ? table_summary->replication_factor * table_summary->TotalTablets() - : 0; - skew.AddRow({ table_id, - to_string(total_replica_count), - to_string(elem.first), - table_name }); - } - RETURN_NOT_OK(skew.PrintTo(out)); - out << endl; - } + // The stats are more detailed in the case of a multi-location cluster. + DCHECK_GT(ts_id_by_location.size(), 1); + + // 1. Print information about cross-location balance. + RETURN_NOT_OK(PrintCrossLocationBalanceStats(ci, out)); + + // 2. Iterating over locations in the cluster, print per-location balance + // information. Since the ts_id_by_location is not sorted, let's first + // create a sorted list of locations so the output would be sorted by + // location.
+ vector<string> locations; + locations.reserve(ts_id_by_location.size()); + transform(ts_id_by_location.cbegin(), ts_id_by_location.cend(), + back_inserter(locations), + [](const unordered_map<string, set<string>>::value_type& elem) { + return elem.first; + }); + sort(locations.begin(), locations.end()); + + for (const auto& location : locations) { + ClusterRawInfo raw_info; + RETURN_NOT_OK(KsckResultsToClusterRawInfo(location, results, &raw_info)); + ClusterInfo ci; + RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci)); + RETURN_NOT_OK(PrintLocationBalanceStats(location, raw_info, ci, out)); } + // 3. Print information about placement policy violations. + RETURN_NOT_OK(PrintPolicyViolationInfo(raw_info, out)); + return Status::OK(); } @@ -538,6 +486,194 @@ Status Rebalancer::FilterCrossLocationTabletCandidates( return Status::OK(); } +Status Rebalancer::PrintCrossLocationBalanceStats(const ClusterInfo& ci, + ostream& out) const { + // Print location load information. + map<string, int64_t> replicas_num_by_location; + for (const auto& elem : ci.balance.servers_by_total_replica_count) { + const auto& location = FindOrDie(ci.locality.location_by_ts_id, elem.second); + LookupOrEmplace(&replicas_num_by_location, location, 0) += elem.first; + } + out << "Locations load summary:" << endl; + DataTable location_load_summary({"Location", "Load"}); + for (const auto& elem : replicas_num_by_location) { + const auto& location = elem.first; + const auto servers_num = + FindOrDie(ci.locality.servers_by_location, location).size(); + CHECK_GT(servers_num, 0); + double location_load = static_cast<double>(elem.second) / servers_num; + location_load_summary.AddRow({ location, to_string(location_load) }); + } + RETURN_NOT_OK(location_load_summary.PrintTo(out)); + out << endl; + + return Status::OK(); +} + +Status Rebalancer::PrintLocationBalanceStats(const string& location, + const ClusterRawInfo& raw_info, + const ClusterInfo& ci, + ostream& out) const { + if 
(!location.empty()) { + out << "--------------------------------------------------" << endl; + out << "Location: " << location << endl; + out << "--------------------------------------------------" << endl; + } + + // Per-server replica distribution stats. + { + out << "Per-server replica distribution summary:" << endl; + DataTable summary({"Statistic", "Value"}); + + const auto& servers_load_info = ci.balance.servers_by_total_replica_count; + if (servers_load_info.empty()) { + summary.AddRow({ "N/A", "N/A" }); + } else { + const int64_t total_replica_count = accumulate( + servers_load_info.begin(), servers_load_info.end(), 0L, + [](int64_t sum, const pair<int32_t, string>& elem) { + return sum + elem.first; + }); + + const auto min_replica_count = servers_load_info.begin()->first; + const auto max_replica_count = servers_load_info.rbegin()->first; + const double avg_replica_count = + 1.0 * total_replica_count / servers_load_info.size(); + + summary.AddRow({ "Minimum Replica Count", to_string(min_replica_count) }); + summary.AddRow({ "Maximum Replica Count", to_string(max_replica_count) }); + summary.AddRow({ "Average Replica Count", to_string(avg_replica_count) }); + } + RETURN_NOT_OK(summary.PrintTo(out)); + out << endl; + + if (config_.output_replica_distribution_details) { + const auto& tserver_summaries = raw_info.tserver_summaries; + unordered_map<string, string> tserver_endpoints; + for (const auto& summary : tserver_summaries) { + tserver_endpoints.emplace(summary.uuid, summary.address); + } + + out << "Per-server replica distribution details:" << endl; + DataTable servers_info({ "UUID", "Address", "Replica Count" }); + for (const auto& elem : servers_load_info) { + const auto& id = elem.second; + servers_info.AddRow({ id, tserver_endpoints[id], to_string(elem.first) }); + } + RETURN_NOT_OK(servers_info.PrintTo(out)); + out << endl; + } + } + + // Per-table replica distribution stats. 
+ { + out << "Per-table replica distribution summary:" << endl; + DataTable summary({ "Replica Skew", "Value" }); + const auto& table_skew_info = ci.balance.table_info_by_skew; + if (table_skew_info.empty()) { + summary.AddRow({ "N/A", "N/A" }); + } else { + const auto min_table_skew = table_skew_info.begin()->first; + const auto max_table_skew = table_skew_info.rbegin()->first; + const int64_t sum_table_skew = accumulate( + table_skew_info.begin(), table_skew_info.end(), 0L, + [](int64_t sum, const pair<int32_t, TableBalanceInfo>& elem) { + return sum + elem.first; + }); + double avg_table_skew = 1.0 * sum_table_skew / table_skew_info.size(); + + summary.AddRow({ "Minimum", to_string(min_table_skew) }); + summary.AddRow({ "Maximum", to_string(max_table_skew) }); + summary.AddRow({ "Average", to_string(avg_table_skew) }); + } + RETURN_NOT_OK(summary.PrintTo(out)); + out << endl; + + if (config_.output_replica_distribution_details) { + const auto& table_summaries = raw_info.table_summaries; + unordered_map<string, const KsckTableSummary*> table_info; + for (const auto& summary : table_summaries) { + table_info.emplace(summary.id, &summary); + } + out << "Per-table replica distribution details:" << endl; + DataTable skew( + { "Table Id", "Replica Count", "Replica Skew", "Table Name" }); + for (const auto& elem : table_skew_info) { + const auto& table_id = elem.second.table_id; + const auto it = table_info.find(table_id); + const auto* table_summary = + (it == table_info.end()) ? nullptr : it->second; + const auto& table_name = table_summary ? table_summary->name : ""; + const auto total_replica_count = table_summary + ? 
table_summary->replication_factor * table_summary->TotalTablets() + : 0; + skew.AddRow({ table_id, + to_string(total_replica_count), + to_string(elem.first), + table_name }); + } + RETURN_NOT_OK(skew.PrintTo(out)); + out << endl; + } + } + + return Status::OK(); +} + +Status Rebalancer::PrintPolicyViolationInfo(const ClusterRawInfo& raw_info, + ostream& out) const { + TabletsPlacementInfo placement_info; + RETURN_NOT_OK(BuildTabletsPlacementInfo(raw_info, &placement_info)); + vector<PlacementPolicyViolationInfo> ppvi; + RETURN_NOT_OK(DetectPlacementPolicyViolations(placement_info, &ppvi)); + out << "Placement policy violations:" << endl; + if (ppvi.empty()) { + out << " none" << endl << endl;; + return Status::OK(); + } + + if (config_.output_replica_distribution_details) { + DataTable stats( + { "Location", "Table Name", "Tablet", "RF", "Replicas at location" }); + for (const auto& info : ppvi) { + const auto& table_id = FindOrDie(placement_info.tablet_to_table_id, + info.tablet_id); + const auto& table_info = FindOrDie(placement_info.tables_info, table_id); + stats.AddRow({ info.majority_location, + table_info.name, + info.tablet_id, + to_string(table_info.replication_factor), + to_string(info.replicas_num_at_majority_location) }); + } + RETURN_NOT_OK(stats.PrintTo(out)); + } else { + DataTable summary({ "Location", + "Number of non-complying tables", + "Number of non-complying tablets" }); + typedef pair<unordered_set<string>, unordered_set<string>> TableTabletIds; + // Location --> sets of identifiers of tables and tablets hosted by the + // tablet servers at the location. The summary is sorted by location. 
+ map<string, TableTabletIds> info_by_location; + for (const auto& info : ppvi) { + const auto& table_id = FindOrDie(placement_info.tablet_to_table_id, + info.tablet_id); + auto& elem = LookupOrEmplace(&info_by_location, + info.majority_location, TableTabletIds()); + elem.first.emplace(table_id); + elem.second.emplace(info.tablet_id); + } + for (const auto& elem : info_by_location) { + summary.AddRow({ elem.first, + to_string(elem.second.first.size()), + to_string(elem.second.second.size()) }); + } + RETURN_NOT_OK(summary.PrintTo(out)); + } + out << endl; + + return Status::OK(); +} + Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info, const MovesInProgress& moves_in_progress, ClusterInfo* info) const { @@ -1084,11 +1220,11 @@ Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl( return Status::OK(); } unordered_set<string> tablets_in_move; - std::transform(scheduled_moves_.begin(), scheduled_moves_.end(), - inserter(tablets_in_move, tablets_in_move.begin()), - [](const MovesInProgress::value_type& elem) { - return elem.first; - }); + transform(scheduled_moves_.begin(), scheduled_moves_.end(), + inserter(tablets_in_move, tablets_in_move.begin()), + [](const MovesInProgress::value_type& elem) { + return elem.first; + }); for (const auto& move : moves) { vector<string> tablet_ids; RETURN_NOT_OK(FindReplicas(move, raw_info, &tablet_ids)); http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer.h ---------------------------------------------------------------------- diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h index 7bb0d73..cbaef49 100644 --- a/src/kudu/tools/rebalancer.h +++ b/src/kudu/tools/rebalancer.h @@ -442,6 +442,21 @@ class Rebalancer { const TableReplicaMove& move, std::vector<std::string>* tablet_ids); + // Print information on the cross-location balance. 
+ Status PrintCrossLocationBalanceStats(const ClusterInfo& ci, + std::ostream& out) const; + + // Print statistics for the specified location. If 'location' is an empty + // string, that's about printing the cluster-wide stats for a cluster that + // doesn't have any locations defined. + Status PrintLocationBalanceStats(const std::string& location, + const ClusterRawInfo& raw_info, + const ClusterInfo& ci, + std::ostream& out) const; + + Status PrintPolicyViolationInfo(const ClusterRawInfo& raw_info, + std::ostream& out) const; + // Convert the 'raw' information about the cluster into information suitable // for the input of the high-level rebalancing algorithm. // The 'moves_in_progress' parameter contains information on the replica moves http://git-wip-us.apache.org/repos/asf/kudu/blob/8e9345a7/src/kudu/tools/rebalancer_tool-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/rebalancer_tool-test.cc b/src/kudu/tools/rebalancer_tool-test.cc index 60e9f45..9fcd997 100644 --- a/src/kudu/tools/rebalancer_tool-test.cc +++ b/src/kudu/tools/rebalancer_tool-test.cc @@ -25,6 +25,7 @@ #include <string> #include <thread> #include <unordered_map> +#include <unordered_set> #include <utility> #include <vector> @@ -88,6 +89,7 @@ using std::thread; using std::tuple; using std::unique_ptr; using std::unordered_map; +using std::unordered_set; using std::vector; using strings::Substitute; @@ -207,28 +209,14 @@ TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) { ASSERT_STR_MATCHES(err, err_msg_pattern); } -// Create tables with unbalanced replica distribution: useful in -// rebalancer-related tests. 
-static Status CreateUnbalancedTables( +static Status CreateTables( cluster::ExternalMiniCluster* cluster, client::KuduClient* client, const Schema& table_schema, const string& table_name_pattern, int num_tables, int rep_factor, - int tserver_idx_from, - int tserver_num, - int tserver_unresponsive_ms, vector<string>* table_names = nullptr) { - // Keep running only some tablet servers and shut down the rest. - for (auto i = tserver_idx_from; i < tserver_num; ++i) { - cluster->tablet_server(i)->Shutdown(); - } - - // Wait for the catalog manager to understand that not all tablet servers - // are available. - SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4)); - // Create tables with their tablet replicas landing only on the tablet servers // which are up and running. auto client_schema = KuduSchema::FromSchema(table_schema); @@ -253,6 +241,32 @@ static Status CreateUnbalancedTables( } } + return Status::OK(); +} + +// Create tables with unbalanced replica distribution: useful in +// rebalancer-related tests. +static Status CreateUnbalancedTables( + cluster::ExternalMiniCluster* cluster, + client::KuduClient* client, + const Schema& table_schema, + const string& table_name_pattern, + int num_tables, + int rep_factor, + int tserver_idx_from, + int tserver_num, + int tserver_unresponsive_ms, + vector<string>* table_names = nullptr) { + // Keep running only some tablet servers and shut down the rest. + for (auto i = tserver_idx_from; i < tserver_num; ++i) { + cluster->tablet_server(i)->Shutdown(); + } + + // Wait for the catalog manager to understand that not all tablet servers + // are available. 
+ SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms / 4)); + RETURN_NOT_OK(CreateTables(cluster, client, table_schema, table_name_pattern, + num_tables, rep_factor, table_names)); for (auto i = tserver_idx_from; i < tserver_num; ++i) { RETURN_NOT_OK(cluster->tablet_server(i)->Restart()); } @@ -404,9 +418,13 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase { static const char* const kExitOnSignalStr; static const char* const kTableNamePattern; + // Working around limitations of older libstdc++. + static const unordered_set<string> kEmptySet; + void Prepare(const vector<string>& extra_tserver_flags = {}, const vector<string>& extra_master_flags = {}, const LocationInfo& location_info = {}, + const unordered_set<string>& empty_locations = kEmptySet, vector<string>* created_tables_names = nullptr) { const auto& scheme_flag = Substitute( "--raft_prepare_replacement_before_eviction=$0", is_343_scheme()); @@ -420,12 +438,60 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase { FLAGS_num_tablet_servers = num_tservers_; FLAGS_num_replicas = rep_factor_; - NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info)); + NO_FATALS(BuildAndStart(tserver_flags_, master_flags_, location_info, + /*create_table=*/ false)); + + if (location_info.empty()) { + ASSERT_OK(CreateUnbalancedTables( + cluster_.get(), client_.get(), schema_, kTableNamePattern, + num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_, + tserver_unresponsive_ms_, created_tables_names)); + } else { + ASSERT_OK(CreateTablesExcludingLocations(empty_locations, + created_tables_names)); + } + } + + // Create tables placing their tablet replicas everywhere but not at the + // tablet servers in the specified locations. This is similar to + // CreateUnbalancedTables() but the set of tablet servers to avoid is defined + // by the set of the specified locations. 
+ Status CreateTablesExcludingLocations( + const unordered_set<string>& excluded_locations, + vector<string>* table_names = nullptr) { + // Shutdown all tablet servers in the specified locations so no tablet + // replicas would be hosted by those servers. + unordered_set<string> seen_locations; + if (!excluded_locations.empty()) { + for (const auto& elem : tablet_servers_) { + auto* ts = elem.second; + if (ContainsKey(excluded_locations, ts->location)) { + cluster_->tablet_server_by_uuid(ts->uuid())->Shutdown(); + EmplaceIfNotPresent(&seen_locations, ts->location); + } + } + } + // Sanity check: every specified location should have been seen, otherwise + // something is wrong with the tablet servers' registration. + CHECK_EQ(excluded_locations.size(), seen_locations.size()); + + // Wait for the catalog manager to understand that not all tablet servers + // are available. + SleepFor(MonoDelta::FromMilliseconds(5 * tserver_unresponsive_ms_ / 4)); + RETURN_NOT_OK(CreateTables(cluster_.get(), client_.get(), schema_, + kTableNamePattern, num_tables_, rep_factor_, + table_names)); + // Start tablet servers at the excluded locations. 
+ if (!excluded_locations.empty()) { + for (const auto& elem : tablet_servers_) { + auto* ts = elem.second; + if (ContainsKey(excluded_locations, ts->location)) { + RETURN_NOT_OK(cluster_->tablet_server_by_uuid(ts->uuid())->Restart()); + } + } + } - ASSERT_OK(CreateUnbalancedTables( - cluster_.get(), client_.get(), schema_, kTableNamePattern, - num_tables_, rep_factor_, rep_factor_ + 1, num_tservers_, - tserver_unresponsive_ms_, created_tables_names)); + return Status::OK(); } // When the rebalancer starts moving replicas, ksck detects corruption @@ -454,6 +520,7 @@ class RebalancingTest : public tserver::TabletServerIntegrationTestBase { }; const char* const RebalancingTest::kExitOnSignalStr = "kudu: process exited on signal"; const char* const RebalancingTest::kTableNamePattern = "rebalance_test_table_$0"; +const unordered_set<string> RebalancingTest::kEmptySet = unordered_set<string>(); typedef testing::WithParamInterface<Kudu1097> Kudu1097ParamTest; @@ -1185,7 +1252,7 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) { const LocationInfo location_info = { { "/A", 2 }, { "/B", 2 }, { "/C", 2 }, }; vector<string> table_names; - NO_FATALS(Prepare({}, {}, location_info, &table_names)); + NO_FATALS(Prepare({}, {}, location_info, kEmptySet, &table_names)); const vector<string> tool_args = { "cluster", @@ -1261,5 +1328,104 @@ TEST_F(LocationAwareRebalancingBasicTest, Basic) { } } +class LocationAwareBalanceInfoTest : public RebalancingTest { + public: + LocationAwareBalanceInfoTest() + : RebalancingTest(/*num_tables=*/ 1, + /*rep_factor=*/ 3, + /*num_tservers=*/ 5) { + } + + bool is_343_scheme() const override { + // These tests are for the 3-4-3 replica management scheme only. + return true; + } +}; + +// Verify the output of the location-aware rebalancer against a cluster +// that has multiple locations. 
+TEST_F(LocationAwareBalanceInfoTest, ReportOnly) { + static const char kReferenceOutput[] = + R"***(Locations load summary: + Location | Load +----------+---------- + /A | 3.000000 + /B | 3.000000 + /C | 0.000000 + +-------------------------------------------------- +Location: /A +-------------------------------------------------- +Per-server replica distribution summary: + Statistic | Value +-----------------------+---------- + Minimum Replica Count | 3 + Maximum Replica Count | 3 + Average Replica Count | 3.000000 + +Per-table replica distribution summary: + Replica Skew | Value +--------------+---------- + Minimum | 0 + Maximum | 0 + Average | 0.000000 + +-------------------------------------------------- +Location: /B +-------------------------------------------------- +Per-server replica distribution summary: + Statistic | Value +-----------------------+---------- + Minimum Replica Count | 3 + Maximum Replica Count | 3 + Average Replica Count | 3.000000 + +Per-table replica distribution summary: + Replica Skew | Value +--------------+---------- + Minimum | 0 + Maximum | 0 + Average | 0.000000 + +-------------------------------------------------- +Location: /C +-------------------------------------------------- +Per-server replica distribution summary: + Statistic | Value +-----------------------+---------- + Minimum Replica Count | 0 + Maximum Replica Count | 0 + Average Replica Count | 0.000000 + +Per-table replica distribution summary: + Replica Skew | Value +--------------+------- + N/A | N/A + +Placement policy violations: + Location | Number of non-complying tables | Number of non-complying tablets +----------+--------------------------------+--------------------------------- + /B | 1 | 3 +)***"; + + const LocationInfo location_info = { { "/A", 1 }, { "/B", 2 }, { "/C", 2 }, }; + NO_FATALS(Prepare({}, {}, location_info, { "/C" })); + + string out; + string err; + Status s = RunKuduTool({ + "cluster", + "rebalance", + 
cluster_->master()->bound_rpc_addr().ToString(), + "--report_only", + }, &out, &err); + ASSERT_TRUE(s.ok()) << ToolRunInfo(s, out, err); + // The output should match the reference report. + ASSERT_STR_CONTAINS(out, kReferenceOutput); + // The actual rebalancing should not run. + ASSERT_STR_NOT_CONTAINS(out, "rebalancing is complete:") + << ToolRunInfo(s, out, err); +} + } // namespace tools } // namespace kudu