Use the same replica selection when adding a server as at table creation

When selecting tablet servers to host replicas at table creation time, the
master uses a selection algorithm that (roughly) picks the less loaded of two
randomly chosen tablet servers that haven't already been picked to hold a
replica. When adding a replica, for instance after an eviction, it just picks
a tablet server at random from those not already holding a replica. Let's use
the same algorithm for both, since the table creation algorithm has the
benefit that it tends to fill less-loaded tablet servers, such as ones newly
added to the cluster.
Change-Id: I199e7a59c2c7832e7a87842b357ba3aa29e34685
Reviewed-on: http://gerrit.cloudera.org:8080/7143
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins

Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7ac7762f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7ac7762f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7ac7762f

Branch: refs/heads/master
Commit: 7ac7762ff470ade1c6be3426e05d322d34dc3b2b
Parents: e719b5e
Author: Will Berkeley <[email protected]>
Authored: Fri Jun 9 15:02:15 2017 -0700
Committer: Will Berkeley <[email protected]>
Committed: Mon Jun 12 23:18:56 2017 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 220 ++++++++++++++++----------------
 src/kudu/master/catalog_manager.h  |  11 --
 2 files changed, 108 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7ac7762f/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 8e19553..73e0c24 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -225,6 +225,7 @@ using cfile::TypeEncodingInfo;
 using consensus::ConsensusServiceProxy;
 using consensus::ConsensusStatePB;
 using consensus::GetConsensusRole;
+using consensus::IsRaftConfigMember;
 using consensus::RaftConsensus;
 using consensus::RaftPeerPB;
 using consensus::StartTabletCopyRequestPB;
@@ -3173,6 +3174,92 @@ class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
   tserver::DeleteTabletResponsePB resp_;
 };
 
+namespace {
+
+// Given exactly two choices in 'two_choices', pick the better tablet server on
+// which to place a tablet replica. Ties are broken using 'rng'.
+shared_ptr<TSDescriptor> PickBetterReplicaLocation(const TSDescriptorVector& two_choices,
+                                                   ThreadSafeRandom* rng) {
+  DCHECK_EQ(two_choices.size(), 2);
+
+  const auto& a = two_choices[0];
+  const auto& b = two_choices[1];
+
+  // When creating replicas, we consider two aspects of load:
+  // (1) how many tablet replicas are already on the server, and
+  // (2) how often we've chosen this server recently.
+  //
+  // The first factor will attempt to put more replicas on servers that
+  // are under-loaded (eg because they have newly joined an existing cluster, or have
+  // been reformatted and re-joined).
+  //
+  // The second factor will ensure that we take into account the recent selection
+  // decisions even if those replicas are still in the process of being created (and thus
+  // not yet reported by the server). This is important because, while creating a table,
+  // we batch the selection process before sending any creation commands to the
+  // servers themselves.
+  //
+  // TODO(wdberkeley): in the future we may want to factor in other items such
+  // as available disk space, actual request load, etc.
+  double load_a = a->RecentReplicaCreations() + a->num_live_replicas();
+  double load_b = b->RecentReplicaCreations() + b->num_live_replicas();
+  if (load_a < load_b) {
+    return a;
+  }
+  if (load_b < load_a) {
+    return b;
+  }
+  // If the load is the same, we can just pick randomly.
+  return two_choices[rng->Uniform(2)];
+}
+
+// Given the tablet servers in 'ts_descs', use 'rng' to pick a tablet server to
+// host a tablet replica, excluding tablet servers in 'excluded'.
+// If there are no servers in 'ts_descs' that are not in 'excluded', return nullptr.
+shared_ptr<TSDescriptor> SelectReplica(const TSDescriptorVector& ts_descs,
+                                       const set<shared_ptr<TSDescriptor>>& excluded,
+                                       ThreadSafeRandom* rng) {
+  // The replica selection algorithm follows the idea from
+  // "Power of Two Choices in Randomized Load Balancing"[1]. For each replica,
+  // we randomly select two tablet servers, and then assign the replica to the
+  // less-loaded one of the two. This has some nice properties:
+  //
+  // 1) because the initial selection of two servers is random, we get good
+  //    spreading of replicas across the cluster. In contrast if we sorted by
+  //    load and always picked under-loaded servers first, we'd end up causing
+  //    all tablets of a new table to be placed on an empty server. This wouldn't
+  //    give good load balancing of that table.
+  //
+  // 2) because we pick the less-loaded of two random choices, we do end up with a
+  //    weighting towards filling up the underloaded one over time, without
+  //    the extreme scenario above.
+  //
+  // 3) because we don't follow any sequential pattern, every server is equally
+  //    likely to replicate its tablets to every other server. In contrast, a
+  //    round-robin design would enforce that each server only replicates to its
+  //    adjacent nodes in the TS sort order, limiting recovery bandwidth (see
+  //    KUDU-1317).
+  //
+  // [1] http://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
+
+  // Pick two random servers, excluding those we've already picked.
+  // If we've only got one server left, 'two_choices' will actually
+  // just contain one element.
+  vector<shared_ptr<TSDescriptor>> two_choices;
+  rng->ReservoirSample(ts_descs, 2, excluded, &two_choices);
+
+  if (two_choices.size() == 2) {
+    // Pick the better of the two.
+    return PickBetterReplicaLocation(two_choices, rng);
+  }
+  if (two_choices.size() == 1) {
+    return two_choices[0];
+  }
+  return nullptr;
+}
+
+} // anonymous namespace
+
 // Send the "Alter Table" with the latest table schema to the leader replica
 // for the tablet.
 // Keeps retrying until we get an "ok" response.
@@ -3252,39 +3339,18 @@ class AsyncAlterTable : public RetryingTSRpcTask {
   tserver::AlterSchemaResponsePB resp_;
 };
 
-namespace {
-
-// Select a random TS not in the 'exclude_uuids' list.
-// Will not select tablet servers that have not heartbeated recently.
-// Returns true iff it was possible to select a replica.
-bool SelectRandomTSForReplica(const TSDescriptorVector& ts_descs,
-                              const unordered_set<string>& exclude_uuids,
-                              shared_ptr<TSDescriptor>* selection) {
-  TSDescriptorVector tablet_servers;
-  for (const shared_ptr<TSDescriptor>& ts : ts_descs) {
-    if (!ContainsKey(exclude_uuids, ts->permanent_uuid())) {
-      tablet_servers.push_back(ts);
-    }
-  }
-  if (tablet_servers.empty()) {
-    return false;
-  }
-  *selection = tablet_servers[rand() % tablet_servers.size()];
-  return true;
-}
-
-} // anonymous namespace
-
 class AsyncAddServerTask : public RetryingTSRpcTask {
  public:
   AsyncAddServerTask(Master *master,
                      const scoped_refptr<TabletInfo>& tablet,
-                     const ConsensusStatePB& cstate)
+                     const ConsensusStatePB& cstate,
+                     ThreadSafeRandom* rng)
     : RetryingTSRpcTask(master,
                         gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)),
                         tablet->table()),
       tablet_(tablet),
-      cstate_(cstate) {
+      cstate_(cstate),
+      rng_(rng) {
     deadline_ = MonoTime::Max();  // Never time out.
   }
 
@@ -3302,6 +3368,9 @@ class AsyncAddServerTask : public RetryingTSRpcTask {
   const scoped_refptr<TabletInfo> tablet_;
   const ConsensusStatePB cstate_;
 
+  // Used to make random choices in replica selection.
+  ThreadSafeRandom* rng_;
+
   consensus::ChangeConfigRequestPB req_;
   consensus::ChangeConfigResponsePB resp_;
 };
@@ -3334,14 +3403,16 @@ bool AsyncAddServerTask::SendRequest(int attempt) {
 
   // Select the replica we wish to add to the config.
   // Do not include current members of the config.
-  unordered_set<string> replica_uuids;
-  for (const RaftPeerPB& peer : cstate_.committed_config().peers()) {
-    InsertOrDie(&replica_uuids, peer.permanent_uuid());
-  }
   TSDescriptorVector ts_descs;
   master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
-  shared_ptr<TSDescriptor> replacement_replica;
-  if (PREDICT_FALSE(!SelectRandomTSForReplica(ts_descs, replica_uuids, &replacement_replica))) {
+  set<std::shared_ptr<TSDescriptor>> excluded;
+  for (const auto& ts_desc : ts_descs) {
+    if (IsRaftConfigMember(ts_desc->permanent_uuid(), cstate_.committed_config())) {
+      InsertOrDie(&excluded, ts_desc);
+    }
+  }
+  auto replacement_replica = SelectReplica(ts_descs, excluded, rng_);
+  if (PREDICT_FALSE(!replacement_replica)) {
     KLOG_EVERY_N(WARNING, 100) << LogPrefix() << "No candidate replacement replica found "
                                << "for tablet " << tablet_->ToString();
     return false;
@@ -3467,7 +3538,7 @@ void CatalogManager::SendDeleteReplicaRequest(
 
 void CatalogManager::SendAddServerRequest(const scoped_refptr<TabletInfo>& tablet,
                                           const ConsensusStatePB& cstate) {
-  auto task = new AsyncAddServerTask(master_, tablet, cstate);
+  auto task = new AsyncAddServerTask(master_, tablet, cstate, &rng_);
   tablet->table()->AddTask(task);
   WARN_NOT_OK(task->Run(), Substitute("Failed to send AddServer request for tablet $0",
                                       tablet->tablet_id()));
@@ -3824,85 +3895,6 @@ void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& ta
   }
 }
 
-shared_ptr<TSDescriptor> CatalogManager::PickBetterReplicaLocation(
-    const TSDescriptorVector& two_choices) {
-  DCHECK_EQ(two_choices.size(), 2);
-
-  const auto& a = two_choices[0];
-  const auto& b = two_choices[1];
-
-  // When creating replicas, we consider two aspects of load:
-  // (1) how many tablet replicas are already on the server, and
-  // (2) how often we've chosen this server recently.
-  //
-  // The first factor will attempt to put more replicas on servers that
-  // are under-loaded (eg because they have newly joined an existing cluster, or have
-  // been reformatted and re-joined).
-  //
-  // The second factor will ensure that we take into account the recent selection
-  // decisions even if those replicas are still in the process of being created (and thus
-  // not yet reported by the server). This is important because, while creating a table,
-  // we batch the selection process before sending any creation commands to the
-  // servers themselves.
-  //
-  // TODO: in the future we may want to factor in other items such as available disk space,
-  // actual request load, etc.
-  double load_a = a->RecentReplicaCreations() + a->num_live_replicas();
-  double load_b = b->RecentReplicaCreations() + b->num_live_replicas();
-  if (load_a < load_b) {
-    return a;
-  } else if (load_b < load_a) {
-    return b;
-  } else {
-    // If the load is the same, we can just pick randomly.
-    return two_choices[rng_.Uniform(2)];
-  }
-}
-
-shared_ptr<TSDescriptor> CatalogManager::SelectReplica(
-    const TSDescriptorVector& ts_descs,
-    const set<shared_ptr<TSDescriptor>>& excluded) {
-  // The replica selection algorithm follows the idea from
-  // "Power of Two Choices in Randomized Load Balancing"[1]. For each replica,
-  // we randomly select two tablet servers, and then assign the replica to the
-  // less-loaded one of the two. This has some nice properties:
-  //
-  // 1) because the initial selection of two servers is random, we get good
-  //    spreading of replicas across the cluster. In contrast if we sorted by
-  //    load and always picked under-loaded servers first, we'd end up causing
-  //    all tablets of a new table to be placed on an empty server. This wouldn't
-  //    give good load balancing of that table.
-  //
-  // 2) because we pick the less-loaded of two random choices, we do end up with a
-  //    weighting towards filling up the underloaded one over time, without
-  //    the extreme scenario above.
-  //
-  // 3) because we don't follow any sequential pattern, every server is equally
-  //    likely to replicate its tablets to every other server. In contrast, a
-  //    round-robin design would enforce that each server only replicates to its
-  //    adjacent nodes in the TS sort order, limiting recovery bandwidth (see
-  //    KUDU-1317).
-  //
-  // [1] http://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
-
-  // Pick two random servers, excluding those we've already picked.
-  // If we've only got one server left, 'two_choices' will actually
-  // just contain one element.
-  vector<shared_ptr<TSDescriptor> > two_choices;
-  rng_.ReservoirSample(ts_descs, 2, excluded, &two_choices);
-
-  if (two_choices.size() == 2) {
-    // Pick the better of the two.
-    return PickBetterReplicaLocation(two_choices);
-  }
-
-  // If we couldn't randomly sample two servers, it's because we only had one
-  // more non-excluded choice left.
-  CHECK_EQ(1, ts_descs.size() - excluded.size())
-      << "ts_descs: " << ts_descs.size() << " already_sel: " << excluded.size();
-  return two_choices[0];
-}
-
 void CatalogManager::SelectReplicas(const TSDescriptorVector& ts_descs,
                                     int nreplicas,
                                     consensus::RaftConfigPB *config) {
@@ -3913,7 +3905,11 @@ void CatalogManager::SelectReplicas(const TSDescriptorVector& ts_descs,
   // put two replicas on the same host.
   set<shared_ptr<TSDescriptor> > already_selected;
   for (int i = 0; i < nreplicas; ++i) {
-    shared_ptr<TSDescriptor> ts = SelectReplica(ts_descs, already_selected);
+    shared_ptr<TSDescriptor> ts = SelectReplica(ts_descs, already_selected, &rng_);
+    // We must be able to find a tablet server for the replica because of
+    // checks before this function is called.
+    DCHECK(ts) << "ts_descs: " << ts_descs.size()
+               << " already_sel: " << already_selected.size();
     InsertOrDie(&already_selected, ts);
 
     // Increment the number of pending replicas so that we take this selection into

http://git-wip-us.apache.org/repos/asf/kudu/blob/7ac7762f/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index b69ea8e..72401b0 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -682,17 +682,6 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // Loops through the "not created" tablets and sends a CreateTablet() request.
   Status ProcessPendingAssignments(const std::vector<scoped_refptr<TabletInfo> >& tablets);
 
-  // Given 'two_choices', which should be a vector of exactly two elements, select which
-  // one is the better choice for a new replica.
-  std::shared_ptr<TSDescriptor> PickBetterReplicaLocation(const TSDescriptorVector& two_choices);
-
-  // Select a tablet server from 'ts_descs' on which to place a new replica.
-  // Any tablet servers in 'excluded' are not considered.
-  // REQUIRES: 'ts_descs' must include at least one non-excluded server.
-  std::shared_ptr<TSDescriptor> SelectReplica(
-      const TSDescriptorVector& ts_descs,
-      const std::set<std::shared_ptr<TSDescriptor>>& excluded);
-
   // Select N Replicas from online tablet servers (as specified by
   // 'ts_descs') for the specified tablet and populate the consensus configuration
   // object. If 'ts_descs' does not specify enough online tablet
