[rebalancer] location-aware rebalancer (part 4/n) Refactored Rebalancer and Runner classes, separating common base for a runner of the rebalancing process.
Change-Id: Id47183fc853573390b22ec714751adec93e0ea3a Reviewed-on: http://gerrit.cloudera.org:8080/11745 Tested-by: Alexey Serbin <aser...@cloudera.com> Reviewed-by: Will Berkeley <wdberke...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/34bb7f93 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/34bb7f93 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/34bb7f93 Branch: refs/heads/master Commit: 34bb7f93b4bcb8d39803d0f0718148f84f5cca22 Parents: 43161e5 Author: Alexey Serbin <aser...@cloudera.com> Authored: Fri Oct 19 22:20:33 2018 -0700 Committer: Alexey Serbin <aser...@cloudera.com> Committed: Mon Oct 29 23:05:50 2018 +0000 ---------------------------------------------------------------------- src/kudu/tools/rebalancer.cc | 640 ++++++++++++++++++++------------------ src/kudu/tools/rebalancer.h | 209 ++++++++----- 2 files changed, 481 insertions(+), 368 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/34bb7f93/src/kudu/tools/rebalancer.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/rebalancer.cc b/src/kudu/tools/rebalancer.cc index c3225f2..abf28f0 100644 --- a/src/kudu/tools/rebalancer.cc +++ b/src/kudu/tools/rebalancer.cc @@ -39,6 +39,7 @@ #include "kudu/client/client.h" #include "kudu/gutil/basictypes.h" +#include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/tools/ksck.h" @@ -93,9 +94,7 @@ Rebalancer::Config::Config( } Rebalancer::Rebalancer(const Config& config) - : config_(config), - random_device_(), - random_generator_(random_device_()) { + : config_(config) { } Status Rebalancer::PrintStats(std::ostream& out) { @@ -109,13 +108,12 @@ Status Rebalancer::PrintStats(std::ostream& out) { ClusterInfo ci; RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci)); - 
auto& cbi = ci.balance; // Per-server replica distribution stats. { out << "Per-server replica distribution summary:" << endl; DataTable summary({"Statistic", "Value"}); - const auto& servers_load_info = cbi.servers_by_total_replica_count; + const auto& servers_load_info = ci.balance.servers_by_total_replica_count; if (servers_load_info.empty()) { summary.AddRow({ "N/A", "N/A" }); } else { @@ -159,7 +157,7 @@ Status Rebalancer::PrintStats(std::ostream& out) { { out << "Per-table replica distribution summary:" << endl; DataTable summary({ "Replica Skew", "Value" }); - const auto& table_skew_info = cbi.table_info_by_skew; + const auto& table_skew_info = ci.balance.table_info_by_skew; if (table_skew_info.empty()) { summary.AddRow({ "N/A", "N/A" }); } else { @@ -225,106 +223,145 @@ Status Rebalancer::Run(RunStatus* result_status, size_t* moves_count) { ClusterInfo ci; RETURN_NOT_OK(BuildClusterInfo(raw_info, MovesInProgress(), &ci)); - Runner runner(config_.max_moves_per_server, deadline); + TwoDimensionalGreedyRunner runner(this, config_.max_moves_per_server, deadline); RETURN_NOT_OK(runner.Init(config_.master_addresses)); + RETURN_NOT_OK(RunWith(&runner, result_status)); + if (moves_count != nullptr) { + *moves_count = runner.moves_count(); + } - const MonoDelta max_staleness_delta = - MonoDelta::FromSeconds(config_.max_staleness_interval_sec); - MonoTime staleness_start = MonoTime::Now(); - bool is_timed_out = false; - bool resync_state = false; - while (!is_timed_out) { - if (resync_state) { - resync_state = false; - MonoDelta staleness_delta = MonoTime::Now() - staleness_start; - if (staleness_delta > max_staleness_delta) { - LOG(INFO) << Substitute("detected a staleness period of $0", staleness_delta.ToString()); - return Status::Incomplete(Substitute( - "stalled with no progress for more than $0 seconds, aborting", - max_staleness_delta.ToString())); - } - // The actual re-synchronization happens during GetNextMoves() below: - // updated info is collected from 
the cluster and fed into the algorithm. - LOG(INFO) << "re-synchronizing cluster state"; - } + return Status::OK(); +} - { - vector<Rebalancer::ReplicaMove> replica_moves; - RETURN_NOT_OK(GetNextMoves(runner.scheduled_moves(), &replica_moves)); - if (replica_moves.empty() && runner.scheduled_moves().empty()) { - // No moves are left: done! - break; - } +Status Rebalancer::KsckResultsToClusterRawInfo( + const KsckResults& ksck_info, + ClusterRawInfo* raw_info) { + DCHECK(raw_info); - // Filter out moves for tablets which already have operations in progress. - FilterMoves(runner.scheduled_moves(), &replica_moves); - runner.LoadMoves(std::move(replica_moves)); - } + raw_info->tserver_summaries = ksck_info.tserver_summaries; + raw_info->table_summaries = ksck_info.table_summaries; + raw_info->tablet_summaries = ksck_info.tablet_summaries; - auto has_errors = false; - while (!is_timed_out) { - auto is_scheduled = runner.ScheduleNextMove(&has_errors, &is_timed_out); - resync_state |= has_errors; - if (resync_state || is_timed_out) { - break; - } - if (is_scheduled) { - // Reset the start of the staleness interval: there was some progress - // in scheduling new move operations. - staleness_start = MonoTime::Now(); + return Status::OK(); +} - // Continue scheduling available move operations while there is enough - // capacity, i.e. until number of pending move operations on every - // involved tablet server reaches max_moves_per_server. Once no more - // operations can be scheduled, it's time to check for their status. +// Given high-level description of moves, find tablets with replicas at the +// corresponding tablet servers to satisfy those high-level descriptions. +// The idea is to find all tablets of the specified table that would have a +// replica at the source server, but would not have a replica at the destination +// server. That is to satisfy the restriction of having no more than one replica +// of the same tablet per server. 
+// +// An additional constraint: it's better not to move leader replicas, if +// possible. If a client has a write operation in progress, moving leader +// replicas of affected tablets would make the client to re-resolve new leaders +// and retry the operations. Moving leader replicas is used as last resort +// when no other candidates are left. +Status Rebalancer::FindReplicas(const TableReplicaMove& move, + const ClusterRawInfo& raw_info, + vector<string>* tablet_ids) { + const auto& table_id = move.table_id; + + // Tablet ids of replicas on the source tserver that are non-leaders. + vector<string> tablet_uuids_src; + // Tablet ids of replicas on the source tserver that are leaders. + vector<string> tablet_uuids_src_leaders; + // UUIDs of tablets of the selected table at the destination tserver. + vector<string> tablet_uuids_dst; + + for (const auto& tablet_summary : raw_info.tablet_summaries) { + if (tablet_summary.table_id != table_id) { + continue; + } + if (tablet_summary.result != KsckCheckResult::HEALTHY) { + VLOG(1) << Substitute("table $0: not considering replicas of tablet $1 " + "as candidates for movement since the tablet's " + "status is '$2'", + table_id, tablet_summary.id, + KsckCheckResultToString(tablet_summary.result)); + continue; + } + for (const auto& replica_summary : tablet_summary.replicas) { + if (replica_summary.ts_uuid != move.from && + replica_summary.ts_uuid != move.to) { continue; } - - // Poll for the status of pending operations. If some of the in-flight - // operations are complete, it might be possible to schedule new ones - // by calling Runner::ScheduleNextMove(). - auto has_updates = runner.UpdateMovesInProgressStatus(&has_errors, - &is_timed_out); - if (has_updates) { - // Reset the start of the staleness interval: there was some updates - // on the status of scheduled move operations. 
- staleness_start = MonoTime::Now(); + if (!replica_summary.ts_healthy) { + VLOG(1) << Substitute("table $0: not considering replica movement " + "from $1 to $2 since server $3 is not healthy", + table_id, + move.from, move.to, replica_summary.ts_uuid); + continue; } - resync_state |= has_errors; - if (resync_state || is_timed_out || !has_updates) { - // If there were errors while trying to get the statuses of pending - // operations it's necessary to re-synchronize the state of the cluster: - // most likely something has changed, so it's better to get a new set - // of planned moves. - break; + if (replica_summary.ts_uuid == move.from) { + if (replica_summary.is_leader) { + tablet_uuids_src_leaders.emplace_back(tablet_summary.id); + } else { + tablet_uuids_src.emplace_back(tablet_summary.id); + } + } else { + DCHECK_EQ(move.to, replica_summary.ts_uuid); + tablet_uuids_dst.emplace_back(tablet_summary.id); } - - // Sleep a bit before going next cycle of status polling. - SleepFor(MonoDelta::FromMilliseconds(200)); } } + sort(tablet_uuids_src.begin(), tablet_uuids_src.end()); + sort(tablet_uuids_dst.begin(), tablet_uuids_dst.end()); - *result_status = is_timed_out ? RunStatus::TIMED_OUT - : RunStatus::CLUSTER_IS_BALANCED; - if (moves_count) { - *moves_count = runner.moves_count(); - } + vector<string> tablet_uuids; + set_difference( + tablet_uuids_src.begin(), tablet_uuids_src.end(), + tablet_uuids_dst.begin(), tablet_uuids_dst.end(), + inserter(tablet_uuids, tablet_uuids.begin())); - return Status::OK(); -} + if (!tablet_uuids.empty()) { + // If there are tablets with non-leader replicas at the source server, + // those are the best candidates for movement. + tablet_ids->swap(tablet_uuids); + return Status::OK(); + } -Status Rebalancer::KsckResultsToClusterRawInfo( - const KsckResults& ksck_info, - ClusterRawInfo* raw_info) { - DCHECK(raw_info); + // If no tablets with non-leader replicas were found, resort to tablets with + // leader replicas at the source server. 
+ DCHECK(tablet_uuids.empty()); + sort(tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end()); + set_difference( + tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end(), + tablet_uuids_dst.begin(), tablet_uuids_dst.end(), + inserter(tablet_uuids, tablet_uuids.begin())); - raw_info->tserver_summaries = ksck_info.tserver_summaries; - raw_info->table_summaries = ksck_info.table_summaries; - raw_info->tablet_summaries = ksck_info.tablet_summaries; + tablet_ids->swap(tablet_uuids); return Status::OK(); } +void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves, + vector<ReplicaMove>* replica_moves) { + unordered_set<string> tablet_uuids; + vector<ReplicaMove> filtered_replica_moves; + for (auto& move_op : *replica_moves) { + const auto& tablet_uuid = move_op.tablet_uuid; + if (ContainsKey(scheduled_moves, tablet_uuid)) { + // There is a move operation in progress for the tablet, don't schedule + // another one. + continue; + } + if (PREDICT_TRUE(tablet_uuids.emplace(tablet_uuid).second)) { + filtered_replica_moves.emplace_back(std::move(move_op)); + } else { + // Rationale behind the unique tablet constraint: the implementation of + // the Run() method is designed to re-order operations suggested by the + // high-level algorithm to use the op-count-per-tablet-server capacity + // as much as possible. Right now, the RunStep() method outputs only one + // move operation per tablet in every batch. The code below is to + // enforce the contract between Run() and RunStep() methods. + LOG(DFATAL) << "detected multiple replica move operations for the same " + "tablet " << tablet_uuid; + } + } + *replica_moves = std::move(filtered_replica_moves); +} + Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info, const MovesInProgress& moves_in_progress, ClusterInfo* info) const { @@ -463,175 +500,88 @@ Status Rebalancer::BuildClusterInfo(const ClusterRawInfo& raw_info, return Status::OK(); } - -// Run one step of the rebalancer. 
Due to the inherent restrictions of the -// rebalancing engine, no more than one replica per tablet is moved during -// one step of the rebalancing. -Status Rebalancer::GetNextMoves(const MovesInProgress& moves_in_progress, - vector<ReplicaMove>* replica_moves) { - RETURN_NOT_OK(RefreshKsckResults()); - const auto& ksck_info = ksck_->results(); - - // For simplicity, allow to run the rebalancing only when all tablet servers - // are in good shape. Otherwise, the rebalancing might interfere with the - // automatic re-replication or get unexpected errors while moving replicas. - for (const auto& s : ksck_info.tserver_summaries) { - if (s.health != KsckServerHealth::HEALTHY) { - return Status::IllegalState( - Substitute("tablet server $0 ($1): unacceptable health status $2", - s.uuid, s.address, ServerHealthToString(s.health))); +Status Rebalancer::RunWith(Runner* runner, RunStatus* result_status) { + const MonoDelta max_staleness_delta = + MonoDelta::FromSeconds(config_.max_staleness_interval_sec); + MonoTime staleness_start = MonoTime::Now(); + bool is_timed_out = false; + bool resync_state = false; + while (!is_timed_out) { + if (resync_state) { + resync_state = false; + MonoDelta staleness_delta = MonoTime::Now() - staleness_start; + if (staleness_delta > max_staleness_delta) { + LOG(INFO) << Substitute("detected a staleness period of $0", + staleness_delta.ToString()); + return Status::Incomplete(Substitute( + "stalled with no progress for more than $0 seconds, aborting", + max_staleness_delta.ToString())); + } + // The actual re-synchronization happens during GetNextMoves() below: + // updated info is collected from the cluster and fed into the algorithm. + LOG(INFO) << "re-synchronizing cluster state"; } - } - - // The number of operations to output by the algorithm. Those will be - // translated into concrete tablet replica movement operations, the output of - // this method. 
- const size_t max_moves = config_.max_moves_per_server * - ksck_info.tserver_summaries.size() * 5; - ClusterRawInfo raw_info; - RETURN_NOT_OK(KsckResultsToClusterRawInfo(ksck_info, &raw_info)); + bool has_more_moves = false; + RETURN_NOT_OK(runner->GetNextMoves(&has_more_moves)); + if (!has_more_moves) { + // No moves are left, done! + break; + } - replica_moves->clear(); - vector<TableReplicaMove> moves; - ClusterInfo cluster_info; - RETURN_NOT_OK(BuildClusterInfo(raw_info, moves_in_progress, &cluster_info)); - RETURN_NOT_OK(algo_.GetNextMoves(cluster_info, max_moves, &moves)); - if (moves.empty()) { - // No suitable moves were found: the cluster is balanced, - // assuming all pending moves, if any, will succeed. - return Status::OK(); - } - unordered_set<string> tablets_in_move; - std::transform(moves_in_progress.begin(), moves_in_progress.end(), - inserter(tablets_in_move, tablets_in_move.begin()), - [](const MovesInProgress::value_type& elem) { - return elem.first; - }); - for (const auto& move : moves) { - vector<string> tablet_ids; - RETURN_NOT_OK(FindReplicas(move, ksck_info, &tablet_ids)); - // Shuffle the set of the tablet identifiers: that's to achieve even spread - // of moves across tables with the same skew. - std::shuffle(tablet_ids.begin(), tablet_ids.end(), random_generator_); - string move_tablet_id; - for (const auto& tablet_id : tablet_ids) { - if (tablets_in_move.find(tablet_id) == tablets_in_move.end()) { - // For now, choose the very first tablet that does not have replicas - // in move. Later on, additional logic might be added to find - // the best candidate. 
- move_tablet_id = tablet_id; + auto has_errors = false; + while (!is_timed_out) { + auto is_scheduled = runner->ScheduleNextMove(&has_errors, &is_timed_out); + resync_state |= has_errors; + if (resync_state || is_timed_out) { break; } - } - if (move_tablet_id.empty()) { - LOG(WARNING) << Substitute( - "table $0: could not find any suitable replica to move " - "from server $1 to server $2", move.table_id, move.from, move.to); - continue; - } - ReplicaMove info; - info.tablet_uuid = move_tablet_id; - info.ts_uuid_from = move.from; - info.ts_uuid_to = move.to; - replica_moves->emplace_back(std::move(info)); - // Mark the tablet as 'has a replica in move'. - tablets_in_move.emplace(move_tablet_id); - } - - return Status::OK(); -} - -// Given high-level description of moves, find tablets with replicas at the -// corresponding tablet servers to satisfy those high-level descriptions. -// The idea is to find all tablets of the specified table that would have a -// replica at the source server, but would not have a replica at the destination -// server. That is to satisfy the restriction of having no more than one replica -// of the same tablet per server. -// -// An additional constraint: it's better not to move leader replicas, if -// possible. If a client has a write operation in progress, moving leader -// replicas of affected tablets would make the client to re-resolve new leaders -// and retry the operations. Moving leader replicas is used as last resort -// when no other candidates are left. -Status Rebalancer::FindReplicas(const TableReplicaMove& move, - const KsckResults& ksck_info, - vector<string>* tablet_ids) const { - const auto& table_id = move.table_id; - - // Tablet ids of replicas on the source tserver that are non-leaders. - vector<string> tablet_uuids_src; - // Tablet ids of replicas on the source tserver that are leaders. - vector<string> tablet_uuids_src_leaders; - // UUIDs of tablets of the selected table at the destination tserver. 
- vector<string> tablet_uuids_dst; + if (is_scheduled) { + // Reset the start of the staleness interval: there was some progress + // in scheduling new move operations. + staleness_start = MonoTime::Now(); - for (const auto& tablet_summary : ksck_info.tablet_summaries) { - if (tablet_summary.table_id != table_id) { - continue; - } - if (tablet_summary.result != KsckCheckResult::HEALTHY) { - VLOG(1) << Substitute("table $0: not considering replicas of tablet $1 " - "as candidates for movement since the tablet's " - "status is '$2'", - table_id, tablet_summary.id, - KsckCheckResultToString(tablet_summary.result)); - continue; - } - for (const auto& replica_summary : tablet_summary.replicas) { - if (replica_summary.ts_uuid != move.from && - replica_summary.ts_uuid != move.to) { + // Continue scheduling available move operations while there is enough + // capacity, i.e. until number of pending move operations on every + // involved tablet server reaches max_moves_per_server. Once no more + // operations can be scheduled, it's time to check for their status. continue; } - if (!replica_summary.ts_healthy) { - VLOG(1) << Substitute("table $0: not considering replica movement " - "from $1 to $2 since server $3 is not healthy", - table_id, - move.from, move.to, replica_summary.ts_uuid); - continue; + + // Poll for the status of pending operations. If some of the in-flight + // operations are complete, it might be possible to schedule new ones + // by calling Runner::ScheduleNextMove(). + auto has_updates = runner->UpdateMovesInProgressStatus(&has_errors, + &is_timed_out); + if (has_updates) { + // Reset the start of the staleness interval: there was some updates + // on the status of scheduled move operations. 
+ staleness_start = MonoTime::Now(); } - if (replica_summary.ts_uuid == move.from) { - if (replica_summary.is_leader) { - tablet_uuids_src_leaders.emplace_back(tablet_summary.id); - } else { - tablet_uuids_src.emplace_back(tablet_summary.id); - } - } else { - DCHECK_EQ(move.to, replica_summary.ts_uuid); - tablet_uuids_dst.emplace_back(tablet_summary.id); + resync_state |= has_errors; + if (resync_state || is_timed_out || !has_updates) { + // If there were errors while trying to get the statuses of pending + // operations it's necessary to re-synchronize the state of the cluster: + // most likely something has changed, so it's better to get a new set + // of planned moves. + break; } - } - } - sort(tablet_uuids_src.begin(), tablet_uuids_src.end()); - sort(tablet_uuids_dst.begin(), tablet_uuids_dst.end()); - vector<string> tablet_uuids; - set_difference( - tablet_uuids_src.begin(), tablet_uuids_src.end(), - tablet_uuids_dst.begin(), tablet_uuids_dst.end(), - inserter(tablet_uuids, tablet_uuids.begin())); - - if (!tablet_uuids.empty()) { - // If there are tablets with non-leader replicas at the source server, - // those are the best candidates for movement. - tablet_ids->swap(tablet_uuids); - return Status::OK(); + // Sleep a bit before going next cycle of status polling. + SleepFor(MonoDelta::FromMilliseconds(200)); + } } - // If no tablets with non-leader replicas were found, resort to tablets with - // leader replicas at the source server. - DCHECK(tablet_uuids.empty()); - sort(tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end()); - set_difference( - tablet_uuids_src_leaders.begin(), tablet_uuids_src_leaders.end(), - tablet_uuids_dst.begin(), tablet_uuids_dst.end(), - inserter(tablet_uuids, tablet_uuids.begin())); - - tablet_ids->swap(tablet_uuids); - + *result_status = is_timed_out ? 
RunStatus::TIMED_OUT + : RunStatus::CLUSTER_IS_BALANCED; return Status::OK(); } +Status Rebalancer::GetClusterRawInfo(ClusterRawInfo* raw_info) { + RETURN_NOT_OK(RefreshKsckResults()); + return KsckResultsToClusterRawInfo(ksck_->results(), raw_info); +} + Status Rebalancer::RefreshKsckResults() { shared_ptr<KsckCluster> cluster; RETURN_NOT_OK_PREPEND( @@ -643,47 +593,20 @@ Status Rebalancer::RefreshKsckResults() { return Status::OK(); } -void Rebalancer::FilterMoves(const MovesInProgress& scheduled_moves, - vector<ReplicaMove>* replica_moves) { - unordered_set<string> tablet_uuids; - vector<ReplicaMove> filtered_replica_moves; - for (auto&& move_op : *replica_moves) { - const auto& tablet_uuid = move_op.tablet_uuid; - if (scheduled_moves.find(tablet_uuid) != scheduled_moves.end()) { - // There is a move operation in progress for the tablet, don't schedule - // another one. - continue; - } - if (PREDICT_TRUE(tablet_uuids.emplace(tablet_uuid).second)) { - filtered_replica_moves.push_back(std::move(move_op)); - } else { - // Rationale behind the unique tablet constraint: the implementation of - // the Run() method is designed to re-order operations suggested by the - // high-level algorithm to use the op-count-per-tablet-server capacity - // as much as possible. Right now, the RunStep() method outputs only one - // move operation per tablet in every batch. The code below is to - // enforce the contract between Run() and RunStep() methods. 
- LOG(DFATAL) << "detected multiple replica move operations for the same " - "tablet " << tablet_uuid; - } - } - replica_moves->swap(filtered_replica_moves); -} - -Rebalancer::Runner::Runner(size_t max_moves_per_server, - const boost::optional<MonoTime>& deadline) - : max_moves_per_server_(max_moves_per_server), - deadline_(deadline), +Rebalancer::BaseRunner::BaseRunner(Rebalancer* rebalancer, + size_t max_moves_per_server, + boost::optional<MonoTime> deadline) + : rebalancer_(rebalancer), + max_moves_per_server_(max_moves_per_server), + deadline_(std::move(deadline)), moves_count_(0) { + CHECK(rebalancer_); } -Status Rebalancer::Runner::Init(vector<string> master_addresses) { +Status Rebalancer::BaseRunner::Init(vector<string> master_addresses) { DCHECK_EQ(0, moves_count_); - DCHECK(src_op_indices_.empty()); - DCHECK(dst_op_indices_.empty()); DCHECK(op_count_per_ts_.empty()); DCHECK(ts_per_op_count_.empty()); - DCHECK(scheduled_moves_.empty()); DCHECK(master_addresses_.empty()); DCHECK(client_.get() == nullptr); master_addresses_ = std::move(master_addresses); @@ -692,7 +615,60 @@ Status Rebalancer::Runner::Init(vector<string> master_addresses) { .Build(&client_); } -void Rebalancer::Runner::LoadMoves(vector<ReplicaMove> replica_moves) { +Status Rebalancer::BaseRunner::GetNextMoves(bool* has_moves) { + vector<ReplicaMove> replica_moves; + RETURN_NOT_OK(GetNextMovesImpl(&replica_moves)); + if (replica_moves.empty() && scheduled_moves_.empty()) { + *has_moves = false; + return Status::OK(); + } + + // Filter out moves for tablets which already have operations in progress. + // The idea is simple: no more than one move operation per tablet should + // ever be attempted. + Rebalancer::FilterMoves(scheduled_moves_, &replica_moves); + LoadMoves(std::move(replica_moves)); + + // TODO(aserbin): now this method reports on presence of some moves even if + // all of those are in progress and no fresh new are available. 
+ // Would it be more convenient for to report only on the + // fresh new moves and check for the presence of the scheduled + // moves at the upper level? + *has_moves = true; + return Status::OK(); +} + +void Rebalancer::BaseRunner::UpdateOnMoveCompleted(const string& ts_uuid) { + const auto op_count = op_count_per_ts_[ts_uuid]--; + const auto op_range = ts_per_op_count_.equal_range(op_count); + bool ts_per_op_count_updated = false; + for (auto it = op_range.first; it != op_range.second; ++it) { + if (it->second == ts_uuid) { + ts_per_op_count_.erase(it); + ts_per_op_count_.emplace(op_count - 1, ts_uuid); + ts_per_op_count_updated = true; + break; + } + } + DCHECK(ts_per_op_count_updated); +} + +Rebalancer::AlgoBasedRunner::AlgoBasedRunner( + Rebalancer* rebalancer, + size_t max_moves_per_server, + boost::optional<MonoTime> deadline) + : BaseRunner(rebalancer, max_moves_per_server, std::move(deadline)), + random_generator_(random_device_()) { +} + +Status Rebalancer::AlgoBasedRunner::Init(vector<string> master_addresses) { + DCHECK(src_op_indices_.empty()); + DCHECK(dst_op_indices_.empty()); + DCHECK(scheduled_moves_.empty()); + return BaseRunner::Init(std::move(master_addresses)); +} + +void Rebalancer::AlgoBasedRunner::LoadMoves(vector<ReplicaMove> replica_moves) { // The moves to schedule (used by subsequent calls to ScheduleNextMove()). replica_moves_.swap(replica_moves); @@ -758,7 +734,8 @@ void Rebalancer::Runner::LoadMoves(vector<ReplicaMove> replica_moves) { } // Return true if replica move operation has been scheduled successfully. 
-bool Rebalancer::Runner::ScheduleNextMove(bool* has_errors, bool* timed_out) { +bool Rebalancer::AlgoBasedRunner::ScheduleNextMove(bool* has_errors, + bool* timed_out) { DCHECK(has_errors); DCHECK(timed_out); *has_errors = false; @@ -811,7 +788,7 @@ bool Rebalancer::Runner::ScheduleNextMove(bool* has_errors, bool* timed_out) { return false; } -bool Rebalancer::Runner::UpdateMovesInProgressStatus( +bool Rebalancer::AlgoBasedRunner::UpdateMovesInProgressStatus( bool* has_errors, bool* timed_out) { DCHECK(has_errors); DCHECK(timed_out); @@ -848,7 +825,9 @@ bool Rebalancer::Runner::UpdateMovesInProgressStatus( // Erase the element and advance the iterator. it = scheduled_moves_.erase(it); continue; - } else if (is_complete) { + } + DCHECK(s.ok()); + if (is_complete) { // The move has completed (success or failure): update the stats on the // pending operations per server. ++moves_count_; @@ -856,7 +835,7 @@ bool Rebalancer::Runner::UpdateMovesInProgressStatus( UpdateOnMoveCompleted(it->second.ts_uuid_to); LOG(INFO) << Substitute("tablet $0: $1 -> $2 move completed: $3", tablet_id, src_ts_uuid, dst_ts_uuid, - s.ok() ? move_status.ToString() : s.ToString()); + move_status.ToString()); // Erase the element and advance the iterator. it = scheduled_moves_.erase(it); continue; @@ -869,7 +848,83 @@ bool Rebalancer::Runner::UpdateMovesInProgressStatus( return has_updates; } -bool Rebalancer::Runner::FindNextMove(size_t* op_idx) { +// Run one step of the rebalancer. Due to the inherent restrictions of the +// rebalancing engine, no more than one replica per tablet is moved during +// one step of the rebalancing. +Status Rebalancer::AlgoBasedRunner::GetNextMovesImpl( + vector<ReplicaMove>* replica_moves) { + ClusterRawInfo raw_info; + RETURN_NOT_OK(rebalancer_->GetClusterRawInfo(&raw_info)); + + // For simplicity, allow to run the rebalancing only when all tablet servers + // are in good shape. 
Otherwise, the rebalancing might interfere with the + // automatic re-replication or get unexpected errors while moving replicas. + for (const auto& s : raw_info.tserver_summaries) { + if (s.health != KsckServerHealth::HEALTHY) { + return Status::IllegalState( + Substitute("tablet server $0 ($1): unacceptable health status $2", + s.uuid, s.address, ServerHealthToString(s.health))); + } + } + + // The number of operations to output by the algorithm. Those will be + // translated into concrete tablet replica movement operations, the output of + // this method. + const size_t max_moves = max_moves_per_server_ * + raw_info.tserver_summaries.size() * 5; + + replica_moves->clear(); + vector<TableReplicaMove> moves; + ClusterInfo cluster_info; + RETURN_NOT_OK(rebalancer_->BuildClusterInfo( + raw_info, scheduled_moves_, &cluster_info)); + RETURN_NOT_OK(algorithm()->GetNextMoves(cluster_info, max_moves, &moves)); + if (moves.empty()) { + // No suitable moves were found: the cluster described by the 'cluster_info' + // is balanced, assuming the pending moves, if any, will succeed. + return Status::OK(); + } + unordered_set<string> tablets_in_move; + std::transform(scheduled_moves_.begin(), scheduled_moves_.end(), + inserter(tablets_in_move, tablets_in_move.begin()), + [](const MovesInProgress::value_type& elem) { + return elem.first; + }); + for (const auto& move : moves) { + vector<string> tablet_ids; + RETURN_NOT_OK(FindReplicas(move, raw_info, &tablet_ids)); + // Shuffle the set of the tablet identifiers: that's to achieve even spread + // of moves across tables with the same skew. + std::shuffle(tablet_ids.begin(), tablet_ids.end(), random_generator_); + string move_tablet_id; + for (const auto& tablet_id : tablet_ids) { + if (tablets_in_move.find(tablet_id) == tablets_in_move.end()) { + // For now, choose the very first tablet that does not have replicas + // in move. Later on, additional logic might be added to find + // the best candidate. 
+ move_tablet_id = tablet_id; + break; + } + } + if (move_tablet_id.empty()) { + LOG(WARNING) << Substitute( + "table $0: could not find any suitable replica to move " + "from server $1 to server $2", move.table_id, move.from, move.to); + continue; + } + ReplicaMove info; + info.tablet_uuid = move_tablet_id; + info.ts_uuid_from = move.from; + info.ts_uuid_to = move.to; + replica_moves->emplace_back(std::move(info)); + // Mark the tablet as 'has a replica in move'. + tablets_in_move.emplace(move_tablet_id); + } + + return Status::OK(); +} + +bool Rebalancer::AlgoBasedRunner::FindNextMove(size_t* op_idx) { vector<size_t> op_indices; for (auto it = ts_per_op_count_.begin(); op_indices.empty() && it != ts_per_op_count_.end() && it->first < max_moves_per_server_; ++it) { @@ -915,7 +970,7 @@ bool Rebalancer::Runner::FindNextMove(size_t* op_idx) { return !op_indices.empty(); } -void Rebalancer::Runner::UpdateOnMoveScheduled( +void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduled( size_t idx, const string& tablet_uuid, const string& src_ts_uuid, @@ -925,13 +980,14 @@ void Rebalancer::Runner::UpdateOnMoveScheduled( Rebalancer::ReplicaMove move_info = { tablet_uuid, src_ts_uuid, dst_ts_uuid }; auto ins = scheduled_moves_.emplace(tablet_uuid, std::move(move_info)); // Only one replica of a tablet can be moved at a time. 
- DCHECK(ins.second); + // TODO(aserbin): clarify on duplicates + //DCHECK(ins.second); } UpdateOnMoveScheduledImpl(idx, src_ts_uuid, is_success, &src_op_indices_); UpdateOnMoveScheduledImpl(idx, dst_ts_uuid, is_success, &dst_op_indices_); } -void Rebalancer::Runner::UpdateOnMoveScheduledImpl( +void Rebalancer::AlgoBasedRunner::UpdateOnMoveScheduledImpl( size_t idx, const string& ts_uuid, bool is_success, @@ -959,19 +1015,11 @@ void Rebalancer::Runner::UpdateOnMoveScheduledImpl( } } -void Rebalancer::Runner::UpdateOnMoveCompleted(const string& ts_uuid) { - const auto op_count = op_count_per_ts_[ts_uuid]--; - const auto op_range = ts_per_op_count_.equal_range(op_count); - bool ts_per_op_count_updated = false; - for (auto it = op_range.first; it != op_range.second; ++it) { - if (it->second == ts_uuid) { - ts_per_op_count_.erase(it); - ts_per_op_count_.emplace(op_count - 1, ts_uuid); - ts_per_op_count_updated = true; - break; - } - } - DCHECK(ts_per_op_count_updated); +Rebalancer::TwoDimensionalGreedyRunner::TwoDimensionalGreedyRunner( + Rebalancer* rebalancer, + size_t max_moves_per_server, + boost::optional<MonoTime> deadline) + : AlgoBasedRunner(rebalancer, max_moves_per_server, std::move(deadline)) { } } // namespace tools http://git-wip-us.apache.org/repos/asf/kudu/blob/34bb7f93/src/kudu/tools/rebalancer.h ---------------------------------------------------------------------- diff --git a/src/kudu/tools/rebalancer.h b/src/kudu/tools/rebalancer.h index 2b13a92..c4f5824 100644 --- a/src/kudu/tools/rebalancer.h +++ b/src/kudu/tools/rebalancer.h @@ -131,26 +131,21 @@ class Rebalancer { // and track already scheduled ones. class Runner { public: - // The 'max_moves_per_server' specifies the maximum number of operations - // per tablet server (both the source and the destination are counted in). - // The 'deadline' specifies the deadline for the run, 'boost::none' - // if no timeout is set. 
- Runner(size_t max_moves_per_server, - const boost::optional<MonoTime>& deadline); + virtual ~Runner() = default; // Initialize instance of Runner so it can run against Kudu cluster with // the 'master_addresses' RPC endpoints. - Status Init(std::vector<std::string> master_addresses); + virtual Status Init(std::vector<std::string> master_addresses) = 0; // Load information on prescribed replica movement operations. Also, // populate helper containers and other auxiliary run-time structures // used by ScheduleNextMove(). This method is called with every batch // of move operations output by the rebalancing algorithm once previously // loaded moves have been scheduled. - void LoadMoves(std::vector<ReplicaMove> replica_moves); + virtual void LoadMoves(std::vector<ReplicaMove> replica_moves) = 0; // Schedule next replica move. - bool ScheduleNextMove(bool* has_errors, bool* timed_out); + virtual bool ScheduleNextMove(bool* has_errors, bool* timed_out) = 0; // Update statuses and auxiliary information on in-progress replica move // operations. The 'timed_out' parameter is set to 'true' if not all @@ -158,39 +153,45 @@ class Rebalancer { // the 'deadline_' member field. The method returns 'true' if it's necessary // to clear the state of the in-progress operations, i.e. 'forget' // those, starting from a clean state. - bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out); + virtual bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) = 0; - uint32_t moves_count() const { - return moves_count_; - } + virtual Status GetNextMoves(bool* has_moves) = 0; - const MovesInProgress& scheduled_moves() const { - return scheduled_moves_; - } + virtual uint32_t moves_count() const = 0; + }; // class Runner - private: - // Given the data in the helper containers, find the index describing - // the next replica move and output it into the 'op_idx' parameter. - bool FindNextMove(size_t* op_idx); + // Common base for a few Runner implementations. 
+ class BaseRunner : public Runner { + public: + BaseRunner(Rebalancer* rebalancer, + size_t max_moves_per_server, + boost::optional<MonoTime> deadline); - // Update the helper containers once a move operation has been scheduled. - void UpdateOnMoveScheduled(size_t idx, - const std::string& tablet_uuid, - const std::string& src_ts_uuid, - const std::string& dst_ts_uuid, - bool is_success); + Status Init(std::vector<std::string> master_addresses) override; - // Auxiliary method used by UpdateOnMoveScheduled() implementation. - void UpdateOnMoveScheduledImpl( - size_t idx, - const std::string& ts_uuid, - bool is_success, - std::unordered_map<std::string, std::set<size_t>>* op_indices); + Status GetNextMoves(bool* has_moves) override; + + uint32_t moves_count() const override { + return moves_count_; + } + + protected: + // Get next batch of replica moves from the rebalancing algorithm. + // Essentially, it runs ksck against the cluster and feeds the data into the + // rebalancing algorithm along with the information on currently pending + // replica movement operations. The information returned by the high-level + // rebalancing algorithm is translated into particular replica movement + // instructions, which are used to populate the 'replica_moves' parameter + // (the container is cleared first). + virtual Status GetNextMovesImpl(std::vector<ReplicaMove>* moves) = 0; // Update the helper containers once a scheduled operation is complete // (i.e. succeeded or failed). void UpdateOnMoveCompleted(const std::string& ts_uuid); + // A pointer to the Rebalancer object. + Rebalancer* rebalancer_; + // Maximum allowed number of move operations per server. For a move // operation, a source replica adds +1 at the source server and the target // replica adds +1 at the destination server. @@ -200,12 +201,71 @@ class Rebalancer { // ScheduleNextMoves() and UpadteMovesInProgressStatus() methods. 
const boost::optional<MonoTime> deadline_; + // Client object to make queries to Kudu masters for various auxiliary info + // while scheduling move operations and monitoring their status. + client::sp::shared_ptr<client::KuduClient> client_; + + // Information on scheduled replica movement operations; keys are + // tablet UUIDs, values are ReplicaMove structures. + MovesInProgress scheduled_moves_; + // Number of successfully completed replica moves operations. uint32_t moves_count_; // Kudu cluster RPC end-points. std::vector<std::string> master_addresses_; + // Mapping 'tserver UUID' --> 'scheduled move operations count'. + std::unordered_map<std::string, int32_t> op_count_per_ts_; + + // Mapping 'scheduled move operations count' --> 'tserver UUID'. That's + // just reversed 'op_count_per_ts_'. + std::multimap<int32_t, std::string> ts_per_op_count_; + }; // class BaseRunner + + // Runner that leverages RebalancingAlgo interface for rebalancing. + class AlgoBasedRunner : public BaseRunner { + public: + // The 'max_moves_per_server' specifies the maximum number of operations + // per tablet server (both the source and the destination are counted in). + // The 'deadline' specifies the deadline for the run, 'boost::none' + // if no timeout is set. + AlgoBasedRunner(Rebalancer* rebalancer, + size_t max_moves_per_server, + boost::optional<MonoTime> deadline); + + Status Init(std::vector<std::string> master_addresses) override; + + void LoadMoves(std::vector<ReplicaMove> replica_moves) override; + + bool ScheduleNextMove(bool* has_errors, bool* timed_out) override; + + bool UpdateMovesInProgressStatus(bool* has_errors, bool* timed_out) override; + + // Rebalancing algorithm that the runner uses to find replica moves.
+ virtual RebalancingAlgo* algorithm() = 0; + + protected: + Status GetNextMovesImpl(std::vector<ReplicaMove>* replica_moves) override; + + // Given the data in the helper containers, find the index describing + // the next replica move and output it into the 'op_idx' parameter. + bool FindNextMove(size_t* op_idx); + + // Update the helper containers once a move operation has been scheduled. + void UpdateOnMoveScheduled(size_t idx, + const std::string& tablet_uuid, + const std::string& src_ts_uuid, + const std::string& dst_ts_uuid, + bool is_success); + + // Auxiliary method used by UpdateOnMoveScheduled() implementation. + void UpdateOnMoveScheduledImpl( + size_t idx, + const std::string& ts_uuid, + bool is_success, + std::unordered_map<std::string, std::set<size_t>>* op_indices); + // The moves to schedule. std::vector<ReplicaMove> replica_moves_; @@ -217,23 +277,29 @@ class Rebalancer { // tserver UUID (i.e. the key) as the destination of the move operation'. std::unordered_map<std::string, std::set<size_t>> dst_op_indices_; - // Mapping 'tserver UUID' --> 'scheduled move operations count'. - std::unordered_map<std::string, int32_t> op_count_per_ts_; + // Random device and generator for selecting among multiple choices, when + // appropriate. + std::random_device random_device_; + std::mt19937 random_generator_; + }; // class AlgoBasedRunner - // Mapping 'scheduled move operations count' --> 'tserver UUID'. That's - // just reversed 'op_count_per_ts_'. Having count as key helps with finding - // servers with minimum number of scheduled operations while scheduling - // replica movement operations (it's necessary to preserve the - // 'maximum-moves-per-server' constraint while doing so). - std::multimap<int32_t, std::string> ts_per_op_count_; + class TwoDimensionalGreedyRunner : public AlgoBasedRunner { + public: + // The 'max_moves_per_server' specifies the maximum number of operations + // per tablet server (both the source and the destination are counted in). 
+ // The 'deadline' specifies the deadline for the run, 'boost::none' + // if no timeout is set. + TwoDimensionalGreedyRunner(Rebalancer* rebalancer, + size_t max_moves_per_server, + boost::optional<MonoTime> deadline); - // Information on scheduled replica movement operations; keys are - // tablet UUIDs, values are ReplicaMove structures. - MovesInProgress scheduled_moves_; + RebalancingAlgo* algorithm() override { + return &algorithm_; + } - // Client object to make queries to Kudu masters for various auxiliary info - // while scheduling move operations and monitoring their status. - client::sp::shared_ptr<client::KuduClient> client_; + private: + // An instance of the balancing algorithm. + TwoDimensionalGreedyAlgo algorithm_; }; friend class KsckResultsToClusterBalanceInfoTest; @@ -246,6 +312,24 @@ class Rebalancer { const KsckResults& ksck_info, ClusterRawInfo* raw_info); + // Given high-level move-some-tablet-replica-for-a-table information from the + // rebalancing algorithm, find appropriate tablet replicas to move between the + // specified tablet servers. The set of result tablet UUIDs is output + // into the 'tablet_ids' container (note: the container is first cleared). + // The source and destination replicas are determined by the elements of the + // 'tablet_ids' container and tablet server UUIDs TableReplicaMove::from and + // TableReplicaMove::to, respectively. If no suitable tablet replicas are found, + // 'tablet_ids' will be empty with the result status of Status::OK(). + static Status FindReplicas(const TableReplicaMove& move, + const ClusterRawInfo& raw_info, + std::vector<std::string>* tablet_ids); + + // Filter move operations in 'replica_moves': remove all operations that would + // involve moving replicas of tablets which are in 'scheduled_moves'. The + // 'replica_moves' cannot be null.
+ static void FilterMoves(const MovesInProgress& scheduled_moves, + std::vector<ReplicaMove>* replica_moves); + // Convert the 'raw' information about the cluster into information suitable // for the input of the high-level rebalancing algorithm. // The 'moves_in_progress' parameter contains information on the replica moves @@ -260,37 +344,18 @@ class Rebalancer { const MovesInProgress& moves_in_progress, ClusterInfo* info) const; - // Get next batch of replica moves from the rebalancing algorithm. - // Essentially, it runs ksck against the cluster and feeds the data into the - // rebalancing algorithm along with the information on currently pending - // replica movement operations. The information returned by the high-level - // rebalancing algorithm is translated into particular replica movement - // instructions, which are used to populate the 'replica_moves' parameter - // (the container is cleared first). - // - // The 'moves_in_progress' parameter contains information on pending moves. - // The results are output into 'replica_moves', which will be empty - // if no next steps are needed to make the cluster balanced. - Status GetNextMoves(const MovesInProgress& moves_in_progress, - std::vector<ReplicaMove>* replica_moves); + // Run rebalancing using the specified runner. + Status RunWith(Runner* runner, RunStatus* result_status); - // Given information from the high-level rebalancing algorithm, find - // appropriate tablet replicas to move on the specified tablet servers. - // The set of result UUIDs is output into the 'tablet_ids' container (note: - // the output container is first cleared). If no suitable replicas are found, - // 'tablet_ids' will be empty with the result status of Status::OK(). - Status FindReplicas(const TableReplicaMove& move, - const KsckResults& ksck_info, - std::vector<std::string>* tablet_ids) const; + // Refresh the information on the cluster (involves running ksck). 
+ Status GetClusterRawInfo(ClusterRawInfo* raw_info); + + Status GetNextMoves(Runner* runner, + std::vector<ReplicaMove>* replica_moves); // Reset ksck-related fields and run ksck against the cluster. Status RefreshKsckResults(); - // Filter out move operations at the tablets which already have operations - // in progress. The 'replica_moves' cannot be null. - void FilterMoves(const MovesInProgress& scheduled_moves, - std::vector<ReplicaMove>* replica_moves); - // Configuration for the rebalancer. const Config config_;