Use the same replica selection when adding a server as table creation

When selecting tablet servers to host replicas at table creation time,
the master uses a selection algorithm that (roughly) picks the less
loaded of two randomly chosen tablet servers that haven't already been
picked to hold a replica. When adding a replica, for instance
after an eviction, the master just picks a tablet server at random from
those not already holding a replica. Let's use the same algorithm for
both, since the table creation algorithm has the benefit that it
tends to fill less-loaded tablet servers, like ones newly added
to the cluster.
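
For illustration only, here is a minimal, self-contained sketch of the
"less loaded of two random candidates" idea described above. It is not
the Kudu code: Server, SampleCandidates, and SelectServer are
hypothetical names, std::mt19937 stands in for the master's
ThreadSafeRandom, and load is assumed to be the sum of recent
placements and live replicas, as in the diff below.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <set>
#include <vector>

// Hypothetical stand-in for a tablet server descriptor (not Kudu's TSDescriptor).
struct Server {
  int id;
  double recent_creations;  // assumed: recent placements not yet reported
  int live_replicas;        // assumed: replicas the server has reported
  double Load() const { return recent_creations + live_replicas; }
};

// Reservoir-samples up to 'k' servers from 'servers', skipping ids in 'excluded'.
std::vector<const Server*> SampleCandidates(const std::vector<Server>& servers,
                                            const std::set<int>& excluded,
                                            std::size_t k,
                                            std::mt19937* rng) {
  std::vector<const Server*> picked;
  std::size_t seen = 0;  // number of non-excluded servers visited so far
  for (const Server& s : servers) {
    if (excluded.count(s.id) > 0) continue;
    ++seen;
    if (picked.size() < k) {
      picked.push_back(&s);
      continue;
    }
    // Keep the new server with probability k / seen, evicting a random pick.
    std::uniform_int_distribution<std::size_t> dist(0, seen - 1);
    const std::size_t slot = dist(*rng);
    if (slot < k) picked[slot] = &s;
  }
  return picked;
}

// Returns the chosen server, or nullptr if every server is excluded.
const Server* SelectServer(const std::vector<Server>& servers,
                           const std::set<int>& excluded,
                           std::mt19937* rng) {
  std::vector<const Server*> two = SampleCandidates(servers, excluded, 2, rng);
  if (two.empty()) return nullptr;
  if (two.size() == 1) return two[0];
  assert(two.size() == 2);
  if (two[0]->Load() < two[1]->Load()) return two[0];
  if (two[1]->Load() < two[0]->Load()) return two[1];
  // Equal load: break the tie at random.
  std::uniform_int_distribution<int> coin(0, 1);
  return two[coin(*rng)];
}
```

In this sketch, with two equally loaded servers and one empty server
and nothing excluded, the empty server should be chosen roughly two
times out of three: it wins whenever it is one of the two sampled
candidates, but it does not absorb every placement.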

Change-Id: I199e7a59c2c7832e7a87842b357ba3aa29e34685
Reviewed-on: http://gerrit.cloudera.org:8080/7143
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7ac7762f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7ac7762f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7ac7762f

Branch: refs/heads/master
Commit: 7ac7762ff470ade1c6be3426e05d322d34dc3b2b
Parents: e719b5e
Author: Will Berkeley <[email protected]>
Authored: Fri Jun 9 15:02:15 2017 -0700
Committer: Will Berkeley <[email protected]>
Committed: Mon Jun 12 23:18:56 2017 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 220 ++++++++++++++++----------------
 src/kudu/master/catalog_manager.h  |  11 --
 2 files changed, 108 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7ac7762f/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 8e19553..73e0c24 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -225,6 +225,7 @@ using cfile::TypeEncodingInfo;
 using consensus::ConsensusServiceProxy;
 using consensus::ConsensusStatePB;
 using consensus::GetConsensusRole;
+using consensus::IsRaftConfigMember;
 using consensus::RaftConsensus;
 using consensus::RaftPeerPB;
 using consensus::StartTabletCopyRequestPB;
@@ -3173,6 +3174,92 @@ class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
   tserver::DeleteTabletResponsePB resp_;
 };
 
+namespace {
+
+// Given exactly two choices in 'two_choices', pick the better tablet server on
+// which to place a tablet replica. Ties are broken using 'rng'.
+shared_ptr<TSDescriptor> PickBetterReplicaLocation(const TSDescriptorVector& two_choices,
+                                                   ThreadSafeRandom* rng) {
+  DCHECK_EQ(two_choices.size(), 2);
+
+  const auto& a = two_choices[0];
+  const auto& b = two_choices[1];
+
+  // When creating replicas, we consider two aspects of load:
+  //   (1) how many tablet replicas are already on the server, and
+  //   (2) how often we've chosen this server recently.
+  //
+  // The first factor will attempt to put more replicas on servers that
+  // are under-loaded (eg because they have newly joined an existing cluster, or have
+  // been reformatted and re-joined).
+  //
+  // The second factor will ensure that we take into account the recent selection
+  // decisions even if those replicas are still in the process of being created (and thus
+  // not yet reported by the server). This is important because, while creating a table,
+  // we batch the selection process before sending any creation commands to the
+  // servers themselves.
+  //
+  // TODO(wdberkeley): in the future we may want to factor in other items such
+  // as available disk space, actual request load, etc.
+  double load_a = a->RecentReplicaCreations() + a->num_live_replicas();
+  double load_b = b->RecentReplicaCreations() + b->num_live_replicas();
+  if (load_a < load_b) {
+    return a;
+  }
+  if (load_b < load_a) {
+    return b;
+  }
+  // If the load is the same, we can just pick randomly.
+  return two_choices[rng->Uniform(2)];
+}
+
+// Given the tablet servers in 'ts_descs', use 'rng' to pick a tablet server to
+// host a tablet replica, excluding tablet servers in 'excluded'.
+// If there are no servers in 'ts_descs' that are not in 'excluded', return nullptr.
+shared_ptr<TSDescriptor> SelectReplica(const TSDescriptorVector& ts_descs,
+                                       const set<shared_ptr<TSDescriptor>>& excluded,
+                                       ThreadSafeRandom* rng) {
+  // The replica selection algorithm follows the idea from
+  // "Power of Two Choices in Randomized Load Balancing"[1]. For each replica,
+  // we randomly select two tablet servers, and then assign the replica to the
+  // less-loaded one of the two. This has some nice properties:
+  //
+  // 1) because the initial selection of two servers is random, we get good
+  //    spreading of replicas across the cluster. In contrast if we sorted by
+  //    load and always picked under-loaded servers first, we'd end up causing
+  //    all tablets of a new table to be placed on an empty server. This wouldn't
+  //    give good load balancing of that table.
+  //
+  // 2) because we pick the less-loaded of two random choices, we do end up with a
+  //    weighting towards filling up the underloaded one over time, without
+  //    the extreme scenario above.
+  //
+  // 3) because we don't follow any sequential pattern, every server is equally
+  //    likely to replicate its tablets to every other server. In contrast, a
+  //    round-robin design would enforce that each server only replicates to its
+  //    adjacent nodes in the TS sort order, limiting recovery bandwidth (see
+  //    KUDU-1317).
+  //
+  // [1] http://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
+
+  // Pick two random servers, excluding those we've already picked.
+  // If we've only got one server left, 'two_choices' will actually
+  // just contain one element.
+  vector<shared_ptr<TSDescriptor>> two_choices;
+  rng->ReservoirSample(ts_descs, 2, excluded, &two_choices);
+
+  if (two_choices.size() == 2) {
+    // Pick the better of the two.
+    return PickBetterReplicaLocation(two_choices, rng);
+  }
+  if (two_choices.size() == 1) {
+    return two_choices[0];
+  }
+  return nullptr;
+}
+
+} // anonymous namespace
+
 // Send the "Alter Table" with the latest table schema to the leader replica
 // for the tablet.
 // Keeps retrying until we get an "ok" response.
@@ -3252,39 +3339,18 @@ class AsyncAlterTable : public RetryingTSRpcTask {
   tserver::AlterSchemaResponsePB resp_;
 };
 
-namespace {
-
-// Select a random TS not in the 'exclude_uuids' list.
-// Will not select tablet servers that have not heartbeated recently.
-// Returns true iff it was possible to select a replica.
-bool SelectRandomTSForReplica(const TSDescriptorVector& ts_descs,
-                              const unordered_set<string>& exclude_uuids,
-                              shared_ptr<TSDescriptor>* selection) {
-  TSDescriptorVector tablet_servers;
-  for (const shared_ptr<TSDescriptor>& ts : ts_descs) {
-    if (!ContainsKey(exclude_uuids, ts->permanent_uuid())) {
-      tablet_servers.push_back(ts);
-    }
-  }
-  if (tablet_servers.empty()) {
-    return false;
-  }
-  *selection = tablet_servers[rand() % tablet_servers.size()];
-  return true;
-}
-
-} // anonymous namespace
-
 class AsyncAddServerTask : public RetryingTSRpcTask {
  public:
   AsyncAddServerTask(Master *master,
                      const scoped_refptr<TabletInfo>& tablet,
-                     const ConsensusStatePB& cstate)
+                     const ConsensusStatePB& cstate,
+                     ThreadSafeRandom* rng)
     : RetryingTSRpcTask(master,
                         gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)),
                         tablet->table()),
       tablet_(tablet),
-      cstate_(cstate) {
+      cstate_(cstate),
+      rng_(rng) {
     deadline_ = MonoTime::Max(); // Never time out.
   }
 
@@ -3302,6 +3368,9 @@ class AsyncAddServerTask : public RetryingTSRpcTask {
   const scoped_refptr<TabletInfo> tablet_;
   const ConsensusStatePB cstate_;
 
+  // Used to make random choices in replica selection.
+  ThreadSafeRandom* rng_;
+
   consensus::ChangeConfigRequestPB req_;
   consensus::ChangeConfigResponsePB resp_;
 };
@@ -3334,14 +3403,16 @@ bool AsyncAddServerTask::SendRequest(int attempt) {
 
   // Select the replica we wish to add to the config.
   // Do not include current members of the config.
-  unordered_set<string> replica_uuids;
-  for (const RaftPeerPB& peer : cstate_.committed_config().peers()) {
-    InsertOrDie(&replica_uuids, peer.permanent_uuid());
-  }
   TSDescriptorVector ts_descs;
   master_->ts_manager()->GetAllLiveDescriptors(&ts_descs);
-  shared_ptr<TSDescriptor> replacement_replica;
-  if (PREDICT_FALSE(!SelectRandomTSForReplica(ts_descs, replica_uuids, &replacement_replica))) {
+  set<std::shared_ptr<TSDescriptor>> excluded;
+  for (const auto& ts_desc : ts_descs) {
+    if (IsRaftConfigMember(ts_desc->permanent_uuid(), cstate_.committed_config())) {
+      InsertOrDie(&excluded, ts_desc);
+    }
+  }
+  auto replacement_replica = SelectReplica(ts_descs, excluded, rng_);
+  if (PREDICT_FALSE(!replacement_replica)) {
     KLOG_EVERY_N(WARNING, 100) << LogPrefix() << "No candidate replacement replica found "
                                << "for tablet " << tablet_->ToString();
     return false;
@@ -3467,7 +3538,7 @@ void CatalogManager::SendDeleteReplicaRequest(
 
 void CatalogManager::SendAddServerRequest(const scoped_refptr<TabletInfo>& tablet,
                                           const ConsensusStatePB& cstate) {
-  auto task = new AsyncAddServerTask(master_, tablet, cstate);
+  auto task = new AsyncAddServerTask(master_, tablet, cstate, &rng_);
   tablet->table()->AddTask(task);
   WARN_NOT_OK(task->Run(),
               Substitute("Failed to send AddServer request for tablet $0", tablet->tablet_id()));
@@ -3824,85 +3895,6 @@ void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& ta
   }
 }
 
-shared_ptr<TSDescriptor> CatalogManager::PickBetterReplicaLocation(
-    const TSDescriptorVector& two_choices) {
-  DCHECK_EQ(two_choices.size(), 2);
-
-  const auto& a = two_choices[0];
-  const auto& b = two_choices[1];
-
-  // When creating replicas, we consider two aspects of load:
-  //   (1) how many tablet replicas are already on the server, and
-  //   (2) how often we've chosen this server recently.
-  //
-  // The first factor will attempt to put more replicas on servers that
-  // are under-loaded (eg because they have newly joined an existing cluster, or have
-  // been reformatted and re-joined).
-  //
-  // The second factor will ensure that we take into account the recent selection
-  // decisions even if those replicas are still in the process of being created (and thus
-  // not yet reported by the server). This is important because, while creating a table,
-  // we batch the selection process before sending any creation commands to the
-  // servers themselves.
-  //
-  // TODO: in the future we may want to factor in other items such as available disk space,
-  // actual request load, etc.
-  double load_a = a->RecentReplicaCreations() + a->num_live_replicas();
-  double load_b = b->RecentReplicaCreations() + b->num_live_replicas();
-  if (load_a < load_b) {
-    return a;
-  } else if (load_b < load_a) {
-    return b;
-  } else {
-    // If the load is the same, we can just pick randomly.
-    return two_choices[rng_.Uniform(2)];
-  }
-}
-
-shared_ptr<TSDescriptor> CatalogManager::SelectReplica(
-    const TSDescriptorVector& ts_descs,
-    const set<shared_ptr<TSDescriptor>>& excluded) {
-  // The replica selection algorithm follows the idea from
-  // "Power of Two Choices in Randomized Load Balancing"[1]. For each replica,
-  // we randomly select two tablet servers, and then assign the replica to the
-  // less-loaded one of the two. This has some nice properties:
-  //
-  // 1) because the initial selection of two servers is random, we get good
-  //    spreading of replicas across the cluster. In contrast if we sorted by
-  //    load and always picked under-loaded servers first, we'd end up causing
-  //    all tablets of a new table to be placed on an empty server. This wouldn't
-  //    give good load balancing of that table.
-  //
-  // 2) because we pick the less-loaded of two random choices, we do end up with a
-  //    weighting towards filling up the underloaded one over time, without
-  //    the extreme scenario above.
-  //
-  // 3) because we don't follow any sequential pattern, every server is equally
-  //    likely to replicate its tablets to every other server. In contrast, a
-  //    round-robin design would enforce that each server only replicates to its
-  //    adjacent nodes in the TS sort order, limiting recovery bandwidth (see
-  //    KUDU-1317).
-  //
-  // [1] http://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
-
-  // Pick two random servers, excluding those we've already picked.
-  // If we've only got one server left, 'two_choices' will actually
-  // just contain one element.
-  vector<shared_ptr<TSDescriptor> > two_choices;
-  rng_.ReservoirSample(ts_descs, 2, excluded, &two_choices);
-
-  if (two_choices.size() == 2) {
-    // Pick the better of the two.
-    return PickBetterReplicaLocation(two_choices);
-  }
-
-  // If we couldn't randomly sample two servers, it's because we only had one
-  // more non-excluded choice left.
-  CHECK_EQ(1, ts_descs.size() - excluded.size())
-      << "ts_descs: " << ts_descs.size() << " already_sel: " << excluded.size();
-  return two_choices[0];
-}
-
 void CatalogManager::SelectReplicas(const TSDescriptorVector& ts_descs,
                                     int nreplicas,
                                     consensus::RaftConfigPB *config) {
@@ -3913,7 +3905,11 @@ void CatalogManager::SelectReplicas(const TSDescriptorVector& ts_descs,
   // put two replicas on the same host.
   set<shared_ptr<TSDescriptor> > already_selected;
   for (int i = 0; i < nreplicas; ++i) {
-    shared_ptr<TSDescriptor> ts = SelectReplica(ts_descs, already_selected);
+    shared_ptr<TSDescriptor> ts = SelectReplica(ts_descs, already_selected, &rng_);
+    // We must be able to find a tablet server for the replica because of
+    // checks before this function is called.
+    DCHECK(ts) << "ts_descs: " << ts_descs.size()
+               << " already_sel: " << already_selected.size();
     InsertOrDie(&already_selected, ts);
 
     // Increment the number of pending replicas so that we take this selection into

http://git-wip-us.apache.org/repos/asf/kudu/blob/7ac7762f/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index b69ea8e..72401b0 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -682,17 +682,6 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // Loops through the "not created" tablets and sends a CreateTablet() request.
   Status ProcessPendingAssignments(const std::vector<scoped_refptr<TabletInfo> >& tablets);
 
-  // Given 'two_choices', which should be a vector of exactly two elements, select which
-  // one is the better choice for a new replica.
-  std::shared_ptr<TSDescriptor> PickBetterReplicaLocation(const TSDescriptorVector& two_choices);
-
-  // Select a tablet server from 'ts_descs' on which to place a new replica.
-  // Any tablet servers in 'excluded' are not considered.
-  // REQUIRES: 'ts_descs' must include at least one non-excluded server.
-  std::shared_ptr<TSDescriptor> SelectReplica(
-      const TSDescriptorVector& ts_descs,
-      const std::set<std::shared_ptr<TSDescriptor>>& excluded);
-
   // Select N Replicas from online tablet servers (as specified by
   // 'ts_descs') for the specified tablet and populate the consensus configuration
   // object. If 'ts_descs' does not specify enough online tablet
