This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.17.x by this push:
new ffddad8f4 KUDU-3476: Make replica placement range and table aware
ffddad8f4 is described below
commit ffddad8f49f33a40a34a877fd44f1bd6be64440a
Author: Mahesh Reddy <[email protected]>
AuthorDate: Tue May 2 13:33:40 2023 -0400
KUDU-3476: Make replica placement range and table aware
Previously, the replica selection policy randomly selected
two tablet servers and placed the replica on the tserver
with fewer replicas. This could lead to hotspotting if
placing replicas from the same range on the same set of
tservers since the policy doesn't discriminate by range.
With this patch, the replica selection policy now ranks
the available tservers by range and table load and places
the replica accordingly. It prioritizes replicas by range
first, replicas by table are used as a tiebreaker, then
total replicas is used as the final tiebreaker. The range
and table load is determined by the existing number of
replicas before the placement begins and the number of
pending replicas placed on the tserver while placing replicas.
The flag --enable_range_replica_placement on the master side
controls whether or not this new policy is used.
For this feature to work, both the range start key and the
table id of the table the range belongs to must be defined.
This is because multiple tables could have the same range
defined by the same range start key, so to
differentiate the ranges, the table id is required.
The link to the design doc is here:
https://docs.google.com/document/d/1r-p0GW8lj2iLA3VGvZWAem09ykCmR5jEe8npUhJ07G8/edit?usp=sharing
Change-Id: I9caeb8d5547e946bfeb152a99e1ec034c3fa0a0f
Reviewed-on: http://gerrit.cloudera.org:8080/19931
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
(cherry picked from commit 10fdaf6a93a4bd3289d162b5f8351c4f0f5928c8)
Reviewed-on: http://gerrit.cloudera.org:8080/20195
Reviewed-by: Yingchun Lai <[email protected]>
---
src/kudu/integration-tests/create-table-itest.cc | 104 ++++-
src/kudu/master/catalog_manager.cc | 52 ++-
src/kudu/master/master.proto | 18 +-
src/kudu/master/master_runner.cc | 6 +-
src/kudu/master/master_service.cc | 10 +
src/kudu/master/placement_policy-test.cc | 498 +++++++++++++++++++++--
src/kudu/master/placement_policy.cc | 165 ++++++--
src/kudu/master/placement_policy.h | 48 ++-
src/kudu/master/ts_descriptor.cc | 91 ++++-
src/kudu/master/ts_descriptor.h | 100 ++++-
src/kudu/tools/rebalancer_tool-test.cc | 15 +-
src/kudu/tserver/heartbeater.cc | 14 +
src/kudu/tserver/ts_tablet_manager.cc | 16 +
src/kudu/tserver/ts_tablet_manager.h | 16 +-
14 files changed, 1049 insertions(+), 104 deletions(-)
diff --git a/src/kudu/integration-tests/create-table-itest.cc
b/src/kudu/integration-tests/create-table-itest.cc
index b0969ae47..f13d41171 100644
--- a/src/kudu/integration-tests/create-table-itest.cc
+++ b/src/kudu/integration-tests/create-table-itest.cc
@@ -39,6 +39,7 @@
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
+#include "kudu/common/partition.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/gutil/mathlimits.h"
@@ -51,6 +52,9 @@
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/mini-cluster/mini_cluster.h"
#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tablet/tablet.pb.h"
+#include "kudu/tools/tool_test_util.h"
+#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/atomic.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
@@ -195,7 +199,10 @@ TEST_F(CreateTableITest,
TestCreateWhenMajorityOfReplicasFailCreation) {
TEST_F(CreateTableITest, TestSpreadReplicasEvenly) {
const int kNumServers = 10;
const int kNumTablets = 20;
- NO_FATALS(StartCluster({}, {}, kNumServers));
+ vector<string> master_flags = {
+ "--enable_range_replica_placement=false",
+ };
+ NO_FATALS(StartCluster({}, master_flags, kNumServers));
unique_ptr<client::KuduTableCreator>
table_creator(client_->NewTableCreator());
auto client_schema = KuduSchema::FromSchema(GetSimpleTestSchema());
@@ -395,6 +402,99 @@ TEST_F(CreateTableITest,
TestSpreadReplicasEvenlyWithDimension) {
}
}
+// Tests the range aware replica placement by adding multiple tables with
multiple ranges
+// and checking the replica distribution.
+TEST_F(CreateTableITest, TestSpreadReplicas) {
+ const int kNumServers = 5;
+ const int kNumReplicas = 3;
+ NO_FATALS(StartCluster({ }, { }, kNumServers));
+
+ Schema schema = Schema({ ColumnSchema("key1", INT32),
+ ColumnSchema("key2", INT32),
+ ColumnSchema("int_val", INT32),
+ ColumnSchema("string_val", STRING, true) }, 2);
+ auto client_schema = KuduSchema::FromSchema(schema);
+
+ auto create_table_func = [](KuduClient* client,
+ KuduSchema* client_schema,
+ const string& table_name,
+ const vector<std::pair<int32_t, int32_t>>
range_bounds,
+ const int num_buckets) {
+ unique_ptr<client::KuduTableCreator>
table_creator(client->NewTableCreator());
+ table_creator->table_name(table_name)
+ .schema(client_schema)
+ .add_hash_partitions({ "key1" }, num_buckets)
+ .set_range_partition_columns({ "key2" })
+ .num_replicas(kNumReplicas);
+ for (const auto& range_bound : range_bounds) {
+ unique_ptr<KuduPartialRow> lower_bound(client_schema->NewRow());
+ RETURN_NOT_OK(lower_bound->SetInt32("key2", range_bound.first));
+ unique_ptr<KuduPartialRow> upper_bound(client_schema->NewRow());
+ RETURN_NOT_OK(upper_bound->SetInt32("key2", range_bound.second));
+ table_creator->add_range_partition(lower_bound.release(),
upper_bound.release());
+ }
+ return table_creator->Create();
+ };
+
+ vector<string> tables = {"table1", "table2", "table3", "table4"};
+ vector<std::pair<int32_t, int32_t>> range_bounds =
+ { {0, 100}, {100, 200}, {200, 300}, {300, 400}};
+ const int doubleNumBuckets = 10;
+ const int numBuckets = 5;
+ for (const auto& table : tables) {
+ if (table == "table1") {
+ ASSERT_OK(create_table_func(
+ client_.get(), &client_schema, table, range_bounds,
doubleNumBuckets));
+ } else {
+ ASSERT_OK(create_table_func(
+ client_.get(), &client_schema, table, range_bounds, numBuckets));
+ }
+ }
+
+ // Stats of number of replicas per range per table per tserver.
+ typedef std::unordered_map<string, std::unordered_map<string, int>>
replicas_per_range_per_table;
+ std::unordered_map<int, replicas_per_range_per_table> stats;
+ for (int ts_idx = 0; ts_idx < kNumServers; ts_idx++) {
+ rpc::RpcController rpc;
+ tserver::ListTabletsRequestPB req;
+ tserver::ListTabletsResponsePB resp;
+ cluster_->tserver_proxy(ts_idx)->ListTablets(req, &resp, &rpc);
+ for (auto i = 0; i < resp.status_and_schema_size(); ++i) {
+ auto tablet_status = resp.status_and_schema(i).tablet_status();
+ if (tablet_status.has_partition()) {
+ Partition partition;
+ Partition::FromPB(tablet_status.partition(), &partition);
+ auto range_start_key = partition.begin().range_key();
+ auto table_name = tablet_status.table_name();
+ ++stats[ts_idx][table_name][range_start_key];
+ }
+ }
+ }
+
+ ASSERT_EQ(kNumServers, stats.size());
+ for (const auto& stat : stats) {
+ int tserver_replicas = 0;
+ // Verifies that four tables exist on each tserver.
+ ASSERT_EQ(tables.size(), stat.second.size());
+ for (const auto& table : stat.second) {
+ // Verifies that the four ranges exist for each table on each tserver.
+ ASSERT_EQ(range_bounds.size(), table.second.size());
+ for (const auto& ranges : table.second) {
+ // Since there are ten buckets instead of five for table "table1",
+ // we expect twice as many replicas (6 instead of 3).
+ if (table.first == "table1") {
+ ASSERT_EQ(doubleNumBuckets * kNumReplicas / kNumServers,
ranges.second);;
+ } else {
+ ASSERT_EQ(numBuckets * kNumReplicas / kNumServers, ranges.second);
+ }
+ tserver_replicas += ranges.second;
+ }
+ }
+ // Verifies that 60 replicas are placed on each tserver, 300 total across
5 tservers.
+ ASSERT_EQ(60, tserver_replicas);
+ }
+}
+
static void LookUpRandomKeysLoop(const
std::shared_ptr<master::MasterServiceProxy>& master,
const char* table_name,
AtomicBool* quit) {
@@ -468,7 +568,7 @@ TEST_F(CreateTableITest, TestCreateTableWithDeadTServers) {
unique_ptr<client::KuduTableCreator>
table_creator(client_->NewTableCreator());
// Don't bother waiting for table creation to finish; it'll never happen
- // because all of the tservers are dead.
+ // because all the tservers are dead.
CHECK_OK(table_creator->table_name(kTableName)
.schema(&client_schema)
.set_range_partition_columns({ "key" })
diff --git a/src/kudu/master/catalog_manager.cc
b/src/kudu/master/catalog_manager.cc
index c2ff76a92..4e4b59396 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -228,7 +228,7 @@ TAG_FLAG(allow_unsafe_replication_factor, runtime);
DEFINE_int32(catalog_manager_bg_task_wait_ms, 1000,
"Amount of time the catalog manager background task thread waits "
- "between runs");
+ "between runs.");
TAG_FLAG(catalog_manager_bg_task_wait_ms, hidden);
DEFINE_int32(max_create_tablets_per_ts, 60,
@@ -357,11 +357,11 @@ TAG_FLAG(auto_leader_rebalancing_enabled, runtime);
DEFINE_uint32(table_locations_cache_capacity_mb, 0,
"Capacity for the table locations cache (in MiB); a value "
- "of 0 means table locations are not be cached");
+ "of 0 means table locations are not be cached.");
TAG_FLAG(table_locations_cache_capacity_mb, advanced);
DEFINE_bool(enable_per_range_hash_schemas, true,
- "Whether to support range-specific hash schemas for tables");
+ "Whether to support range-specific hash schemas for tables.");
TAG_FLAG(enable_per_range_hash_schemas, advanced);
TAG_FLAG(enable_per_range_hash_schemas, runtime);
@@ -372,6 +372,10 @@ DEFINE_bool(enable_table_write_limit, false,
TAG_FLAG(enable_table_write_limit, experimental);
TAG_FLAG(enable_table_write_limit, runtime);
+DEFINE_bool(enable_range_replica_placement, true,
+ "Whether to use range aware replica placement for newly created
tablets.");
+TAG_FLAG(enable_range_replica_placement, runtime);
+
DEFINE_int64(table_disk_size_limit, -1,
"Set the target size in bytes of a table to write. "
"This is a system wide configuration for every newly "
@@ -385,7 +389,7 @@ DEFINE_int64(table_row_count_limit, -1,
TAG_FLAG(table_row_count_limit, experimental);
DEFINE_double(table_write_limit_ratio, 0.95,
- "Set the ratio of how much write limit can be reached");
+ "Set the ratio of how much write limit can be reached.");
TAG_FLAG(table_write_limit_ratio, experimental);
DEFINE_bool(enable_metadata_cleanup_for_deleted_tables_and_tablets, false,
@@ -410,7 +414,7 @@ TAG_FLAG(enable_chunked_tablet_writes, runtime);
DEFINE_bool(require_new_spec_for_custom_hash_schema_range_bound, false,
"Whether to require the client to use newer signature to specify "
"range bounds when working with a table having custom hash schema "
- "per range");
+ "per range.");
TAG_FLAG(require_new_spec_for_custom_hash_schema_range_bound, experimental);
TAG_FLAG(require_new_spec_for_custom_hash_schema_range_bound, runtime);
@@ -4973,16 +4977,29 @@ bool AsyncAddReplicaTask::SendRequest(int attempt) {
TSDescriptorVector ts_descs;
master_->ts_manager()->GetDescriptorsAvailableForPlacement(&ts_descs);
- // Get the dimension of the tablet. Otherwise, it will be nullopt.
+ // Get the dimension, table id, and range start key of the tablet.
optional<string> dimension = nullopt;
+ optional<string> table_id = nullopt;
+ optional<string> range_key_start = nullopt;
{
TabletMetadataLock l(tablet_.get(), LockMode::READ);
if (tablet_->metadata().state().pb.has_dimension_label()) {
dimension = tablet_->metadata().state().pb.dimension_label();
}
+ if (FLAGS_enable_range_replica_placement) {
+ Partition partition;
+ if (tablet_->metadata().state().pb.has_partition()) {
+ const auto& tablet_partition =
tablet_->metadata().state().pb.partition();
+ Partition::FromPB(tablet_partition, &partition);
+ }
+ range_key_start = partition.begin().range_key();
+ VLOG(1) << Substitute("range_key_start is set to $1",
range_key_start.value());
+ table_id = tablet_->metadata().state().pb.table_id();
+ VLOG(1) << Substitute("table_id is set to $1", table_id.value());
+ }
}
- // Some of the tablet servers hosting the current members of the config
+ // Some tablet servers hosting the current members of the config
// (see the 'existing' populated above) might be presumably dead.
// Inclusion of a presumably dead tablet server into 'existing' is OK:
// PlacementPolicy::PlaceExtraTabletReplica() does not require elements of
@@ -4991,7 +5008,8 @@ bool AsyncAddReplicaTask::SendRequest(int attempt) {
// to host the extra replica is 'ts_descs' after blacklisting all elements
// common with 'existing'.
PlacementPolicy policy(std::move(ts_descs), rng_);
- s = policy.PlaceExtraTabletReplica(std::move(existing), dimension,
&extra_replica);
+ s = policy.PlaceExtraTabletReplica(
+ std::move(existing), dimension, range_key_start, table_id,
&extra_replica);
}
if (PREDICT_FALSE(!s.ok())) {
auto msg = Substitute("no extra replica candidate found for tablet $0: $1",
@@ -6009,15 +6027,29 @@ Status CatalogManager::SelectReplicasForTablet(const
PlacementPolicy& policy,
config->set_obsolete_local(nreplicas == 1);
config->set_opid_index(consensus::kInvalidOpIdIndex);
- // Get the dimension of the tablet. Otherwise, it will be nullopt.
+ // Get the dimension, table id, and range start key of the tablet.
optional<string> dimension = nullopt;
+ optional<string> table_id = nullopt;
+ optional<string> range_key_start = nullopt;
if (tablet->metadata().state().pb.has_dimension_label()) {
dimension = tablet->metadata().state().pb.dimension_label();
}
+ if (FLAGS_enable_range_replica_placement) {
+ Partition partition;
+ if (tablet->metadata().state().pb.has_partition()) {
+ const auto& tablet_partition = tablet->metadata().state().pb.partition();
+ Partition::FromPB(tablet_partition, &partition);
+ }
+ range_key_start = partition.begin().range_key();
+ VLOG(1) << Substitute("range_key_start is set to $1",
range_key_start.value());
+ table_id = tablet->metadata().state().pb.table_id();
+ VLOG(1) << Substitute("table_id is set to $1", table_id.value());
+ }
// Select the set of replicas for the tablet.
TSDescriptorVector descriptors;
- RETURN_NOT_OK_PREPEND(policy.PlaceTabletReplicas(nreplicas, dimension,
&descriptors),
+ RETURN_NOT_OK_PREPEND(policy.PlaceTabletReplicas(
+ nreplicas, dimension, range_key_start, table_id, &descriptors),
Substitute("failed to place replicas for tablet $0 "
"(table '$1')",
tablet->id(), table_guard.data().name()));
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index 0493be6e2..39150ccfe 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -335,6 +335,17 @@ message TabletReportUpdatesPB {
repeated ReportedTabletUpdatesPB tablets = 1;
}
+// The number of tablets that are BOOTSTRAPPING or RUNNING per range.
+message TabletsByRangePB {
+ optional bytes range_start_key = 1;
+ optional int32 tablets = 2;
+}
+
+// The number of tablets that are BOOTSTRAPPING or RUNNING per each range for
each table.
+message TabletsByRangePerTablePB {
+ repeated TabletsByRangePB num_live_tablets_by_range = 1;
+}
+
// Heartbeat sent from the tablet-server to the master
// to establish liveness and report back any status changes.
message TSHeartbeatRequestPB {
@@ -370,9 +381,14 @@ message TSHeartbeatRequestPB {
optional consensus.ReplicaManagementInfoPB replica_management_info = 7;
// The number of tablets that are BOOTSTRAPPING or RUNNING in each dimension.
- // Used by the master to determine load when creating new tablet replicas
+ // Used by the master to determine load when placing new tablet replicas
// based on dimension.
map<string, int32> num_live_tablets_by_dimension = 8;
+
+ // Per table, the number of tablets that are BOOTSTRAPPING or RUNNING
+ // in each range. Used by the master to determine load when placing
+ // new tablet replicas based on range and table.
+ map<string, TabletsByRangePerTablePB> num_live_tablets_by_range_per_table =
9;
}
message TSHeartbeatResponsePB {
diff --git a/src/kudu/master/master_runner.cc b/src/kudu/master/master_runner.cc
index 851531257..60a7cc68b 100644
--- a/src/kudu/master/master_runner.cc
+++ b/src/kudu/master/master_runner.cc
@@ -348,9 +348,9 @@ void SetMasterFlagDefaults() {
to_string(kDefaultRpcServiceQueueLength).c_str(),
SET_FLAGS_DEFAULT));
// Master always reads the latest data snapshot from the system catalog and
- // never uses any specific timestatmp in past for a read snapshot. With that,
- // here isn't much sense to keep long chain of UNDO deltas in addition to the
- // latest version in the MVCC. Keeping short history of deltas frees CPU
+ // never uses any specific past timestamp for a read snapshot. With that,
+ // it doesn't make sense to keep a long chain of UNDO deltas in addition to
the
+ // latest version in the MVCC. Keeping a short history of deltas frees CPU
// cycles, memory, and IO bandwidth that otherwise would be consumed by
// background maintenance jobs running compactions. In addition, less disk
// space is consumed to store the system tablet's data.
diff --git a/src/kudu/master/master_service.cc
b/src/kudu/master/master_service.cc
index f5944aa3b..76026e77a 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -451,11 +451,21 @@ void MasterServiceImpl::TSHeartbeat(const
TSHeartbeatRequestPB* req,
}
// 4. Update tserver soft state based on the heartbeat contents.
+ // TODO(mreddy) If --enable_range_replica_placement is set to false, don't
populate ranges map.
ts_desc->UpdateHeartbeatTime();
ts_desc->set_num_live_replicas(req->num_live_tablets());
ts_desc->set_num_live_replicas_by_dimension(
TabletNumByDimensionMap(req->num_live_tablets_by_dimension().begin(),
req->num_live_tablets_by_dimension().end()));
+ for (auto it = req->num_live_tablets_by_range_per_table().begin();
+ it != req->num_live_tablets_by_range_per_table().end(); ++it) {
+ TabletNumByRangeMap ranges;
+ for (auto range = it->second.num_live_tablets_by_range().begin();
+ range != it->second.num_live_tablets_by_range().end(); ++range) {
+ ranges[range->range_start_key()] = range->tablets();
+ }
+ ts_desc->set_num_live_replicas_by_range_per_table(it->first, ranges);
+ }
// 5. Only leaders handle tablet reports.
if (is_leader_master && req->has_tablet_report()) {
diff --git a/src/kudu/master/placement_policy-test.cc
b/src/kudu/master/placement_policy-test.cc
index 40bcc6f3e..84449ca7a 100644
--- a/src/kudu/master/placement_policy-test.cc
+++ b/src/kudu/master/placement_policy-test.cc
@@ -17,6 +17,7 @@
#include "kudu/master/placement_policy.h"
+#include <algorithm>
#include <cmath>
#include <cstddef>
#include <initializer_list>
@@ -25,6 +26,8 @@
#include <optional>
#include <set>
#include <string>
+#include <type_traits>
+#include <unordered_map>
#include <utility>
#include <vector>
@@ -62,6 +65,8 @@ class PlacementPolicyTest : public ::testing::Test {
const size_t replica_num;
// number of tablet replicas in each dimension
TabletNumByDimensionMap replica_num_by_dimension;
+ // number of tablet replicas in each table and all its ranges.
+ TabletNumByRangePerTableMap replica_num_by_range_and_table;
};
struct LocationInfo {
@@ -82,7 +87,7 @@ class PlacementPolicyTest : public ::testing::Test {
// Get tablet server descriptors for the specified tablet server UUIDs.
TSDescriptorVector GetDescriptors(const vector<string>& uuids) const {
TSDescriptorVector result;
- // O(n^2) is not the best way to do this, but it's OK the test purposes.
+ // O(n^2) is not the best way to do this, but it's OK for test purposes.
for (const auto& uuid : uuids) {
for (const auto& desc : descriptors_) {
if (uuid == desc->permanent_uuid()) {
@@ -107,6 +112,9 @@ class PlacementPolicyTest : public ::testing::Test {
tsd->set_num_live_replicas(ts.replica_num);
tsd->location_.emplace(location_info.id);
tsd->set_num_live_replicas_by_dimension(ts.replica_num_by_dimension);
+ for (const auto& [table, range_map] :
ts.replica_num_by_range_and_table) {
+ tsd->set_num_live_replicas_by_range_per_table(table, range_map);
+ }
ts_descriptors.emplace_back(std::move(tsd));
}
}
@@ -277,7 +285,7 @@ TEST_F(PlacementPolicyTest, PlaceExtraTabletReplicaNoLoc) {
TSDescriptorVector existing(all.begin(), all.end());
existing.pop_back();
shared_ptr<TSDescriptor> extra_ts;
- ASSERT_OK(policy.PlaceExtraTabletReplica(existing, label, &extra_ts));
+ ASSERT_OK(policy.PlaceExtraTabletReplica(existing, label, nullopt,
nullopt, &extra_ts));
ASSERT_TRUE(extra_ts);
ASSERT_EQ("ts2", extra_ts->permanent_uuid());
}
@@ -287,7 +295,7 @@ TEST_F(PlacementPolicyTest, PlaceExtraTabletReplicaNoLoc) {
existing.pop_back();
existing.pop_back();
shared_ptr<TSDescriptor> extra_ts;
- ASSERT_OK(policy.PlaceExtraTabletReplica(existing, label, &extra_ts));
+ ASSERT_OK(policy.PlaceExtraTabletReplica(existing, label, nullopt,
nullopt, &extra_ts));
ASSERT_TRUE(extra_ts);
ASSERT_EQ("ts2", extra_ts->permanent_uuid());
}
@@ -295,7 +303,7 @@ TEST_F(PlacementPolicyTest, PlaceExtraTabletReplicaNoLoc) {
{
TSDescriptorVector existing(all.begin(), all.end());
shared_ptr<TSDescriptor> extra_ts;
- const auto s = policy.PlaceExtraTabletReplica(existing, label,
&extra_ts);
+ const auto s = policy.PlaceExtraTabletReplica(existing, label, nullopt,
nullopt, &extra_ts);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
ASSERT_FALSE(extra_ts);
ASSERT_STR_CONTAINS(s.ToString(),
@@ -326,7 +334,7 @@ TEST_F(PlacementPolicyTest, PlaceTabletReplicasNoLoc) {
// Ask just for a single replica.
{
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(1, label, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(1, label, nullopt, nullopt,
&result));
ASSERT_EQ(1, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -339,7 +347,7 @@ TEST_F(PlacementPolicyTest, PlaceTabletReplicasNoLoc) {
// Ask for number of replicas equal to the number of available tablet
servers.
{
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(3, label, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(3, label, nullopt, nullopt,
&result));
ASSERT_EQ(3, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -351,7 +359,7 @@ TEST_F(PlacementPolicyTest, PlaceTabletReplicasNoLoc) {
// Try to ask for too many replicas when too few tablet servers are
available.
{
TSDescriptorVector result;
- auto s = policy.PlaceTabletReplicas(4, label, &result);
+ auto s = policy.PlaceTabletReplicas(4, label, nullopt, nullopt, &result);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
"could not find next location after placing "
@@ -375,7 +383,7 @@ TEST_F(PlacementPolicyTest, PlaceTabletReplicas) {
// Ask for number of replicas equal to the number of available locations.
{
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(3, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(3, nullopt, nullopt, nullopt,
&result));
ASSERT_EQ(3, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -390,7 +398,7 @@ TEST_F(PlacementPolicyTest, PlaceTabletReplicas) {
// enough locations to spread the replicas.
{
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(5, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(5, nullopt, nullopt, nullopt,
&result));
ASSERT_EQ(5, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -404,7 +412,7 @@ TEST_F(PlacementPolicyTest, PlaceTabletReplicas) {
// Ask for number of replicas greater than the number of tablet servers.
{
TSDescriptorVector result;
- auto s = policy.PlaceTabletReplicas(8, nullopt, &result);
+ auto s = policy.PlaceTabletReplicas(8, nullopt, nullopt, nullopt, &result);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
"could not find next location after placing "
@@ -429,7 +437,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasOneTSPerLocation) {
// Ask for number of replicas equal to the number of available locations.
{
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(3, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(3, nullopt, nullopt, nullopt,
&result));
ASSERT_EQ(3, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -443,7 +451,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasOneTSPerLocation) {
// Ask for number of replicas equal to the number of available locations.
{
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(5, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(5, nullopt, nullopt, nullopt,
&result));
ASSERT_EQ(5, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -457,7 +465,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasOneTSPerLocation) {
// Ask for number of replicas greater than the number of tablet servers.
{
TSDescriptorVector result;
- auto s = policy.PlaceTabletReplicas(6, nullopt, &result);
+ auto s = policy.PlaceTabletReplicas(6, nullopt, nullopt, nullopt, &result);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
"could not find next location after placing "
@@ -482,7 +490,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasBalancingLocations) {
// Make sure no location contains the majority of replicas.
{
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(3, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(3, nullopt, nullopt, nullopt,
&result));
ASSERT_EQ(3, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -498,7 +506,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasBalancingLocations) {
// Current location selection algorithm loads the locations evenly.
{
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(5, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(5, nullopt, nullopt, nullopt,
&result));
ASSERT_EQ(5, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -513,7 +521,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasBalancingLocations) {
// servers in the cluster.
{
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(7, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(7, nullopt, nullopt, nullopt,
&result));
ASSERT_EQ(7, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -546,7 +554,7 @@ TEST_F(PlacementPolicyTest,
PlaceExtraTabletReplicaViolatedPolicy) {
// in the same location.
const auto existing = GetDescriptors({ "A_ts0", "A_ts1", "A_ts2", });
shared_ptr<TSDescriptor> extra_ts;
- ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, &extra_ts));
+ ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, nullopt,
nullopt, &extra_ts));
ASSERT_TRUE(extra_ts);
// Within location a replica is placed by the 'power of 2' algorithm.
ASSERT_TRUE(extra_ts->permanent_uuid() == "C_ts0" ||
@@ -561,7 +569,7 @@ TEST_F(PlacementPolicyTest,
PlaceExtraTabletReplicaViolatedPolicy) {
// constraint would be violated.
const auto existing = GetDescriptors({ "A_ts0", "A_ts1", "C_ts1", "C_ts2",
});
shared_ptr<TSDescriptor> extra_ts;
- ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, &extra_ts));
+ ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, nullopt,
nullopt, &extra_ts));
ASSERT_TRUE(extra_ts);
// Within location a replica is placed by the 'power of 2' algorithm.
ASSERT_TRUE(extra_ts->permanent_uuid() == "B_ts0" ||
@@ -623,7 +631,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L5_TS7) {
{
static constexpr auto num_replicas = 3;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
// Make sure the placement of replicas conforms with the main constraint:
@@ -645,7 +653,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L5_TS7) {
{
static constexpr auto num_replicas = 5;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -662,7 +670,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L5_TS7) {
{
static constexpr auto num_replicas = 7;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -676,7 +684,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L5_TS7) {
{
static constexpr auto num_replicas = 9;
TSDescriptorVector result;
- auto s = policy.PlaceTabletReplicas(num_replicas, nullopt, &result);
+ auto s = policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
const string ref_msg = Substitute(
"could not find next location after placing 7 out of $0 tablet
replicas",
@@ -708,9 +716,9 @@ TEST_F(PlacementPolicyTest, PlaceExtraTablet_L5_TS10_RF5) {
const auto existing = GetDescriptors(
{ "A_ts0", "B_ts0", "C_ts0", "D_ts0", "E_ts0", });
shared_ptr<TSDescriptor> extra_ts;
- ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, &extra_ts));
+ ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, nullopt,
nullopt, &extra_ts));
ASSERT_TRUE(extra_ts);
- // The location with lowest load is selected for the extra replica.
+ // The location with the lowest load is selected for the extra replica.
ASSERT_EQ("A_ts2", extra_ts->permanent_uuid());
}
{
@@ -719,9 +727,9 @@ TEST_F(PlacementPolicyTest, PlaceExtraTablet_L5_TS10_RF5) {
const auto existing = GetDescriptors(
{ "A_ts0", "A_ts1", "B_ts0", "B_ts1", "C_ts0", });
shared_ptr<TSDescriptor> extra_ts;
- ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, &extra_ts));
+ ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, nullopt,
nullopt, &extra_ts));
ASSERT_TRUE(extra_ts);
- // The location with lowest load is selected for the extra replica.
+ // The location with the lowest load is selected for the extra replica.
ASSERT_EQ("D_ts0", extra_ts->permanent_uuid());
}
{
@@ -730,7 +738,7 @@ TEST_F(PlacementPolicyTest, PlaceExtraTablet_L5_TS10_RF5) {
const auto existing = GetDescriptors(
{ "A_ts0", "B_ts0", "B_ts1", "B_ts2", "E_ts0", });
shared_ptr<TSDescriptor> extra_ts;
- ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, &extra_ts));
+ ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, nullopt,
nullopt, &extra_ts));
ASSERT_TRUE(extra_ts);
// Among the locations where an additional replica can be placed,
// location A and location C have the least load. As for the preferences
@@ -768,7 +776,7 @@ TEST_F(PlacementPolicyTest, PlaceExtraTablet_L5_TS16_RF5) {
map<string, int> placement_stats;
for (auto i = 0; i < 6000; ++i) {
shared_ptr<TSDescriptor> extra_ts;
- ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, &extra_ts));
+ ASSERT_OK(policy.PlaceExtraTabletReplica(existing, nullopt, nullopt,
nullopt, &extra_ts));
ASSERT_TRUE(extra_ts);
const auto& ts_uuid = extra_ts->permanent_uuid();
ASSERT_TRUE(ts_uuid == "A_ts1" ||
@@ -796,7 +804,7 @@ TEST_F(PlacementPolicyTest, PlaceExtraTablet_L5_TS16_RF5) {
EXPECT_GT(1500, placement_stats["B_ts3"]);
}
-// Even RF case: edge cases with 2 locaitons.
+// Even RF case: edge cases with 2 locations.
TEST_F(PlacementPolicyTest, PlaceTabletReplicasEmptyCluster_L2_EvenRFEdgeCase)
{
const vector<LocationInfo> cluster_info = {
{ "A", { { "A_ts0", 10 }, { "A_ts1", 10 }, { "A_ts2", 10 }, } },
@@ -810,7 +818,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L2_EvenRFEdgeCase) {
{
static constexpr auto num_replicas = 2;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -820,7 +828,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L2_EvenRFEdgeCase) {
{
static constexpr auto num_replicas = 4;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -830,7 +838,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L2_EvenRFEdgeCase) {
{
static constexpr auto num_replicas = 6;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -843,7 +851,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L2_EvenRFEdgeCase) {
}
}
-// Odd RF case: edge cases with 2 locaitons.
+// Odd RF case: edge cases with 2 locations.
// Make sure replicas are placed into both locations, even if ideal density
// distribution would have them in a single location.
TEST_F(PlacementPolicyTest, PlaceTabletReplicasCluster_L2_OddRFEdgeCase) {
@@ -859,7 +867,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasCluster_L2_OddRFEdgeCase) {
{
static constexpr auto num_replicas = 3;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -870,7 +878,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasCluster_L2_OddRFEdgeCase) {
{
static constexpr auto num_replicas = 5;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -894,7 +902,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L3_EvenRF) {
{
static constexpr auto num_replicas = 2;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -906,7 +914,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L3_EvenRF) {
{
static constexpr auto num_replicas = 4;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -921,7 +929,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L3_EvenRF) {
{
static constexpr auto num_replicas = 6;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -934,7 +942,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasEmptyCluster_L3_EvenRF) {
{
static constexpr auto num_replicas = 8;
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(num_replicas, nullopt, nullopt,
nullopt, &result));
ASSERT_EQ(num_replicas, result.size());
TSDescriptorsMap m;
ASSERT_OK(TSDescriptorVectorToMap(result, &m));
@@ -986,7 +994,7 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasWithNewTabletServers) {
for (auto i = 0; i < 1000; ++i) {
TSDescriptorVector result;
// Get the number of tablet replicas on tablet server.
- ASSERT_OK(policy.PlaceTabletReplicas(3, nullopt, &result));
+ ASSERT_OK(policy.PlaceTabletReplicas(3, nullopt, nullopt, nullopt,
&result));
ASSERT_EQ(3, result.size());
for (const auto& ts : result) {
const auto& ts_uuid = ts->permanent_uuid();
@@ -1010,7 +1018,8 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasWithNewTabletServers) {
map<string, int> placement_stats;
for (auto i = 0; i < 1000; ++i) {
TSDescriptorVector result;
- ASSERT_OK(policy.PlaceTabletReplicas(3, make_optional(string(label)),
&result));
+ ASSERT_OK(policy.PlaceTabletReplicas(
+ 3, make_optional(string(label)), nullopt, nullopt, &result));
ASSERT_EQ(3, result.size());
for (const auto& ts : result) {
const auto& ts_uuid = ts->permanent_uuid();
@@ -1026,5 +1035,412 @@ TEST_F(PlacementPolicyTest,
PlaceTabletReplicasWithNewTabletServers) {
}
}
+// RF is 1 while number of TS is 2, extreme case where one empty tablet server
gets a majority
+// of the replicas placed but with range aware replica placement it's more
balanced.
+TEST_F(PlacementPolicyTest, PlaceTabletReplicasWithRangesExtremeCase) {
+ const int nReplicationFactor = 1;
+ const int kNumServers = 2;
+ const vector<LocationInfo> cluster_info = {
+ {
+ "",
+ {
+ { "ts0", 0 },
+ { "ts1", 500 },
+ }
+ },
+ };
+
+ {
+ ASSERT_OK(Prepare(cluster_info));
+ const auto& all = descriptors();
+ PlacementPolicy policy(all, rng());
+ map<string, int> stats;
+ for (auto i = 0; i < 1000; ++i) {
+ TSDescriptorVector result;
+ ASSERT_OK(policy.PlaceTabletReplicas(nReplicationFactor, nullopt,
nullopt, nullopt, &result));
+ ASSERT_EQ(nReplicationFactor, result.size());
+ const auto& ts_uuid = result[0]->permanent_uuid();
+ ++stats[ts_uuid];
+ }
+ ASSERT_EQ(kNumServers, stats.size());
+ ASSERT_LE(748, stats["ts0"]);
+ ASSERT_GE(752, stats["ts0"]);
+ ASSERT_LE(248, stats["ts1"]);
+ ASSERT_GE(252, stats["ts1"]);
+ }
+
+ {
+ ASSERT_OK(Prepare(cluster_info));
+ const auto& all = descriptors();
+ PlacementPolicy policy(all, rng());
+ map<string, int> stats;
+ for (auto i = 0; i < 1000; ++i) {
+ TSDescriptorVector result;
+ ASSERT_OK(policy.PlaceTabletReplicas(nReplicationFactor, nullopt, "a1",
"t1", &result));
+ ASSERT_EQ(nReplicationFactor, result.size());
+ const auto& ts_uuid = result[0]->permanent_uuid();
+ ++stats[ts_uuid];
+ }
+ ASSERT_EQ(kNumServers, stats.size());
+ ASSERT_EQ(500, stats["ts0"]);
+ ASSERT_EQ(500, stats["ts1"]);
+ }
+}
+
+// RF is equal to the number of tablet servers, so replica placement will be
same with ranges.
+TEST_F(PlacementPolicyTest, PlaceTabletReplicasWithRanges) {
+ const int nReplicationFactor = 3;
+ const int kNumServers = 3;
+ TabletNumByRangeMap range({ {"a1", 500} });
+ const vector<LocationInfo> cluster_info = {
+ {
+ "",
+ {
+ { "ts0", 0 },
+ { "ts1", 0 },
+ { "ts2", 500, { }, { {"t1", range }, } },
+ }
+ },
+ };
+
+ {
+ ASSERT_OK(Prepare(cluster_info));
+ const auto& all = descriptors();
+ PlacementPolicy policy(all, rng());
+ map<string, int> stats;
+ for (auto i = 0; i < 1000; ++i) {
+ TSDescriptorVector result;
+ ASSERT_OK(policy.PlaceTabletReplicas(nReplicationFactor, nullopt,
nullopt, nullopt, &result));
+ ASSERT_EQ(nReplicationFactor, result.size());
+ for (const auto& ts : result) {
+ const auto& ts_uuid = ts->permanent_uuid();
+ ++stats[ts_uuid];
+ }
+ }
+ ASSERT_EQ(kNumServers, stats.size());
+ ASSERT_EQ(1000, stats["ts0"]);
+ ASSERT_EQ(1000, stats["ts1"]);
+ ASSERT_EQ(1000, stats["ts2"]);
+ }
+
+ {
+ ASSERT_OK(Prepare(cluster_info));
+ const auto& all = descriptors();
+ PlacementPolicy policy(all, rng());
+ map<string, int> stats;
+ for (auto i = 0; i < 1000; ++i) {
+ TSDescriptorVector result;
+ ASSERT_OK(policy.PlaceTabletReplicas(nReplicationFactor, nullopt, "a1",
"t1", &result));
+ ASSERT_EQ(nReplicationFactor, result.size());
+ for (const auto& ts : result) {
+ const auto& ts_uuid = ts->permanent_uuid();
+ ++stats[ts_uuid];
+ }
+ }
+ ASSERT_EQ(kNumServers, stats.size());
+ ASSERT_EQ(1000, stats["ts0"]);
+ ASSERT_EQ(1000, stats["ts1"]);
+ ASSERT_EQ(1000, stats["ts2"]);
+ }
+}
+
+// RF is 3 while number of tablet servers is 4 with 2 of them empty and the
other 2 loaded with
+// 500 replicas, although only one tablet server has replicas with the same
range that is being
+// added. The expectation is that the tablet server with replicas not from
that range will be
+// treated similar to the empty tablet servers. This test case also ensures
that ranges with the
+// same start key from two different tables can properly be differentiated.
+TEST_F(PlacementPolicyTest, PlaceTabletReplicasWithRangesAndTables) {
+ const int nReplicationFactor = 3;
+ const int kNumServers = 4;
+ TabletNumByRangeMap range({ {"a1", 500} });
+ const vector<LocationInfo> cluster_info = {
+ {
+ "",
+ {
+ { "ts0", 0 },
+ { "ts1", 0 },
+ { "ts2", 500, { }, { {"t2", range }, } },
+ { "ts3", 500, { }, { {"t1", range }, } },
+ }
+ },
+ };
+
+ {
+ ASSERT_OK(Prepare(cluster_info));
+ const auto& all = descriptors();
+ PlacementPolicy policy(all, rng());
+ map<string, int> stats;
+ for (auto i = 0; i < 1000; ++i) {
+ TSDescriptorVector result;
+ ASSERT_OK(policy.PlaceTabletReplicas(nReplicationFactor, nullopt,
nullopt, nullopt, &result));
+ ASSERT_EQ(nReplicationFactor, result.size());
+ for (const auto& ts : result) {
+ const auto& ts_uuid = ts->permanent_uuid();
+ ++stats[ts_uuid];
+ }
+ }
+ ASSERT_EQ(kNumServers, stats.size());
+ ASSERT_EQ(1000, stats["ts0"]);
+ ASSERT_EQ(1000, stats["ts1"]);
+ ASSERT_EQ(500, stats["ts2"]);
+ ASSERT_EQ(500, stats["ts3"]);
+ }
+
+ {
+ ASSERT_OK(Prepare(cluster_info));
+ const auto& all = descriptors();
+ PlacementPolicy policy(all, rng());
+ map<string, int> stats;
+ for (auto i = 0; i < 1000; ++i) {
+ TSDescriptorVector result;
+ ASSERT_OK(policy.PlaceTabletReplicas(nReplicationFactor, nullopt, "a1",
"t1", &result));
+ ASSERT_EQ(nReplicationFactor, result.size());
+ for (const auto& ts : result) {
+ const auto& ts_uuid = ts->permanent_uuid();
+ ++stats[ts_uuid];
+ }
+ }
+ ASSERT_EQ(kNumServers, stats.size());
+ vector<string> tservers_no_load = {"ts0", "ts1", "ts2"};
+ for (const auto& tserver : tservers_no_load) {
+ ASSERT_LE(871, stats[tserver]);
+ ASSERT_GE(881, stats[tserver]);
+ }
+ ASSERT_LE(368, stats["ts3"]);
+ ASSERT_GE(378, stats["ts3"]);
+ }
+}
+
+// RF is 3 while number of tablet servers is 6. Half of them are empty while
the other half contain
+// an increasing number of replicas. One of them does not contain replicas of
the range that is
+// being placed thus it is expected to be treated like the empty tablet
servers when placing
+// replicas by range.
+TEST_F(PlacementPolicyTest,
PlaceTabletReplicasWithRangesAndTablesWithDoubleTServers) {
+ const int nReplicationFactor = 3;
+ const int kNumServers = 6;
+ TabletNumByRangeMap range1({ {"b1", 250} });
+ TabletNumByRangeMap range2({ {"a1", 375} });
+ TabletNumByRangeMap range3({ {"a1", 500} });
+ const vector<LocationInfo> cluster_info = {
+ {
+ "",
+ {
+ { "ts0", 0 },
+ { "ts1", 0 },
+ { "ts2", 0 },
+ { "ts3", 250, { }, { {"t1", range1 }, } },
+ { "ts4", 375, { }, { {"t1", range2 }, } },
+ { "ts5", 500, { }, { {"t1", range3 }, } },
+ }
+ },
+ };
+
+ {
+ ASSERT_OK(Prepare(cluster_info));
+ const auto& all = descriptors();
+ PlacementPolicy policy(all, rng());
+ map<string, int> stats;
+ for (auto i = 0; i < 1000; ++i) {
+ TSDescriptorVector result;
+ ASSERT_OK(policy.PlaceTabletReplicas(nReplicationFactor, nullopt,
nullopt, nullopt, &result));
+ ASSERT_EQ(nReplicationFactor, result.size());
+ for (const auto& ts : result) {
+ const auto& ts_uuid = ts->permanent_uuid();
+ ++stats[ts_uuid];
+ }
+ }
+ ASSERT_EQ(kNumServers, stats.size());
+ vector<string> tservers_no_load = {"ts0", "ts1", "ts2"};
+ for (const auto& tserver : tservers_no_load) {
+ ASSERT_LE(682, stats[tserver]);
+ ASSERT_GE(691, stats[tserver]);
+ }
+ ASSERT_LE(432, stats["ts3"]);
+ ASSERT_GE(441, stats["ts3"]);
+ ASSERT_LE(306, stats["ts4"]);
+ ASSERT_GE(315, stats["ts4"]);
+ ASSERT_LE(182, stats["ts5"]);
+ ASSERT_GE(191, stats["ts5"]);
+ }
+
+ {
+ ASSERT_OK(Prepare(cluster_info));
+ const auto& all = descriptors();
+ PlacementPolicy policy(all, rng());
+ map<string, int> stats;
+ for (auto i = 0; i < 1000; ++i) {
+ TSDescriptorVector result;
+ ASSERT_OK(policy.PlaceTabletReplicas(nReplicationFactor, nullopt, "a1",
"t1", &result));
+ ASSERT_EQ(nReplicationFactor, result.size());
+ for (const auto& ts : result) {
+ const auto& ts_uuid = ts->permanent_uuid();
+ ++stats[ts_uuid];
+ }
+ }
+ ASSERT_EQ(kNumServers, stats.size());
+ vector<string> tservers_no_load = {"ts0", "ts1", "ts2", "ts3"};
+ for (const auto& tserver : tservers_no_load) {
+ ASSERT_LE(642, stats[tserver]);
+ ASSERT_GE(652, stats[tserver]);
+ }
+ ASSERT_LE(264, stats["ts4"]);
+ ASSERT_GE(274, stats["ts4"]);
+ ASSERT_LE(137, stats["ts5"]);
+ ASSERT_GE(147, stats["ts5"]);
+ }
+}
+
+// With RF = 3, 6 tablet servers, and a well-balanced cluster initially, this
test case adds
+// several range partitions for the existing table and for a new table. All
these range partitions
+// have the same start key and twice as many replicas for the new table will
be added. These
+// replicas are expected to be distributed evenly per table.
+TEST_F(PlacementPolicyTest, PlaceTabletReplicasWithSeveralRanges) {
+ const int nReplicationFactor = 3;
+ const int kNumServers = 6;
+ TabletNumByRangeMap ranges_per_table({ {"a1", 250}, {"b1", 250} });
+ const vector<LocationInfo> cluster_info = {
+ {
+ "",
+ {
+ { "ts0", 500, { }, { {"t1", ranges_per_table }, } },
+ { "ts1", 500, { }, { {"t1", ranges_per_table }, } },
+ { "ts2", 500, { }, { {"t1", ranges_per_table }, } },
+ { "ts3", 500, { }, { {"t1", ranges_per_table }, } },
+ { "ts4", 500, { }, { {"t1", ranges_per_table }, } },
+ { "ts5", 500, { }, { {"t1", ranges_per_table }, } },
+ }
+ },
+ };
+
+ ASSERT_OK(Prepare(cluster_info));
+ const auto& all = descriptors();
+ PlacementPolicy policy(all, rng());
+ typedef std::unordered_map<string, std::unordered_map<string, int>>
replicas_per_range_per_table;
+ std::unordered_map<string, replicas_per_range_per_table> stats;
+ vector<string> ranges = {"a1", "b1", "c1", "d1", "e1", "f1"};
+ vector<std::pair<string, int>> tables = {{"t1", 500}, {"t2", 1000}};
+ for (const auto& table : tables) {
+ for (const auto& range : ranges) {
+ for (auto i = 0; i < table.second; ++i) {
+ TSDescriptorVector result;
+ ASSERT_OK(
+ policy.PlaceTabletReplicas(nReplicationFactor, nullopt, range,
table.first, &result));
+ ASSERT_EQ(nReplicationFactor, result.size());
+ for (const auto& ts : result) {
+ const auto& ts_uuid = ts->permanent_uuid();
+ ++stats[ts_uuid][table.first][range];
+ }
+ }
+ }
+ }
+
+ ASSERT_EQ(kNumServers, stats.size());
+ for (const auto& stat : stats) {
+ int tserver_replicas = 0;
+ // Verifies that two tables exist on each tserver.
+ ASSERT_EQ(2, stat.second.size());
+ for (const auto& table : stat.second) {
+ // Verifies that the six ranges exist for each table on each tserver.
+ ASSERT_EQ(6, table.second.size());
+ for (const auto& ranges : table.second) {
+ // Since we placed twice as many replicas for table "t2", we expect
twice as many replicas.
+ if (table.first == "t2") {
+ ASSERT_EQ(500, ranges.second);
+ } else {
+ ASSERT_EQ(250, ranges.second);
+ }
+ tserver_replicas += ranges.second;
+ }
+ }
+ // Verifies that 4500 replicas are placed on each tserver.
+ ASSERT_EQ(4500, tserver_replicas);
+ }
+}
+
+// With RF = 3, 6 tablet servers, and an imbalanced cluster by tables
initially, this test case adds
+// several range partitions for the existing table and for a new table.
+TEST_F(PlacementPolicyTest, PlaceTabletReplicasWithImbalanceByTable) {
+ const int nReplicationFactor = 3;
+ const int kNumServers = 6;
+ TabletNumByRangeMap ranges_per_table({ {"a1", 250}, {"b1", 250} });
+ const vector<LocationInfo> cluster_info = {
+ {
+ "",
+ {
+ { "ts0", 500, { }, { {"t1", ranges_per_table }, } },
+ { "ts1", 500, { }, { {"t2", ranges_per_table }, } },
+ { "ts2", 500, { }, { {"t1", ranges_per_table }, } },
+ { "ts3", 500, { }, { {"t2", ranges_per_table }, } },
+ { "ts4", 500, { }, { {"t1", ranges_per_table }, } },
+ { "ts5", 500, { }, { {"t2", ranges_per_table }, } },
+ }
+ },
+ };
+
+ ASSERT_OK(Prepare(cluster_info));
+ const auto& all = descriptors();
+ PlacementPolicy policy(all, rng());
+ typedef std::unordered_map<string, std::unordered_map<string, int>>
replicas_per_range_per_table;
+ std::unordered_map<string, replicas_per_range_per_table> stats;
+ vector<string> ranges = {"a1", "b1", "c1", "d1", "e1", "f1"};
+ vector<string> tables = {"t1", "t2"};
+ for (const auto& table : tables) {
+ for (const auto& range : ranges) {
+ for (auto i = 0; i < 500; ++i) {
+ TSDescriptorVector result;
+ ASSERT_OK(policy.PlaceTabletReplicas(nReplicationFactor, nullopt,
range, table, &result));
+ ASSERT_EQ(nReplicationFactor, result.size());
+ for (const auto& ts : result) {
+ const auto& ts_uuid = ts->permanent_uuid();
+ ++stats[ts_uuid][table][range];
+ }
+ }
+ }
+ }
+
+ ASSERT_EQ(kNumServers, stats.size());
+ vector<string> balanced_ranges = {"c1", "d1", "e1", "f1"};
+ vector<string> tservers1 = {"ts0", "ts2", "ts4"};
+ for (const auto& stat : stats) {
+ int tserver_replicas = 0;
+ // Verifies that two tables exist on each tserver.
+ ASSERT_EQ(2, stat.second.size());
+ for (const auto& table : stat.second) {
+ // Verifies that the six ranges exist for each table on each tserver.
+ ASSERT_EQ(6, table.second.size());
+ for (const auto& ranges : table.second) {
+ if (std::find(balanced_ranges.begin(),
+ balanced_ranges.end(), ranges.first) !=
balanced_ranges.end()) {
+ ASSERT_LE(245, ranges.second);
+ ASSERT_GE(255, ranges.second);
+ } else if (std::find(tservers1.begin(),
+ tservers1.end(), stat.first) != tservers1.end()) {
+ // Only range of "a1" or "b1" now. Checks which tservers and for
table "t1" or "t2".
+ if (table.first == "t1") {
+ ASSERT_LE(120, ranges.second);
+ ASSERT_GE(130, ranges.second);
+ } else {
+ ASSERT_LE(370, ranges.second);
+ ASSERT_GE(380, ranges.second);
+ }
+ } else {
+ // Ranges "a1" or "b1" for tservers "ts1", "ts3", "ts5". Checks for
table "t1" or "t2".
+ if (table.first == "t1") {
+ ASSERT_LE(370, ranges.second);
+ ASSERT_GE(380, ranges.second);
+ } else {
+ ASSERT_LE(120, ranges.second);
+ ASSERT_GE(130, ranges.second);
+ }
+ }
+ tserver_replicas += ranges.second;
+ }
+ }
+ // Verifies that around 3000 replicas are placed on each tserver.
+ ASSERT_LE(2990, tserver_replicas);
+ ASSERT_GE(3110, tserver_replicas);
+ }
+}
+
} // namespace master
} // namespace kudu
diff --git a/src/kudu/master/placement_policy.cc
b/src/kudu/master/placement_policy.cc
index 7b3843992..44c0cb292 100644
--- a/src/kudu/master/placement_policy.cc
+++ b/src/kudu/master/placement_policy.cc
@@ -18,6 +18,7 @@
#include "kudu/master/placement_policy.h"
#include <iterator>
+#include <limits>
#include <map>
#include <memory>
#include <numeric>
@@ -25,7 +26,7 @@
#include <ostream>
#include <set>
#include <string>
-#include <unordered_map>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -42,7 +43,6 @@ using std::optional;
using std::set;
using std::shared_ptr;
using std::string;
-using std::unordered_map;
using std::vector;
using strings::Substitute;
@@ -51,17 +51,46 @@ namespace master {
namespace {
-double GetTSLoad(const optional<string>& dimension, TSDescriptor* desc) {
+typedef std::pair<std::shared_ptr<TSDescriptor>, vector<double>> ts_stats;
+
+double GetTSLoad(TSDescriptor* desc, const optional<string>& dimension) {
// TODO (oclarms): get the number of times this tablet server has recently
been
// selected to create a tablet replica by dimension.
return desc->RecentReplicaCreations() + desc->num_live_replicas(dimension);
}
+double GetTSRangeLoad(TSDescriptor* desc, const string& range_start_key, const
string& table_id) {
+ return desc->RecentReplicaCreationsByRange(range_start_key, table_id)
+ + desc->num_live_replicas_by_range(range_start_key, table_id);
+}
+
+double GetTSTableLoad(TSDescriptor* desc, const string& table_id) {
+ return desc->RecentReplicaCreationsByTable(table_id) +
desc->num_live_replicas_by_table(table_id);
+}
+
+// Iterates through `src_replicas_stats` and returns set of tablet servers
that have the least
+// replicas of a particular stat (range, table, total) that is determined by
'index'.
+vector<ts_stats> TabletServerTieBreaker(const vector<ts_stats>&
src_replicas_stats,
+ int index) {
+ double min = std::numeric_limits<double>::max();
+ vector<ts_stats> result;
+ for (const auto& stats : src_replicas_stats) {
+ if (stats.second[index] < min) {
+ min = stats.second[index];
+ result.clear();
+ result.emplace_back(stats);
+ } else if (stats.second[index] == min) {
+ result.emplace_back(stats);
+ }
+ }
+ return result;
+}
+
// Given exactly two choices in 'two_choices', pick the better tablet server on
// which to place a tablet replica. Ties are broken using 'rng'.
-shared_ptr<TSDescriptor> PickBetterReplica(
+shared_ptr<TSDescriptor> PickBetterTabletServer(
const TSDescriptorVector& two_choices,
- const optional<std::string>& dimension,
+ const optional<string>& dimension,
ThreadSafeRandom* rng) {
CHECK_EQ(2, two_choices.size());
@@ -86,8 +115,8 @@ shared_ptr<TSDescriptor> PickBetterReplica(
//
// TODO(wdberkeley): in the future we may want to factor in other items such
// as available disk space, actual request load, etc.
- double load_a = GetTSLoad(dimension, a.get());
- double load_b = GetTSLoad(dimension, b.get());
+ double load_a = GetTSLoad(a.get(), dimension);
+ double load_b = GetTSLoad(b.get(), dimension);
if (load_a < load_b) {
return a;
}
@@ -98,6 +127,51 @@ shared_ptr<TSDescriptor> PickBetterReplica(
return two_choices[rng->Uniform(2)];
}
+// Given a set of tablet servers in 'ts_choices', pick the best tablet server
on which to place a
+// tablet replica based on the existing number of replicas per the given
range. The tiebreaker
+// is the number of replicas per table, then number of replicas overall. If
still tied, use rng.
+shared_ptr<TSDescriptor> PickTabletServer(const TSDescriptorVector& ts_choices,
+ const string& range_key_start,
+ const string& table_id,
+ const optional<string>& dimension,
+ ThreadSafeRandom* rng) {
+ CHECK_GE(ts_choices.size(), 2);
+ vector<ts_stats> replicas_stats;
+ // Find the number of replicas per the given range, the given table,
+ // and total replicas for each tablet server.
+ for (const auto& ts : ts_choices) {
+ auto* ts_desc = ts.get();
+ auto tablets_by_range = GetTSRangeLoad(ts_desc, range_key_start, table_id);
+ auto tablets_by_table = GetTSTableLoad(ts_desc, table_id);
+ auto tablets = GetTSLoad(ts_desc, dimension);
+ const vector<double> tablet_count = {tablets_by_range, tablets_by_table,
tablets};
+ replicas_stats.emplace_back(ts, tablet_count);
+ }
+ // Given set of tablet servers with stats about replicas per given range,
table,
+ // and total replicas, find tablet servers with the least replicas per given
range.
+ vector<ts_stats> replicas_by_range = TabletServerTieBreaker(replicas_stats,
0);
+ CHECK(!replicas_by_range.empty());
+ if (replicas_by_range.size() == 1) {
+ return replicas_by_range[0].first;
+ }
+ // Given set of tablet servers with the least replicas per given range,
+ // find tablet servers with the least replicas per given table.
+ vector<ts_stats> replicas_by_table =
TabletServerTieBreaker(replicas_by_range, 1);
+ CHECK(!replicas_by_table.empty());
+ if (replicas_by_table.size() == 1) {
+ return replicas_by_table[0].first;
+ }
+ // Given set of tablet servers with the least replicas per range and table,
+ // find tablet servers with the least replicas overall.
+ vector<ts_stats> replicas_total = TabletServerTieBreaker(replicas_by_table,
2);
+ CHECK(!replicas_total.empty());
+ if (replicas_total.size() == 1) {
+ return replicas_total[0].first;
+ }
+ // No more tiebreakers, randomly select a tablet server.
+ return replicas_total[rng->Uniform(replicas_total.size())].first;
+}
+
} // anonymous namespace
PlacementPolicy::PlacementPolicy(TSDescriptorVector descs,
@@ -114,7 +188,9 @@ PlacementPolicy::PlacementPolicy(TSDescriptorVector descs,
}
Status PlacementPolicy::PlaceTabletReplicas(int nreplicas,
- const optional<std::string>&
dimension,
+ const optional<string>& dimension,
+ const optional<string>&
range_key_start,
+ const optional<string>& table_id,
TSDescriptorVector* ts_descs)
const {
DCHECK(ts_descs);
@@ -127,14 +203,17 @@ Status PlacementPolicy::PlaceTabletReplicas(int nreplicas,
const auto& loc = elem.first;
const auto loc_nreplicas = elem.second;
const auto& ts_descriptors = FindOrDie(ltd_, loc);
- RETURN_NOT_OK(SelectReplicas(ts_descriptors, loc_nreplicas, dimension,
ts_descs));
+ RETURN_NOT_OK(SelectReplicas(
+ ts_descriptors, loc_nreplicas, dimension, range_key_start, table_id,
ts_descs));
}
return Status::OK();
}
Status PlacementPolicy::PlaceExtraTabletReplica(
TSDescriptorVector existing,
- const optional<std::string>& dimension,
+ const optional<string>& dimension,
+ const optional<string>& range_key_start,
+ const optional<string>& table_id,
shared_ptr<TSDescriptor>* ts_desc) const {
DCHECK(ts_desc);
@@ -172,7 +251,8 @@ Status PlacementPolicy::PlaceExtraTabletReplica(
return Status::IllegalState(
Substitute("'$0': no info on tablet servers at location", location));
}
- auto replica = SelectReplica(*location_ts_descs_ptr, dimension,
existing_set);
+ auto replica = SelectReplica(
+ *location_ts_descs_ptr, dimension, range_key_start, table_id,
existing_set);
if (!replica) {
return Status::NotFound("could not find tablet server for extra replica");
}
@@ -233,6 +313,8 @@ Status PlacementPolicy::SelectReplicaLocations(
Status PlacementPolicy::SelectReplicas(const TSDescriptorVector&
source_ts_descs,
int nreplicas,
const optional<string>& dimension,
+ const optional<string>& range_key_start,
+ const optional<string>& table_id,
TSDescriptorVector* result_ts_descs)
const {
if (nreplicas > source_ts_descs.size()) {
return Status::InvalidArgument(
@@ -244,27 +326,37 @@ Status PlacementPolicy::SelectReplicas(const
TSDescriptorVector& source_ts_descs
// put two replicas on the same host.
set<shared_ptr<TSDescriptor>> already_selected;
for (auto i = 0; i < nreplicas; ++i) {
- auto ts = SelectReplica(source_ts_descs, dimension, already_selected);
+ auto ts = SelectReplica(
+ source_ts_descs, dimension, range_key_start, table_id,
already_selected);
CHECK(ts);
// Increment the number of pending replicas so that we take this selection
// into account when assigning replicas for other tablets of the same
table.
// This value decays back to 0 over time.
ts->IncrementRecentReplicaCreations();
+ if (range_key_start && table_id) {
+
ts->IncrementRecentReplicaCreationsByRangeAndTable(range_key_start.value(),
table_id.value());
+ }
result_ts_descs->emplace_back(ts);
EmplaceOrDie(&already_selected, std::move(ts));
}
return Status::OK();
}
+// The new replica selection algorithm takes into consideration the number of
+// replicas per range of the set of replicas that is being placed. If there's a
+// tie between multiple tablet servers, the number of replicas per table of the
+// set of replicas being placed will be considered. If there's still a tie
between
+// multiple tablet servers, the total number of replicas will be considered. If
+// there's still a tie, a tablet server will be randomly chosen.
//
-// The replica selection algorithm follows the idea from
+// The old replica selection algorithm follows the idea from
// "Power of Two Choices in Randomized Load Balancing"[1]. For each replica,
// we randomly select two tablet servers, and then assign the replica to the
// less-loaded one of the two. This has some nice properties:
//
// 1) because the initial selection of two servers is random, we get good
-// spreading of replicas across the cluster. In contrast if we sorted by
+// spreading of replicas across the cluster. In contrast, if we sorted by
// load and always picked under-loaded servers first, we'd end up causing
// all tablets of a new table to be placed on an empty server. This wouldn't
// give good load balancing of that table.
@@ -284,22 +376,39 @@ Status PlacementPolicy::SelectReplicas(const
TSDescriptorVector& source_ts_descs
shared_ptr<TSDescriptor> PlacementPolicy::SelectReplica(
const TSDescriptorVector& ts_descs,
const optional<string>& dimension,
+ const optional<string>& range_key_start,
+ const optional<string>& table_id,
const set<shared_ptr<TSDescriptor>>& excluded) const {
- // Pick two random servers, excluding those we've already picked.
- // If we've only got one server left, 'two_choices' will actually
- // just contain one element.
- vector<shared_ptr<TSDescriptor>> two_choices;
- ReservoirSample(ts_descs, 2, excluded, rng_, &two_choices);
- DCHECK_LE(two_choices.size(), 2);
-
- if (two_choices.size() == 2) {
- // Pick the better of the two.
- return PickBetterReplica(two_choices, dimension, rng_);
- }
- if (two_choices.size() == 1) {
- return two_choices.front();
+ if (range_key_start && table_id) {
+ TSDescriptorVector ts_choices;
+ auto choices_size = ts_descs.size() - excluded.size();
+ ReservoirSample(ts_descs, choices_size, excluded, rng_, &ts_choices);
+ DCHECK_EQ(ts_choices.size(), choices_size);
+ if (ts_choices.size() > 1) {
+ return PickTabletServer(
+ ts_choices, range_key_start.value(), table_id.value(), dimension,
rng_);
+ }
+ if (ts_choices.size() == 1) {
+ return ts_choices.front();
+ }
+ return nullptr;
+ } else {
+ // Pick two random servers, excluding those we've already picked.
+ // If we've only got one server left, 'two_choices' will actually
+ // just contain one element.
+ TSDescriptorVector two_choices;
+ ReservoirSample(ts_descs, 2, excluded, rng_, &two_choices);
+ DCHECK_LE(two_choices.size(), 2);
+
+ if (two_choices.size() == 2) {
+ // Pick the better of the two.
+ return PickBetterTabletServer(two_choices, dimension, rng_);
+ }
+ if (two_choices.size() == 1) {
+ return two_choices.front();
+ }
+ return nullptr;
}
- return nullptr;
}
Status PlacementPolicy::SelectLocation(
diff --git a/src/kudu/master/placement_policy.h
b/src/kudu/master/placement_policy.h
index e14b38dde..48d81914e 100644
--- a/src/kudu/master/placement_policy.h
+++ b/src/kudu/master/placement_policy.h
@@ -72,27 +72,47 @@ class PlacementPolicy {
// Select tablet servers to host the given number of replicas for a tablet.
//
// Parameters:
- // 'nreplicas' The 'nreplicas' parameter specifies the desired
replication factor.
- // 'dimension' The 'dimension' parameter specifies the dimension
information of the tablet.
- // If not none, place tablet replicas based on the number of
tablets in a
- // dimension. Otherwise, based on the number of tablets at a
tablet server.
- // 'ts_descs' The result set of tablet server descriptors is output into
the 'ts_descs'
- // placeholder (must not be null).
+ // 'nreplicas' The 'nreplicas' parameter specifies the desired
replication factor.
+ // 'dimension' The 'dimension' parameter specifies the dimension
information of the
+ // tablet. If not null, place tablet replicas based on
the number of tablets
+ // in a dimension. Otherwise, based on the number of
tablets at a tserver.
+ // 'range_key_start' The 'range_key_start' parameter specifies the start
range key of the
+ // tablet. If not null, place tablet replicas based on
number of replicas per
+ // this range. Otherwise, place them based on number of
tablets per tserver.
+ // 'table_id' The 'table_id' parameter specifies the table id to
which the tablet
+ // belongs to. If not null, place tablet replicas based
on number of replicas
+ // per this table if the number of replicas per the
range to which this
+ // tablet belongs to is equal. Otherwise, place the
replicas based on number
+ // of tablets at a tablet server.
+ // 'ts_descs' The result set of tablet server descriptors is
output into the 'ts_descs'
+ // placeholder (must not be null).
Status PlaceTabletReplicas(int nreplicas,
const std::optional<std::string>& dimension,
+ const std::optional<std::string>& range_key_start,
+ const std::optional<std::string>& table_id,
TSDescriptorVector* ts_descs) const;
// Select tablet server to host an additional tablet replica.
//
// Parameters:
- // 'existing' The 'existing' parameter lists current members of the
tablet's
- // Raft configuration.
- // 'dimension' The 'dimension' parameter specifies the dimension
information of the tablet.
- // If not none, place tablet replicas based on the number of
tablets in a
- // dimension. Otherwise, based on the number of tablets at a
tablet server.
- // 'ts_desc' The new member is output into 'ts_desc' placeholer (must
not be null).
+ // 'existing' The 'existing' parameter lists current members of
the tablet's
+ // Raft configuration.
+ // 'dimension' The 'dimension' parameter specifies the dimension
information of the
+ // tablet. If not null, place tablet replicas based on
the number of tablets
+ // in a dimension. Otherwise, based on the number of
tablets at a tserver.
+ // 'range_key_start' The 'range_key_start' parameter specifies the start
range key of the
+ // tablet. If not null, place tablet replicas based on
number of replicas per
+ // this range. Otherwise, place them based on number of
tablets per tserver.
+ // 'table_id' The 'table_id' parameter specifies the id of the table to
+ // which the tablet belongs. If not null, and tablet servers
+ // are tied on the number of replicas for the tablet's range,
+ // place replicas based on the number of replicas per this
+ // table. If null, place the replicas based on the number of
+ // tablets at a tablet server.
+ // 'ts_desc' The new member is output into 'ts_desc' placeholder
(must not be null).
Status PlaceExtraTabletReplica(TSDescriptorVector existing,
const std::optional<std::string>& dimension,
+ const std::optional<std::string>&
range_key_start,
+ const std::optional<std::string>& table_id,
std::shared_ptr<TSDescriptor>* ts_desc) const;
private:
@@ -140,6 +160,8 @@ class PlacementPolicy {
Status SelectReplicas(const TSDescriptorVector& source_ts_descs,
int nreplicas,
const std::optional<std::string>& dimension,
+ const std::optional<std::string>& range_key_start,
+ const std::optional<std::string>& table_id,
TSDescriptorVector* result_ts_descs) const;
// Given the tablet servers in 'ts_descs', pick a tablet server to host
@@ -148,6 +170,8 @@ class PlacementPolicy {
std::shared_ptr<TSDescriptor> SelectReplica(
const TSDescriptorVector& ts_descs,
const std::optional<std::string>& dimension,
+ const std::optional<std::string>& range_key_start,
+ const std::optional<std::string>& table_id,
const std::set<std::shared_ptr<TSDescriptor>>& excluded) const;
// Select location for next replica of a tablet with the specified
replication
diff --git a/src/kudu/master/ts_descriptor.cc b/src/kudu/master/ts_descriptor.cc
index f9152a564..5057a92d8 100644
--- a/src/kudu/master/ts_descriptor.cc
+++ b/src/kudu/master/ts_descriptor.cc
@@ -82,6 +82,7 @@ TSDescriptor::TSDescriptor(std::string perm_id)
needs_full_report_(false),
recent_replica_creations_(0),
last_replica_creations_decay_(MonoTime::Now()),
+ init_time_(MonoTime::Now()),
num_live_replicas_(0) {
}
@@ -187,7 +188,7 @@ void TSDescriptor::DecayRecentReplicaCreationsUnlocked() {
if (recent_replica_creations_ == 0) return;
const double kHalflifeSecs =
FLAGS_tserver_last_replica_creations_halflife_ms / 1000;
- MonoTime now = MonoTime::Now();
+ const MonoTime now = MonoTime::Now();
double secs_since_last_decay = (now -
last_replica_creations_decay_).ToSeconds();
recent_replica_creations_ *= pow(0.5, secs_since_last_decay / kHalflifeSecs);
@@ -198,12 +199,84 @@ void TSDescriptor::DecayRecentReplicaCreationsUnlocked() {
last_replica_creations_decay_ = now;
}
+// TODO(mreddy) Avoid hacky decay code by potentially getting information
about recently
+// placed replicas from the system catalog, tablet server takes too long to
report during
+// table creation process as requests aren't sent to servers until after
selection process.
+void TSDescriptor::DecayRecentReplicaCreationsByRangeUnlocked(const string&
range_start_key,
+ const string&
table_id) {
+ // In most cases, we won't have any recent replica creations, so
+ // we don't need to bother calling the clock, etc. Such cases include when
+ // the map for the table or the range hasn't been initialized yet, or if the
value is 0.
+ if (recent_replicas_by_range_.find(table_id) ==
recent_replicas_by_range_.end() ||
+ recent_replicas_by_range_[table_id].first.find(range_start_key) ==
+ recent_replicas_by_range_[table_id].first.end() ||
+ recent_replicas_by_range_[table_id].first[range_start_key] == 0) {
+ return;
+ }
+
+ const double kHalflifeSecs =
FLAGS_tserver_last_replica_creations_halflife_ms / 1000;
+ const MonoTime now = MonoTime::Now();
+ // If map for the table or range hasn't been initialized yet, use init_time_
as last decay.
+ MonoTime last_decay =
+ last_replica_decay_by_range_.find(table_id) ==
last_replica_decay_by_range_.end() ||
+ last_replica_decay_by_range_[table_id].first.find(range_start_key) ==
+ last_replica_decay_by_range_[table_id].first.end() ?
+ init_time_ :
last_replica_decay_by_range_[table_id].first[range_start_key];
+ double secs_since_last_decay = (now - last_decay).ToSeconds();
+ recent_replicas_by_range_[table_id].first[range_start_key] *=
+ pow(0.5, secs_since_last_decay / kHalflifeSecs);
+
+ // If sufficiently small, reset down to 0 to take advantage of the fast path
above.
+ if (recent_replicas_by_range_[table_id].first[range_start_key] < 1e-12) {
+ recent_replicas_by_range_[table_id].first[range_start_key] = 0;
+ }
+ // First time this is set, it silently initializes
last_replica_decay_by_range_[table_id].second
+ // to 0. This fails the MonoTime init check in
+ // DecayRecentReplicaCreationsByTableUnlocked() so it's set here.
+ last_replica_decay_by_range_[table_id].first[range_start_key] = now;
+ if (!last_replica_decay_by_range_[table_id].second.Initialized()) {
+ last_replica_decay_by_range_[table_id].second = now;
+ }
+}
+
+void TSDescriptor::DecayRecentReplicaCreationsByTableUnlocked(const string&
table_id) {
+ // In most cases, we won't have any recent replica creations, so
+ // we don't need to bother calling the clock, etc.
+ if (recent_replicas_by_range_.find(table_id) ==
recent_replicas_by_range_.end() ||
+ recent_replicas_by_range_[table_id].second == 0) {
+ return;
+ }
+
+ const double kHalflifeSecs =
FLAGS_tserver_last_replica_creations_halflife_ms / 1000;
+ const MonoTime now = MonoTime::Now();
+ // If map for the table hasn't been initialized yet, use init_time_ as last
decay.
+ MonoTime last_decay =
+ last_replica_decay_by_range_.find(table_id) ==
last_replica_decay_by_range_.end() ?
+ init_time_ : last_replica_decay_by_range_[table_id].second;
+ double secs_since_last_decay = (now - last_decay).ToSeconds();
+ recent_replicas_by_range_[table_id].second *= pow(0.5, secs_since_last_decay
/ kHalflifeSecs);
+
+ // If sufficiently small, reset down to 0 to take advantage of the fast path
above.
+ if (recent_replicas_by_range_[table_id].second < 1e-12) {
+ recent_replicas_by_range_[table_id].second = 0;
+ }
+ last_replica_decay_by_range_[table_id].second = now;
+}
+
void TSDescriptor::IncrementRecentReplicaCreations() {
std::lock_guard<rw_spinlock> l(lock_);
DecayRecentReplicaCreationsUnlocked();
recent_replica_creations_ += 1;
}
+void TSDescriptor::IncrementRecentReplicaCreationsByRangeAndTable(const
string& range_key_start,
+ const
string& table_id) {
+ std::lock_guard<rw_spinlock> l(lock_);
+ DecayRecentReplicaCreationsByRangeUnlocked(range_key_start, table_id);
+ DecayRecentReplicaCreationsByTableUnlocked(table_id);
+ recent_replicas_by_range_[table_id].first[range_key_start]++;
+ recent_replicas_by_range_[table_id].second++;
+}
+
double TSDescriptor::RecentReplicaCreations() {
// NOTE: not a shared lock because of the "Decay" side effect.
std::lock_guard<rw_spinlock> l(lock_);
@@ -211,6 +284,22 @@ double TSDescriptor::RecentReplicaCreations() {
return recent_replica_creations_;
}
+double TSDescriptor::RecentReplicaCreationsByRange(const string&
range_key_start,
+ const string& table_id) {
+ // NOTE: not a shared lock because of the "Decay" side effect.
+ std::lock_guard<rw_spinlock> l(lock_);
+ DecayRecentReplicaCreationsByRangeUnlocked(range_key_start, table_id);
+ DecayRecentReplicaCreationsByTableUnlocked(table_id);
+ return recent_replicas_by_range_[table_id].first[range_key_start];
+}
+
+double TSDescriptor::RecentReplicaCreationsByTable(const string& table_id) {
+ // NOTE: not a shared lock because of the "Decay" side effect.
+ std::lock_guard<rw_spinlock> l(lock_);
+ DecayRecentReplicaCreationsByTableUnlocked(table_id);
+ return recent_replicas_by_range_[table_id].second;
+}
+
Status TSDescriptor::GetRegistration(ServerRegistrationPB* reg,
bool use_external_addr) const {
shared_lock<rw_spinlock> l(lock_);
diff --git a/src/kudu/master/ts_descriptor.h b/src/kudu/master/ts_descriptor.h
index ca78eb7b3..667b9a6a1 100644
--- a/src/kudu/master/ts_descriptor.h
+++ b/src/kudu/master/ts_descriptor.h
@@ -59,9 +59,34 @@ namespace master {
class TSInfoPB;
-// Map of dimension -> tablets number.
+// Map of dimension -> number of tablets.
typedef std::unordered_map<std::string, int32_t> TabletNumByDimensionMap;
+// For a table, a map of range start key -> number of tablets for that range.
+typedef std::unordered_map<std::string, int32_t> TabletNumByRangeMap;
+
+// Map of table id -> number of tablets by each range for that table.
+typedef std::unordered_map<std::string, TabletNumByRangeMap>
TabletNumByRangePerTableMap;
+
+// For a table, a pair of map of range start key -> number of times this range
has
+// had a replica selected on this tablet server and the number of times the
table
+// has had a replica selected on this tablet server.
+typedef std::pair<std::unordered_map<std::string, double>, double>
RecentReplicaByRangesPerTable;
+
+// Map of table id -> number of times each range in the table and
+// the table itself has had a replica selected on this tablet server.
+typedef std::unordered_map<std::string, RecentReplicaByRangesPerTable>
RecentReplicasByTable;
+
+// For a table, a pair of: a map of range start key -> time of the last decay
+// applied to that range's recent-replica counter on this tablet server, and
+// the time of the last decay applied to the table's recent-replica counter
+// on this tablet server.
+typedef std::pair<std::unordered_map<std::string, MonoTime>, MonoTime>
+ LastReplicaDecayByRangesPerTable;
+
+// Map of table id -> time of the last decay applied to each range's and the
+// table's recent-replica counters on this tablet server.
+typedef std::unordered_map<std::string, LastReplicaDecayByRangesPerTable>
LastReplicaDecayByTable;
+
// Master-side view of a single tablet server.
//
// Tracks the last heartbeat, status, instance identifier, location, etc.
@@ -129,12 +154,33 @@ class TSDescriptor : public
enable_make_shared<TSDescriptor> {
// server. This value will automatically decay over time.
void IncrementRecentReplicaCreations();
+ // Increment the accounting of the number of replicas from the range
'range_key_start'
+ // from the table 'table_id' recently created on this server. Also
increments the accounting of
+ // the number of replicas from 'table_id' recently created on this server.
+ // These values will automatically decay over time.
+ void IncrementRecentReplicaCreationsByRangeAndTable(const std::string&
range_key_start,
+ const std::string&
table_id);
+
// Return the number of replicas which have recently been created on this
// TS. This number is incremented when replicas are placed on the TS, and
// then decayed over time. This method is not 'const' because each call
// actually performs the time-based decay.
double RecentReplicaCreations();
+ // Return the number of replicas from the range identified by
'range_key_start' from the
+ // table 'table_id' which have recently been created on this TS. This number
is incremented
+ // when replicas from this range are placed on the TS, and then decayed over
time. This method
+ // is not 'const' because each call actually performs the time-based decay.
+ double RecentReplicaCreationsByRange(const std::string& range_key_start,
+ const std::string& table_id);
+
+ // Return the number of replicas from the table identified by
+ // 'table_id' which have recently been created on this TS. This number is
+ // incremented when replicas from this table are placed on the TS, and then
+ // decayed over time. This method is not 'const' because
+ // each call actually performs the time-based decay.
+ double RecentReplicaCreationsByTable(const std::string& table_id);
+
// Set the number of live replicas (i.e. running or bootstrapping).
void set_num_live_replicas(int n) {
DCHECK_GE(n, 0);
@@ -148,7 +194,15 @@ class TSDescriptor : public
enable_make_shared<TSDescriptor> {
num_live_tablets_by_dimension_ = std::move(num_live_tablets_by_dimension);
}
- // Return the number of live replicas (i.e running or bootstrapping).
+ // Set the number of live replicas per range for each table.
+ void set_num_live_replicas_by_range_per_table(
+ std::string table_id,
+ TabletNumByRangeMap num_live_tablets_by_table) {
+ std::lock_guard<rw_spinlock> l(lock_);
+ num_live_tablets_by_range_per_table_.emplace(table_id,
num_live_tablets_by_table);
+ }
+
+ // Return the number of live replicas (i.e. running or bootstrapping).
// If dimension is none, return the total number of replicas in the tablet
server.
// Otherwise, return the number of replicas in the dimension.
int num_live_replicas(const std::optional<std::string>& dimension =
std::nullopt) const {
@@ -163,6 +217,31 @@ class TSDescriptor : public
enable_make_shared<TSDescriptor> {
return num_live_replicas_;
}
+ // Return the number of live replicas (i.e. running or bootstrapping)
+ // in the given range for the given table.
+ int num_live_replicas_by_range(const std::string& range_key, const
std::string& table_id) const {
+ shared_lock<rw_spinlock> l(lock_);
+ int32_t num_live_tablets_by_range = 0;
+ if (ContainsKey(num_live_tablets_by_range_per_table_, table_id)) {
+ auto ranges = FindOrDie(num_live_tablets_by_range_per_table_, table_id);
+ ignore_result(FindCopy(ranges, range_key, &num_live_tablets_by_range));
+ }
+ return num_live_tablets_by_range;
+ }
+
+ // Return the number of live replicas (i.e. running or bootstrapping) in the
given table.
+ int num_live_replicas_by_table(const std::string& table_id) const {
+ shared_lock<rw_spinlock> l(lock_);
+ int32_t num_live_tablets_by_table = 0;
+ if (ContainsKey(num_live_tablets_by_range_per_table_, table_id)) {
+ auto ranges = FindOrDie(num_live_tablets_by_range_per_table_, table_id);
+ for (const auto& range : ranges) {
+ num_live_tablets_by_table += range.second;
+ }
+ }
+ return num_live_tablets_by_table;
+ }
+
// Return the location of the tablet server. This returns a safe copy
// since the location could change at any time if the tablet server
// re-registers.
@@ -190,6 +269,11 @@ class TSDescriptor : public
enable_make_shared<TSDescriptor> {
void DecayRecentReplicaCreationsUnlocked();
+ void DecayRecentReplicaCreationsByRangeUnlocked(const std::string&
range_start_key,
+ const std::string& table_id);
+
+ void DecayRecentReplicaCreationsByTableUnlocked(const std::string& table_id);
+
void AssignLocationForTesting(std::string loc) {
location_ = std::move(loc);
}
@@ -210,12 +294,24 @@ class TSDescriptor : public
enable_make_shared<TSDescriptor> {
double recent_replica_creations_;
MonoTime last_replica_creations_decay_;
+ // A map that contains the number of times this tablet server has recently
been selected to
+ // create a tablet replica per range and table. This value decays back to 0
over time.
+ RecentReplicasByTable recent_replicas_by_range_;
+ LastReplicaDecayByTable last_replica_decay_by_range_;
+
+ // The time this descriptor was created (not when the tablet server itself
+ // started); used as the initial "last decay" time for recent replica
+ // creations by range and table.
+ MonoTime init_time_;
+
// The number of live replicas on this host, from the last heartbeat.
int num_live_replicas_;
// The number of live replicas in each dimension, from the last heartbeat.
std::optional<TabletNumByDimensionMap> num_live_tablets_by_dimension_;
+ // The number of live replicas for each range for each table on this host
from the last heartbeat.
+ TabletNumByRangePerTableMap num_live_tablets_by_range_per_table_;
+
// The tablet server's location, as determined by the master at registration.
std::optional<std::string> location_;
diff --git a/src/kudu/tools/rebalancer_tool-test.cc
b/src/kudu/tools/rebalancer_tool-test.cc
index 2a2815125..597ef71b7 100644
--- a/src/kudu/tools/rebalancer_tool-test.cc
+++ b/src/kudu/tools/rebalancer_tool-test.cc
@@ -29,6 +29,7 @@
#include <string>
#include <thread>
#include <tuple>
+#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
@@ -186,6 +187,7 @@ TEST_P(RebalanceStartCriteriaTest, TabletServerIsDown) {
const bool is_343_scheme = (GetParam() == Kudu1097::Enable);
const vector<string> kMasterFlags = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+ "--enable_range_replica_placement=false",
};
const vector<string> kTserverFlags = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
@@ -241,6 +243,7 @@ TEST_P(RebalanceStartCriteriaTest, UnknownIgnoredTServer) {
const bool is_343_scheme = (GetParam() == Kudu1097::Enable);
const vector<string> kMasterFlags = {
Substitute("--raft_prepare_replacement_before_eviction=$0",
is_343_scheme),
+ "--enable_range_replica_placement=false",
};
const vector<string> kTserverFlags = {
Substitute("--raft_prepare_replacement_before_eviction=$0",
is_343_scheme),
@@ -283,6 +286,7 @@ TEST_P(RebalanceStartCriteriaTest,
TabletServerInMaintenanceMode) {
const bool is_343_scheme = (GetParam() == Kudu1097::Enable);
const vector<string> kMasterFlags = {
Substitute("--raft_prepare_replacement_before_eviction=$0",
is_343_scheme),
+ "--enable_range_replica_placement=false",
};
const vector<string> kTserverFlags = {
Substitute("--raft_prepare_replacement_before_eviction=$0",
is_343_scheme),
@@ -355,6 +359,7 @@ TEST_P(RebalanceStartSafetyTest, TooManyIgnoredTservers) {
const bool is_343_scheme = (GetParam() == Kudu1097::Enable);
const vector<string> kMasterFlags = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
+ "--enable_range_replica_placement=false",
};
const vector<string> kTserverFlags = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
@@ -413,7 +418,10 @@ TEST_F(RebalanceIgnoredTserversTest, Basic) {
FLAGS_num_tablet_servers = 5;
// Start a cluster with a single tablet.
- NO_FATALS(BuildAndStart());
+ const vector<string> kMasterFlags = {
+ "--enable_range_replica_placement=false",
+ };
+ NO_FATALS(BuildAndStart({ }, kMasterFlags));
// Pre-condition: all replicas on ignored tservers should be healthy or
recovering.
// Here we just ensure the cluster is healthy.
@@ -625,6 +633,7 @@ TEST_P(RebalanceParamTest, Rebalance) {
const auto timeout = MonoDelta::FromSeconds(30);
const vector<string> kMasterFlags = {
"--allow_unsafe_replication_factor",
+ "--enable_range_replica_placement=false",
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs),
};
@@ -738,6 +747,7 @@ class RebalancingTest : public
tserver::TabletServerIntegrationTestBase {
master_flags_ = {
Substitute("--tserver_unresponsive_timeout_ms=$0",
tserver_unresponsive_ms_),
+ "--enable_range_replica_placement=false",
};
}
@@ -764,6 +774,7 @@ class RebalancingTest : public
tserver::TabletServerIntegrationTestBase {
"--raft_prepare_replacement_before_eviction=$0", is_343_scheme());
master_flags_.push_back(scheme_flag);
tserver_flags_.push_back(scheme_flag);
+ master_flags_.push_back("--enable_range_replica_placement=false");
copy(extra_tserver_flags.begin(), extra_tserver_flags.end(),
back_inserter(tserver_flags_));
@@ -1631,6 +1642,7 @@ TEST_P(RebalancerAndSingleReplicaTablets,
SingleReplicasStayOrMove) {
const vector<string> master_flags = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
Substitute("--tserver_unresponsive_timeout_ms=$0", kTserverUnresponsiveMs),
+ "--enable_range_replica_placement=false",
};
const vector<string> tserver_flags = {
Substitute("--raft_prepare_replacement_before_eviction=$0", is_343_scheme),
@@ -2227,6 +2239,7 @@ TEST_F(IntraLocationRebalancingBasicTest,
LocationsWithEmptyTabletServers) {
{ "/D", 2 } };
const vector<string>& extra_master_flags = {
"--master_client_location_assignment_enabled=false",
+ "--enable_range_replica_placement=false",
};
copy(extra_master_flags.begin(), extra_master_flags.end(),
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index ccc116fac..4828a38ce 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -26,6 +26,7 @@
#include <optional>
#include <ostream>
#include <string>
+#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
@@ -506,6 +507,19 @@ Status Heartbeater::Thread::DoHeartbeat(MasterErrorPB*
error,
auto num_live_tablets_by_dimension =
server_->tablet_manager()->GetNumLiveTabletsByDimension();
req.mutable_num_live_tablets_by_dimension()->insert(num_live_tablets_by_dimension.begin(),
num_live_tablets_by_dimension.end());
+ auto num_live_tablets_by_range_and_table =
+ server_->tablet_manager()->GetNumLiveTabletsByRangePerTable();
+ for (const auto& [table, ranges] : num_live_tablets_by_range_and_table) {
+ master::TabletsByRangePerTablePB table_pb;
+ table_pb.mutable_num_live_tablets_by_range()->Reserve(ranges.size());
+ for (const auto& range : ranges) {
+ master::TabletsByRangePB* range_pb =
table_pb.add_num_live_tablets_by_range();
+ range_pb->set_range_start_key(range.first);
+ range_pb->set_tablets(range.second);
+ }
+ auto pair = google::protobuf::MapPair(table, table_pb);
+ req.mutable_num_live_tablets_by_range_per_table()->insert(pair);
+ }
VLOG(2) << "Sending heartbeat:\n" << SecureDebugString(req);
master::TSHeartbeatResponsePB resp;
diff --git a/src/kudu/tserver/ts_tablet_manager.cc
b/src/kudu/tserver/ts_tablet_manager.cc
index 63282cd01..fa5c9c9bf 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -33,6 +33,7 @@
#include <glog/logging.h>
#include "kudu/common/common.pb.h"
+#include "kudu/common/partition.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
@@ -1634,6 +1635,21 @@ TabletNumByDimensionMap
TSTabletManager::GetNumLiveTabletsByDimension() const {
return result;
}
+TabletNumByRangePerTableMap
TSTabletManager::GetNumLiveTabletsByRangePerTable() const {
+ TabletNumByRangePerTableMap result;
+ shared_lock<RWMutex> l(lock_);
+ for (const auto& entry : tablet_map_) {
+ tablet::TabletStatePB state = entry.second->state();
+ if (state == tablet::BOOTSTRAPPING ||
+ state == tablet::RUNNING) {
+ const string& table_id = entry.second->tablet_metadata()->table_id();
+ const string& range_key =
entry.second->tablet_metadata()->partition().begin().range_key();
+ result[table_id][range_key]++;
+ }
+ }
+ return result;
+}
+
void TSTabletManager::InitLocalRaftPeerPB() {
DCHECK_EQ(state(), MANAGER_INITIALIZING);
local_peer_pb_.set_permanent_uuid(fs_manager_->uuid());
diff --git a/src/kudu/tserver/ts_tablet_manager.h
b/src/kudu/tserver/ts_tablet_manager.h
index e088fdabe..865441f7f 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -82,9 +82,15 @@ class TabletServer;
// Map of tablet id -> transition reason string.
typedef std::unordered_map<std::string, std::string> TransitionInProgressMap;
-// Map of dimension -> tablets number.
+// Map of dimension -> number of tablets.
typedef std::unordered_map<std::string, int32_t> TabletNumByDimensionMap;
+// Map of range start key -> number of tablets in that range.
+typedef std::unordered_map<std::string, int32_t> TabletNumByRangeMap;
+
+// Map of table id -> number of tablets by each range in that table.
+typedef std::unordered_map<std::string, TabletNumByRangeMap>
TabletNumByRangePerTableMap;
+
class TransitionInProgressDeleter;
// Keeps track of the tablets hosted on the tablet server side.
@@ -114,7 +120,7 @@ class TSTabletManager : public
tserver::TabletReplicaLookupIf {
// the first tablet whose bootstrap failed.
Status WaitForAllBootstrapsToFinish();
- // Shut down all of the tablets, gracefully flushing before shutdown.
+ // Shut down all the tablets, gracefully flushing before shutdown.
void Shutdown();
// Create a new tablet and register it with the tablet manager. The new
tablet
@@ -192,7 +198,7 @@ class TSTabletManager : public
tserver::TabletReplicaLookupIf {
void PopulateIncrementalTabletReport(master::TabletReportPB* report,
const std::vector<std::string>&
tablet_ids) const;
- // Get all of the tablets currently hosted on this server.
+ // Get all the tablets currently hosted on this server.
void GetTabletReplicas(
std::vector<scoped_refptr<tablet::TabletReplica>>* replicas) const
override;
@@ -212,6 +218,10 @@ class TSTabletManager : public
tserver::TabletReplicaLookupIf {
// Get the number of tablets in RUNNING or BOOTSTRAPPING state in each
dimension.
TabletNumByDimensionMap GetNumLiveTabletsByDimension() const;
+ // Get the number of tablets in RUNNING or BOOTSTRAPPING state in each range
for each table.
+ // TODO(mreddy) Include tablets in INITIALIZED state, but need to check if
it's tombstoned.
+ TabletNumByRangePerTableMap GetNumLiveTabletsByRangePerTable() const;
+
Status RunAllLogGC();
// Delete the tablet using the specified delete_type as the final metadata