This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 10efaf2c7 [master] KUDU-3390 support auto rebalance tablet leaders
across TServers
10efaf2c7 is described below
commit 10efaf2c77dfe5e4474505e0267c583c011703be
Author: shenxingwuying <[email protected]>
AuthorDate: Tue Mar 15 16:49:23 2022 +0800
[master] KUDU-3390 support auto rebalance tablet leaders across TServers
The number of leader replicas per tablet server can become imbalanced
over time, which leads to load skew on some nodes. This patch adds an
automatic leader rebalancing task to avoid leader replica skew.
There are two reasons for the load skew:
- The main one: scan requests have two modes, LEADER_ONLY (the default) and
  CLOSEST_REPLICA. For more accurate results, users choose the LEADER_ONLY
  mode, so the scan load is mostly positively correlated with the number of
  leaders a tablet server hosts.
- The other one: writes. Leaders receive write requests while followers
  receive UpdateConsensus requests (called AppendEntries in Raft), and the
  processing flow differs a little (queue, pipeline, batch size), which may
  cause imbalanced load.
Leader rebalancing keeps leaders and followers balanced and makes the service
more stable.
An experiment shows the effect of leader rebalancing (more details in KUDU-3390):
       | leader ratio | scan cost | cpu usage     | io
before | 40:0:0       | 811.586 s | 47%, 18%, 19% | 102MB/s ioutil:55%, 8KB/s 2%, 64KB/s 3%
after  | 13:14:13     | 611.012 s | 39%, 45%, 35% | 53MB/s ioutil:31%, 80MB/s 18%, 45MB/s 24%
Change-Id: Ibfb60d8759a93b6a19238637c27df4f6b1cac918
Reviewed-on: http://gerrit.cloudera.org:8080/18454
Reviewed-by: Yuqi Du <[email protected]>
Reviewed-by: Yingchun Lai <[email protected]>
Tested-by: Kudu Jenkins
---
src/kudu/master/CMakeLists.txt | 2 +
src/kudu/master/auto_leader_rebalancer-test.cc | 207 +++++++++++++
src/kudu/master/auto_leader_rebalancer.cc | 404 +++++++++++++++++++++++++
src/kudu/master/auto_leader_rebalancer.h | 108 +++++++
src/kudu/master/auto_rebalancer-test.cc | 83 ++++-
src/kudu/master/auto_rebalancer.h | 1 +
src/kudu/master/catalog_manager.cc | 30 ++
src/kudu/master/catalog_manager.h | 16 +-
8 files changed, 836 insertions(+), 15 deletions(-)
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 5dd8bae86..7dfd6a0f2 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -51,6 +51,7 @@ ADD_EXPORTABLE_LIBRARY(txn_manager_proto
set(MASTER_SRCS
authz_provider.cc
auto_rebalancer.cc
+ auto_leader_rebalancer.cc
catalog_manager.cc
hms_notification_log_listener.cc
location_cache.cc
@@ -107,6 +108,7 @@ SET_KUDU_TEST_LINK_LIBS(
mini_kdc)
ADD_KUDU_TEST(auto_rebalancer-test)
+ADD_KUDU_TEST(auto_leader_rebalancer-test)
ADD_KUDU_TEST(catalog_manager-test)
ADD_KUDU_TEST(dynamic_multi_master-test NUM_SHARDS 6)
ADD_KUDU_TEST(hms_notification_log_listener-test)
diff --git a/src/kudu/master/auto_leader_rebalancer-test.cc
b/src/kudu/master/auto_leader_rebalancer-test.cc
new file mode 100644
index 000000000..8ff1d0d0e
--- /dev/null
+++ b/src/kudu/master/auto_leader_rebalancer-test.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/master/auto_leader_rebalancer.h"
+
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/master/ts_descriptor.h"
+#include "kudu/master/ts_manager.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+namespace master {
+class AutoRebalancerTask;
+} // namespace master
+} // namespace kudu
+
+
+using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
+using std::string;
+using std::unique_ptr;
+
+DECLARE_bool(auto_leader_rebalancing_enabled);
+DECLARE_bool(auto_rebalancing_enabled);
+DECLARE_int32(heartbeat_interval_ms);
+DECLARE_uint32(auto_leader_rebalancing_interval_seconds);
+DECLARE_uint32(auto_rebalancing_interval_seconds);
+DECLARE_uint32(auto_rebalancing_max_moves_per_server);
+DECLARE_uint32(auto_rebalancing_wait_for_replica_moves_seconds);
+DECLARE_uint32(leader_rebalancing_max_moves_per_round);
+
+namespace kudu {
+namespace master {
+
+// Fixture that starts an InternalMiniCluster with automatic replica
+// rebalancing enabled and automatic leader rebalancing disabled, so that tests
+// can drive leader rebalancing rounds explicitly and then check the result.
+class LeaderRebalancerTest : public KuduTest {
+ public:
+  // Starts the cluster described by 'cluster_opts_'.
+  Status CreateAndStartCluster() {
+    // Enable automatic replica rebalancing on a short interval.
+    FLAGS_auto_rebalancing_enabled = true;
+    FLAGS_auto_rebalancing_interval_seconds = 1;  // Shorten for testing.
+    FLAGS_auto_rebalancing_wait_for_replica_moves_seconds = 0;  // Shorten for testing.
+    // Disable leader rebalancing, we'll do it manually.
+    FLAGS_auto_leader_rebalancing_enabled = false;
+    cluster_.reset(new InternalMiniCluster(env_, cluster_opts_));
+    return cluster_->Start();
+  }
+
+  // Creates the workload table with the given number of tablets and replicas.
+  void CreateWorkloadTable(int num_tablets, int num_replicas) {
+    workload_.reset(new TestWorkload(cluster_.get()));
+    workload_->set_num_tablets(num_tablets);
+    workload_->set_num_replicas(num_replicas);
+    workload_->Setup();
+  }
+
+  void TearDown() override {
+    if (cluster_) {
+      cluster_->Shutdown();
+    }
+    KuduTest::TearDown();
+  }
+
+  std::string table_name() { return workload_->table_name(); }
+
+  // Returns OK if a TEST-mode rebalancing pass over the workload table would
+  // schedule no leader transfers among the tablet servers not presumed dead.
+  // Returns an error if the master is not the leader or the table is missing.
+  Status CheckLeaderBalance() {
+    // Leader master.
+    master::Master* master = cluster_->mini_master()->master();
+    master::CatalogManager* catalog_manager = master->catalog_manager();
+    scoped_refptr<master::TableInfo> table_info;
+    {
+      CatalogManager::ScopedLeaderSharedLock leader_lock(catalog_manager);
+      RETURN_NOT_OK(leader_lock.first_failed_status());
+      catalog_manager->GetTableInfoByName(table_name(), &table_info);
+    }
+    if (!table_info) {
+      // RunLeaderRebalanceForTable() dereferences 'table_info' unconditionally.
+      return Status::NotFound("table not found", table_name());
+    }
+
+    TSDescriptorVector descriptors;
+    master->ts_manager()->GetAllDescriptors(&descriptors);
+
+    std::vector<std::string> tserver_uuids;
+    for (const auto& desc : descriptors) {
+      if (!desc->PresumedDead()) {
+        tserver_uuids.emplace_back(desc->permanent_uuid());
+      }
+    }
+
+    return catalog_manager->auto_leader_rebalancer()->RunLeaderRebalanceForTable(
+        table_info, tserver_uuids, AutoLeaderRebalancerTask::ExecuteMode::TEST);
+  }
+
+ protected:
+  unique_ptr<InternalMiniCluster> cluster_;
+  InternalMiniClusterOptions cluster_opts_;
+  unique_ptr<TestWorkload> workload_;
+};
+
+// Create a cluster and a table whose replica and leader placement is decided
+// by the table-creation process, then add a tablet server, which leaves both
+// replicas and leaders unbalanced. After automatic replica rebalancing has had
+// time to run, repeated leader rebalancing rounds must balance the leaders.
+TEST_F(LeaderRebalancerTest, AddTserver) {
+  const int kNumTServers = 3;
+  const int kNumTablets = 59;
+
+  cluster_opts_.num_tablet_servers = kNumTServers;
+  FLAGS_leader_rebalancing_max_moves_per_round = 5;
+  ASSERT_OK(CreateAndStartCluster());
+
+  CreateWorkloadTable(kNumTablets, /*num_replicas*/ 3);
+
+  // Add a tablet server, leaving the cluster unbalanced.
+  ASSERT_OK(cluster_->AddTabletServer());
+  const int tserver_size = cluster_->num_tablet_servers();
+  LOG(INFO) << "add a tserver: " << cluster_->mini_tablet_server(tserver_size - 1)->uuid();
+
+  // Leader master.
+  master::Master* master = cluster_->mini_master()->master();
+
+  master::AutoRebalancerTask* replica_rebalancer = master->catalog_manager()->auto_rebalancer();
+  master::AutoLeaderRebalancerTask* leader_rebalancer =
+      master->catalog_manager()->auto_leader_rebalancer();
+  ASSERT_NE(replica_rebalancer, nullptr);
+  ASSERT_NE(leader_rebalancer, nullptr);
+
+  // Give the replica rebalancer time to run several rounds and reach balance.
+  SleepFor(MonoDelta::FromSeconds(20 * FLAGS_auto_rebalancing_interval_seconds));
+  constexpr int32_t kRetries = 40;
+  for (int i = 0; i < kRetries; i++) {
+    ASSERT_OK(leader_rebalancer->RunLeaderRebalancer());
+    if (CheckLeaderBalance().ok()) {
+      break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(FLAGS_heartbeat_interval_ms));
+  }
+
+  ASSERT_OK(CheckLeaderBalance());
+}
+
+// Restart one of four tablet servers (which may leave leadership concentrated
+// on the others) and verify that repeated leader rebalancing rounds bring the
+// workload table's leaders into balance.
+TEST_F(LeaderRebalancerTest, RestartTserver) {
+  const int kNumTServers = 4;
+  const int kNumTablets = 59;
+  cluster_opts_.num_tablet_servers = kNumTServers;
+  FLAGS_leader_rebalancing_max_moves_per_round = 5;
+  ASSERT_OK(CreateAndStartCluster());
+
+  CreateWorkloadTable(kNumTablets, /*num_replicas*/ 3);
+
+  // Leader master.
+  master::Master* master = cluster_->mini_master()->master();
+
+  master::AutoRebalancerTask* replica_rebalancer = master->catalog_manager()->auto_rebalancer();
+  master::AutoLeaderRebalancerTask* leader_rebalancer =
+      master->catalog_manager()->auto_leader_rebalancer();
+  ASSERT_NE(replica_rebalancer, nullptr);
+  ASSERT_NE(leader_rebalancer, nullptr);
+
+  ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
+  // Give the replica rebalancer time to run several rounds and reach balance.
+  SleepFor(MonoDelta::FromSeconds(10 * FLAGS_auto_rebalancing_interval_seconds));
+  constexpr int32_t kRetries = 20;
+  for (int i = 0; i < kRetries; i++) {
+    ASSERT_OK(leader_rebalancer->RunLeaderRebalancer());
+    if (CheckLeaderBalance().ok()) {
+      break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(FLAGS_heartbeat_interval_ms));
+  }
+  ASSERT_OK(CheckLeaderBalance());
+}
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/auto_leader_rebalancer.cc
b/src/kudu/master/auto_leader_rebalancer.cc
new file mode 100644
index 000000000..b56ca737a
--- /dev/null
+++ b/src/kudu/master/auto_leader_rebalancer.cc
@@ -0,0 +1,404 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/auto_leader_rebalancer.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <optional>
+#include <ostream>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/ts_descriptor.h"
+#include "kudu/master/ts_manager.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/cow_object.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+using kudu::consensus::ConsensusServiceProxy;
+using kudu::consensus::LeaderStepDownMode;
+using kudu::consensus::LeaderStepDownRequestPB;
+using kudu::consensus::LeaderStepDownResponsePB;
+using kudu::consensus::RaftPeerPB;
+using kudu::rpc::MessengerBuilder;
+using kudu::rpc::RpcController;
+using std::map;
+using std::nullopt;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+// Timeout applied to each LeaderStepDown RPC sent by the leader rebalancer.
+DEFINE_uint32(auto_leader_rebalancing_rpc_timeout_seconds, 10,
+ "auto leader rebalancing send leader step down rpc timeout
seconds");
+TAG_FLAG(auto_leader_rebalancing_rpc_timeout_seconds, advanced);
+TAG_FLAG(auto_leader_rebalancing_rpc_timeout_seconds, runtime);
+
+// Wait between iterations of the background loop (RunLoop). The value is
+// re-read before every wait, so runtime changes apply to the next wait.
+DEFINE_uint32(auto_leader_rebalancing_interval_seconds, 3600,
+ "How long to sleep in between auto leader rebalancing cycles,
before checking "
+ "the cluster again to see if there is leader skew and if run
task again.");
+TAG_FLAG(auto_leader_rebalancing_interval_seconds, advanced);
+TAG_FLAG(auto_leader_rebalancing_interval_seconds, runtime);
+
+// Intended cap on the number of leader transfers scheduled for a single
+// table in a single RunLeaderRebalanceForTable() call. The task list is local
+// to that call, so the cap is per table, not per whole round.
+DEFINE_uint32(leader_rebalancing_max_moves_per_round, 10,
+ "Max count of leader transfer when every leader rebalance runs");
+TAG_FLAG(leader_rebalancing_max_moves_per_round, advanced);
+TAG_FLAG(leader_rebalancing_max_moves_per_round, runtime);
+
+// Defined in another translation unit; RunLoop() only runs a round when set.
+DECLARE_bool(auto_leader_rebalancing_enabled);
+
+namespace kudu {
+namespace master {
+
+// 'random_generator_' is seeded from 'random_device_', which is declared (and
+// therefore initialized) before it.
+AutoLeaderRebalancerTask::AutoLeaderRebalancerTask(CatalogManager* catalog_manager,
+                                                   TSManager* ts_manager)
+    : catalog_manager_(catalog_manager),
+      ts_manager_(ts_manager),
+      shutdown_(1),
+      random_generator_(random_device_()),
+      number_of_loop_iterations_for_test_(0),
+      moves_scheduled_this_round_for_test_(0) {}
+
+// Shutdown() CHECK-fails when 'thread_' is unset, which is the case if Init()
+// was never called or Shutdown() already ran. Only shut down a live thread.
+AutoLeaderRebalancerTask::~AutoLeaderRebalancerTask() {
+  if (thread_) {
+    Shutdown();
+  }
+}
+
+// Builds the messenger used for LeaderStepDown RPCs, then starts the
+// "auto-leader-rebalancer" thread, which runs RunLoop(). Call at most once.
+Status AutoLeaderRebalancerTask::Init() {
+  DCHECK(!thread_) << "AutoleaderRebalancerTask is already initialized";
+  MessengerBuilder builder("auto-leader-rebalancer");
+  RETURN_NOT_OK(builder.Build(&messenger_));
+  return Thread::Create("catalog manager", "auto-leader-rebalancer",
+                        [this] { RunLoop(); }, &thread_);
+}
+
+// Counts down 'shutdown_' to wake RunLoop(). When CountDown() reports success
+// (presumably: this call released the latch), joins the background thread and
+// drops the reference to it. CHECK-fails if 'thread_' is unset.
+void AutoLeaderRebalancerTask::Shutdown() {
+  CHECK(thread_) << "AutoLeaderRebalancerTask is not initialized";
+  if (shutdown_.CountDown()) {
+    CHECK_OK(ThreadJoiner(thread_.get()).Join());
+    thread_.reset();
+  }
+}
+
+// Computes leader-transfer tasks for one table and, in NORMAL mode, sends a
+// graceful LeaderStepDown RPC for each of them.
+//
+// The table's metadata read lock is held for the whole call. Only RUNNING
+// voter replicas reported by the catalog manager are considered. Only the
+// tablet servers listed in 'tserver_uuids' can be transfer sources or
+// destinations. At most FLAGS_leader_rebalancing_max_moves_per_round
+// transfers are scheduled per call.
+//
+// In TEST mode no RPCs are sent. The call returns IllegalState if any
+// transfer would be scheduled, i.e. if the table's leaders are not balanced.
+// In NORMAL mode, a failed transfer is logged and the remaining transfers
+// are still attempted.
+Status AutoLeaderRebalancerTask::RunLeaderRebalanceForTable(
+    const scoped_refptr<TableInfo>& table_info,
+    const vector<string>& tserver_uuids,
+    AutoLeaderRebalancerTask::ExecuteMode mode) {
+  LOG(INFO) << Substitute("leader rebalance for table $0", table_info->table_name());
+  TableMetadataLock table_l(table_info.get(), LockMode::READ);
+  const SysTablesEntryPB& table_data = table_info->metadata().state().pb;
+  const int replication_factor = table_data.num_replicas();
+  DCHECK_GT(replication_factor, 0);
+  if (table_data.state() == SysTablesEntryPB::REMOVED || replication_factor == 1) {
+    // Deleted tables need no rebalancing, and a single-replica tablet has no
+    // follower to hand leadership to.
+    return Status::OK();
+  }
+
+  // tablet_id -> leader's tserver uuid
+  map<string, string> leader_ts_uuid_by_tablet_id;
+  // tablet_id -> followers' tserver uuids
+  map<string, vector<string>> follower_ts_uuids_by_tablet_id;
+  // tserver uuid -> ids of the tablets it leads
+  map<string, vector<string>> leader_tablet_ids_by_ts_uuid;
+  // tserver uuid -> ids of all tablets it hosts a voter replica of
+  map<string, vector<string>> tablet_ids_by_ts_uuid;
+  // leader tserver uuid -> its first RPC address
+  map<string, HostPort> host_port_by_leader_ts_uuid;
+
+  vector<scoped_refptr<TabletInfo>> tablet_infos;
+  table_info->GetAllTablets(&tablet_infos);
+
+  // Step 1. Collect where each tablet's leader and followers live.
+  for (const auto& tablet : tablet_infos) {
+    TabletMetadataLock tablet_l(tablet.get(), LockMode::READ);
+
+    // Retrieve all replicas of the tablet.
+    TabletLocationsPB locs_pb;
+    CatalogManager::TSInfosDict ts_infos_dict;
+    {
+      CatalogManager::ScopedLeaderSharedLock leaderlock(catalog_manager_);
+      RETURN_NOT_OK(leaderlock.first_failed_status());
+      // This will only return tablet replicas in the RUNNING state, and
+      // filter to only retrieve voter replicas.
+      RETURN_NOT_OK(catalog_manager_->GetTabletLocations(
+          tablet->id(), ReplicaTypeFilter::VOTER_REPLICA, &locs_pb, &ts_infos_dict, nullopt));
+    }
+
+    for (const auto& r : locs_pb.interned_replicas()) {
+      const TSInfoPB& ts_info = *(ts_infos_dict.ts_info_pbs()[r.ts_info_idx()]);
+      const string& uuid = ts_info.permanent_uuid();
+      if (r.role() == RaftPeerPB::LEADER) {
+        LookupOrInsert(&leader_tablet_ids_by_ts_uuid, uuid, {}).emplace_back(tablet->id());
+        // Doubles as a sanity check: a tablet must not report two leaders.
+        InsertOrDie(&leader_ts_uuid_by_tablet_id, tablet->id(), uuid);
+        InsertIfNotPresent(&host_port_by_leader_ts_uuid, uuid,
+                           HostPortFromPB(ts_info.rpc_addresses(0)));
+      } else if (r.role() == RaftPeerPB::FOLLOWER) {
+        LookupOrInsert(&follower_ts_uuids_by_tablet_id, tablet->id(), {}).emplace_back(uuid);
+      } else {
+        LOG(WARNING) << Substitute("tablet_id $0, permanent_uuid $1, not a VOTER, role: $2",
+                                   tablet->id(),
+                                   uuid,
+                                   RaftPeerPB::Role_Name(r.role()));
+        continue;
+      }
+      LookupOrInsert(&tablet_ids_by_ts_uuid, uuid, {}).emplace_back(tablet->id());
+    }
+  }
+
+  // Step 2. Find the tablet servers that lead more than their share of this
+  // table's tablets. A server hosting N voter replicas should lead about
+  // N / replication_factor of them. The extra 1 is a coarse-grained
+  // correction factor that helps the rebalancer converge.
+  //
+  // tserver uuid -> <number of replicas, number of leaders>
+  map<string, std::pair<int32_t, int32_t>> replica_and_leader_count_by_ts_uuid;
+  // tserver uuid -> number of leaderships it should give away
+  map<string, int32_t> leader_transfer_source;
+  for (const auto& uuid : tserver_uuids) {
+    const auto* tablet_ids_ptr = FindOrNull(tablet_ids_by_ts_uuid, uuid);
+    const int32_t replica_count =
+        tablet_ids_ptr ? static_cast<int32_t>(tablet_ids_ptr->size()) : 0;
+    if (replica_count == 0) {
+      // No replicas (and so no leaders) of this table here, e.g. a tserver
+      // that joined the cluster just now; skip it.
+      continue;
+    }
+    const auto* leader_tablet_ids_ptr = FindOrNull(leader_tablet_ids_by_ts_uuid, uuid);
+    const int32_t leader_count =
+        leader_tablet_ids_ptr ? static_cast<int32_t>(leader_tablet_ids_ptr->size()) : 0;
+    replica_and_leader_count_by_ts_uuid.emplace(uuid, std::make_pair(replica_count, leader_count));
+    VLOG(1) << Substitute(
+        "uuid: $0, replica_count: $1, leader_count: $2", uuid, replica_count, leader_count);
+
+    const int32_t should_transfer_count =
+        leader_count - (replica_count / replication_factor + 1);
+    if (should_transfer_count > 0) {
+      leader_transfer_source.emplace(uuid, should_transfer_count);
+      VLOG(1) << Substitute("$0 should transfer leader count: $1", uuid, should_transfer_count);
+    }
+  }
+
+  // Step 3. Generate transfer tasks: tablet_id -> <from_uuid, to_uuid>.
+  const size_t max_moves = FLAGS_leader_rebalancing_max_moves_per_round;
+  map<string, std::pair<string, string>> leader_transfer_tasks;
+  for (const auto& [leader_uuid, need_transfer_count] : leader_transfer_source) {
+    int32_t pick_count = 0;
+    // Present: every source server leads at least one tablet (Step 2).
+    vector<string>& uuid_leaders = FindOrDie(leader_tablet_ids_by_ts_uuid, leader_uuid);
+    std::shuffle(uuid_leaders.begin(), uuid_leaders.end(), random_generator_);
+    // Each tablet led by this server, in random order, may yield one task.
+    for (const string& tablet_id : uuid_leaders) {
+      const vector<string>* uuid_followers =
+          FindOrNull(follower_ts_uuids_by_tablet_id, tablet_id);
+      // Skip tablets that don't have a full set of RUNNING voter followers.
+      if (!uuid_followers ||
+          static_cast<int>(uuid_followers->size()) + 1 < replication_factor) {
+        continue;
+      }
+
+      // Pick the follower with the lowest leader/replica ratio. If any
+      // follower is on a server without an entry in
+      // 'replica_and_leader_count_by_ts_uuid' (not listed in 'tserver_uuids'),
+      // give up on this tablet.
+      string dest_follower_uuid;
+      double min_score = 1;
+      for (const string& follower_uuid : *uuid_followers) {
+        const auto* counts = FindOrNull(replica_and_leader_count_by_ts_uuid, follower_uuid);
+        if (!counts || counts->first <= 0) {
+          dest_follower_uuid.clear();
+          break;
+        }
+        // double is not precise.
+        const double score = static_cast<double>(counts->second) / counts->first;
+        if (score < min_score) {
+          min_score = score;
+          dest_follower_uuid = follower_uuid;
+        }
+      }
+      if (dest_follower_uuid.empty()) {
+        continue;
+      }
+
+      // Present: source servers were inserted in Step 2 with replica_count > 0.
+      auto& leader_counts = FindOrDie(replica_and_leader_count_by_ts_uuid, leader_uuid);
+      const double leader_score =
+          static_cast<double>(leader_counts.second) / leader_counts.first;
+      if (min_score > leader_score) {
+        // Skip it, because the transfer would cause more leader skew.
+        continue;
+      }
+
+      leader_transfer_tasks.emplace(tablet_id, std::make_pair(leader_uuid, dest_follower_uuid));
+      leader_counts.second--;
+      FindOrDie(replica_and_leader_count_by_ts_uuid, dest_follower_uuid).second++;
+      if (leader_transfer_tasks.size() >= max_moves) {
+        break;
+      }
+      if (++pick_count == need_transfer_count) {
+        // Have picked enough leader transfer tasks for this tserver.
+        break;
+      }
+    }
+    if (leader_transfer_tasks.size() >= max_moves) {
+      VLOG(1) << Substitute(
+          "leader rebalance reach the upper limit: $0, try do left leader transfer tasks next "
+          "time", max_moves);
+      // Stop considering other source servers too, so the limit holds per call.
+      break;
+    }
+  }
+
+  if (mode == AutoLeaderRebalancerTask::ExecuteMode::TEST) {
+    if (!leader_transfer_tasks.empty()) {
+      return Status::IllegalState(Substitute("leader_transfer_task size should be 0, but $0",
+                                             leader_transfer_tasks.size()));
+    }
+    return Status::OK();
+  }
+
+  moves_scheduled_this_round_for_test_ = leader_transfer_tasks.size();
+  VLOG(1) << Substitute("leader rebalance tasks, size: $0, leader_transfer_source, size: $1",
+                        moves_scheduled_this_round_for_test_.load(),
+                        leader_transfer_source.size());
+
+  // Step 4. Do the leader transfer tasks.
+  // TODO(duyuqi): optimize speed. Each RPC waits for its response
+  // synchronously, which may be slow when there are many tasks.
+  int leader_transfer_count = 0;
+  for (const auto& [tablet_id, from_to] : leader_transfer_tasks) {
+    const string& leader_uuid = from_to.first;
+    const string& new_leader_uuid = from_to.second;
+
+    const auto* host_port = FindOrNull(host_port_by_leader_ts_uuid, leader_uuid);
+    if (!host_port) {
+      continue;
+    }
+    std::shared_ptr<TSDescriptor> leader_desc;
+    if (!ts_manager_->LookupTSByUUID(leader_uuid, &leader_desc)) {
+      continue;
+    }
+
+    vector<Sockaddr> resolved;
+    Status s = host_port->ResolveAddresses(&resolved);
+    if (s.ok() && resolved.empty()) {
+      // Guard the resolved[0] access below.
+      s = Status::NetworkError(Substitute("no addresses resolved for $0", host_port->ToString()));
+    }
+    if (!s.ok()) {
+      LOG(WARNING) << Substitute("unable to resolve address of tserver $0: $1",
+                                 leader_uuid, s.ToString());
+      continue;
+    }
+
+    LeaderStepDownRequestPB request;
+    request.set_dest_uuid(leader_uuid);
+    request.set_tablet_id(tablet_id);
+    request.set_mode(LeaderStepDownMode::GRACEFUL);
+    request.set_new_leader_uuid(new_leader_uuid);
+
+    LeaderStepDownResponsePB response;
+    RpcController rpc;
+    rpc.set_timeout(MonoDelta::FromSeconds(FLAGS_auto_leader_rebalancing_rpc_timeout_seconds));
+
+    ConsensusServiceProxy proxy(messenger_, resolved[0], host_port->host());
+    s = proxy.LeaderStepDown(request, &response, &rpc);
+    if (s.ok() && response.has_error()) {
+      s = StatusFromPB(response.error().status());
+    }
+    if (!s.ok()) {
+      // One failed transfer must not prevent the remaining ones.
+      LOG(WARNING) << Substitute("leader transfer of tablet $0 from $1 to $2 failed: $3",
+                                 tablet_id, leader_uuid, new_leader_uuid, s.ToString());
+      continue;
+    }
+    leader_transfer_count++;
+    VLOG(1) << Substitute("leader transfer table: $0, tablet_id: $1, from: $2 to: $3",
+                          table_data.name(),
+                          tablet_id,
+                          leader_uuid,
+                          new_leader_uuid);
+  }
+  // TODO(duyuqi): add metrics to replace the log.
+  VLOG(0) << Substitute("table: $0, leader rebalance finish, leader transfer count: $1",
+                        table_data.name(),
+                        leader_transfer_count);
+  return Status::OK();
+}
+
+// Runs one leader rebalancing round over every table in the catalog.
+//
+// Serialized by 'running_mutex_'. Does nothing and returns OK if this master
+// is not the leader. Otherwise every tablet server not presumed dead is a
+// candidate, and each table is handled independently. A failure for one
+// table is logged and does not stop the round. Returns an error only if
+// leadership cannot be confirmed while listing the tables.
+Status AutoLeaderRebalancerTask::RunLeaderRebalancer() {
+  MutexLock auto_lock(running_mutex_);
+
+  // If catalog manager isn't initialized or isn't the leader, don't do leader
+  // rebalancing. Putting the auto-rebalancer to sleep shouldn't affect the
+  // master's ability to become the leader. When the thread wakes up and
+  // discovers it is now the leader, then it can begin auto-rebalancing.
+  {
+    CatalogManager::ScopedLeaderSharedLock l(catalog_manager_);
+    if (!l.first_failed_status().ok()) {
+      moves_scheduled_this_round_for_test_ = 0;
+      return Status::OK();
+    }
+  }
+
+  number_of_loop_iterations_for_test_++;
+
+  // Leader balance doesn't depend on disk capacity, so every live tserver is
+  // a candidate.
+  TSDescriptorVector descriptors;
+  ts_manager_->GetAllDescriptors(&descriptors);
+
+  vector<string> tserver_uuids;
+  tserver_uuids.reserve(descriptors.size());
+  for (const auto& e : descriptors) {
+    if (!e->PresumedDead()) {
+      tserver_uuids.emplace_back(e->permanent_uuid());
+    }
+  }
+
+  vector<scoped_refptr<TableInfo>> table_infos;
+  {
+    CatalogManager::ScopedLeaderSharedLock leader_lock(catalog_manager_);
+    RETURN_NOT_OK(leader_lock.first_failed_status());
+    catalog_manager_->GetAllTables(&table_infos);
+  }
+  for (const auto& table_info : table_infos) {
+    WARN_NOT_OK(RunLeaderRebalanceForTable(table_info, tserver_uuids),
+                Substitute("leader rebalancing failed for table $0", table_info->table_name()));
+  }
+  // TODO(duyuqi): enrich the log and add metrics for leader rebalancer.
+  LOG(INFO) << "All tables' leader rebalancing finished this round";
+  return Status::OK();
+}
+
+// Background thread body. Waits FLAGS_auto_leader_rebalancing_interval_seconds
+// (re-read every iteration) on 'shutdown_', exits once the latch is released,
+// and otherwise runs a round when FLAGS_auto_leader_rebalancing_enabled is set.
+void AutoLeaderRebalancerTask::RunLoop() {
+  for (;;) {
+    const MonoDelta interval =
+        MonoDelta::FromSeconds(FLAGS_auto_leader_rebalancing_interval_seconds);
+    if (shutdown_.WaitFor(interval)) {
+      break;
+    }
+    if (!FLAGS_auto_leader_rebalancing_enabled) {
+      continue;
+    }
+    WARN_NOT_OK(RunLeaderRebalancer(), "the master instance isn't leader");
+  }
+}
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/auto_leader_rebalancer.h
b/src/kudu/master/auto_leader_rebalancer.h
new file mode 100644
index 000000000..004e19b1d
--- /dev/null
+++ b/src/kudu/master/auto_leader_rebalancer.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Thread;
+
+namespace rpc {
+class Messenger;
+} // namespace rpc
+
+namespace master {
+
+class CatalogManager;
+class TSManager;
+class TableInfo;
+
+// A CatalogManager background task which auto-rebalances the distribution of
+// tablet leaders by transferring leadership between tablet replicas.
+//
+// As a background task, the lifetime of an instance of this class must be less
+// than the catalog manager it belongs to.
+//
+// The auto-rebalancing task continuously wakes up according to its
+// configured poll period. It performs no work when the master is a follower.
+class AutoLeaderRebalancerTask {
+ public:
+ // Neither pointer is deleted by this class. Both are used by the background
+ // thread, so they must stay valid until the task is shut down.
+ AutoLeaderRebalancerTask(CatalogManager* catalog_manager, TSManager*
ts_manager);
+
+ // Stops the background thread via Shutdown().
+ ~AutoLeaderRebalancerTask();
+
+ // Creates the RPC messenger and starts the background rebalancing thread.
+ // Must be called at most once.
+ Status Init();
+
+ // Signals the background thread to stop and joins it. CHECK-fails if the
+ // task was never initialized or its thread has already been released.
+ void Shutdown();
+
+ // Runs one leader rebalancing round over all tables. Returns OK without
+ // doing anything if this master is not the leader. Serialized by
+ // 'running_mutex_'.
+ Status RunLeaderRebalancer();
+
+ // NORMAL: send LeaderStepDown RPCs for the chosen transfers.
+ // TEST: send nothing; return IllegalState if any transfer would be chosen.
+ enum class ExecuteMode { NORMAL, TEST };
+
+ private:
+ friend class AutoRebalancerTest;
+ friend class LeaderRebalancerTest;
+
+ // Runs the main loop of the auto-leader-rebalancing thread.
+ void RunLoop();
+
+ // Runs leader rebalancing for one table, considering only the tablet servers
+ // listed in 'tserver_uuids'. See ExecuteMode for the meaning of 'mode'.
+ Status RunLeaderRebalanceForTable(
+ const scoped_refptr<TableInfo>& table_info,
+ const std::vector<std::string>& tserver_uuids,
+ AutoLeaderRebalancerTask::ExecuteMode mode =
AutoLeaderRebalancerTask::ExecuteMode::NORMAL);
+
+ // Only one task can be scheduled at a time.
+ Mutex running_mutex_;
+
+ // The associated catalog manager.
+ CatalogManager* catalog_manager_;
+
+ // The associated TS manager.
+ TSManager* ts_manager_;
+
+ // The auto-rebalancing thread.
+ scoped_refptr<kudu::Thread> thread_;
+
+ // Latch used to indicate that the thread is shutting down.
+ CountDownLatch shutdown_;
+
+ // Messenger used to send 'LeaderStepDown' RPCs.
+ std::shared_ptr<rpc::Messenger> messenger_;
+
+ // Random device and generator for selecting among multiple choices.
+ // 'random_device_' must stay declared before 'random_generator_': the
+ // constructor seeds the generator from it.
+ std::random_device random_device_;
+ std::mt19937 random_generator_;
+
+ // Variables for testing.
+ // Incremented once per RunLeaderRebalancer() call made while leader.
+ std::atomic<int> number_of_loop_iterations_for_test_;
+ // Transfers chosen by the latest NORMAL-mode per-table pass (reset to 0 when
+ // a round finds this master is not the leader).
+ std::atomic<int> moves_scheduled_this_round_for_test_;
+};
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/auto_rebalancer-test.cc
b/src/kudu/master/auto_rebalancer-test.cc
index abc4070c4..a969d3ab8 100644
--- a/src/kudu/master/auto_rebalancer-test.cc
+++ b/src/kudu/master/auto_rebalancer-test.cc
@@ -37,6 +37,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/auto_leader_rebalancer.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
@@ -66,12 +67,14 @@ using std::unordered_set;
using std::vector;
using strings::Substitute;
+DECLARE_bool(auto_leader_rebalancing_enabled);
DECLARE_bool(auto_rebalancing_enabled);
DECLARE_int32(consensus_inject_latency_ms_in_notifications);
DECLARE_int32(follower_unavailable_considered_failed_sec);
DECLARE_int32(raft_heartbeat_interval_ms);
DECLARE_int32(tablet_copy_download_file_inject_latency_ms);
DECLARE_int32(tserver_unresponsive_timeout_ms);
+DECLARE_uint32(auto_leader_rebalancing_interval_seconds);
DECLARE_uint32(auto_rebalancing_interval_seconds);
DECLARE_uint32(auto_rebalancing_max_moves_per_server);
DECLARE_uint32(auto_rebalancing_wait_for_replica_moves_seconds);
@@ -97,13 +100,20 @@ namespace {
namespace kudu {
namespace master {
+// Selects which master background rebalancer a test helper inspects.
+enum class BalanceThreadType {
+ REPLICA_REBALANCE,
+ LEADER_REBALANCE
+};
+
class AutoRebalancerTest : public KuduTest {
public:
- Status CreateAndStartCluster() {
+ // Starts the cluster with fast replica rebalancing. Automatic leader
+ // rebalancing is enabled iff 'enable_leader_rebalance' is true; its interval
+ // is shortened to 2 seconds either way.
+ Status CreateAndStartCluster(bool enable_leader_rebalance = true) {
 FLAGS_auto_rebalancing_interval_seconds = 1; // Shorten for testing.
 FLAGS_auto_rebalancing_wait_for_replica_moves_seconds = 0; // Shorten for
testing.
 FLAGS_auto_rebalancing_enabled = true; // Enable for testing.
+ FLAGS_auto_leader_rebalancing_enabled = enable_leader_rebalance; // Enable or disable for testing.
+ FLAGS_auto_leader_rebalancing_interval_seconds = 2; // shorten for testing.
 cluster_.reset(new InternalMiniCluster(env_, cluster_opts_));
 return cluster_->Start();
 }
@@ -161,14 +171,30 @@ class AutoRebalancerTest : public KuduTest {
}
// Make sure the leader master has begun the auto-rebalancing thread.
- void CheckAutoRebalancerStarted() {
+ // 'type' selects whether the replica or the leader rebalancer is checked.
+ void CheckAutoRebalancerStarted(BalanceThreadType type =
+ BalanceThreadType::REPLICA_REBALANCE) {
 ASSERT_EVENTUALLY([&] {
 int leader_idx;
 ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
- ASSERT_LT(0, NumLoopIterations(leader_idx));
+ ASSERT_LT(0, NumLoopIterations(leader_idx, type));
 });
 }
+ // Make sure the leader master's leader-rebalancing task scheduled no leader
+ // moves in its latest round.
+ //
+ // When leader rebalancing is enabled, first wait until its loop has run
+ // more than three further iterations. When it is disabled, the loop count
+ // is not checked; instead, wait three leader-rebalancing intervals once.
+ void CheckNoLeaderMovesScheduled() {
+ int leader_idx;
+ ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+ const auto leader_initial_loop_iterations =
+ NumLoopIterations(leader_idx, BalanceThreadType::LEADER_REBALANCE);
+ if (!FLAGS_auto_leader_rebalancing_enabled) {
+ // Sleep once up front rather than on every ASSERT_EVENTUALLY retry, and
+ // wait on the leader rebalancer's own interval rather than the replica
+ // rebalancer's.
+ SleepFor(MonoDelta::FromSeconds(3 * FLAGS_auto_leader_rebalancing_interval_seconds));
+ }
+ ASSERT_EVENTUALLY([&] {
+ if (FLAGS_auto_leader_rebalancing_enabled) {
+ ASSERT_LT(leader_initial_loop_iterations + 3,
+ NumLoopIterations(leader_idx, BalanceThreadType::LEADER_REBALANCE));
+ }
+ ASSERT_EQ(0, NumMovesScheduled(leader_idx, BalanceThreadType::LEADER_REBALANCE));
+ });
+ }
// Make sure the auto-rebalancing loop has iterated a few times,
// and no moves were scheduled.
void CheckNoMovesScheduled() {
@@ -179,6 +205,7 @@ class AutoRebalancerTest : public KuduTest {
ASSERT_LT(initial_loop_iterations + 3, NumLoopIterations(leader_idx));
ASSERT_EQ(0, NumMovesScheduled(leader_idx));
});
+ CheckNoLeaderMovesScheduled();
}
void CheckSomeMovesScheduled() {
@@ -223,16 +250,26 @@ class AutoRebalancerTest : public KuduTest {
return ret;
}
- int NumLoopIterations(int master_idx) {
+ // Returns the loop-iteration counter of the rebalancing task selected by
+ // 'type' on the master at index 'master_idx'.
+ int NumLoopIterations(int master_idx,
+ BalanceThreadType type = BalanceThreadType::REPLICA_REBALANCE) {
DCHECK(cluster_ != nullptr);
- return cluster_->mini_master(master_idx)->master()->catalog_manager()->
+ if (type == BalanceThreadType::REPLICA_REBALANCE) {
+ return cluster_->mini_master(master_idx)->master()->catalog_manager()->
auto_rebalancer()->number_of_loop_iterations_for_test_;
+ }
+ return cluster_->mini_master(master_idx)->master()->catalog_manager()->
+ auto_leader_rebalancer()->number_of_loop_iterations_for_test_;
}
- int NumMovesScheduled(int master_idx) {
+ // Returns the number of moves scheduled in the current round by the
+ // rebalancing task selected by 'type' on the master at index 'master_idx'.
+ int NumMovesScheduled(int master_idx,
+ BalanceThreadType type = BalanceThreadType::REPLICA_REBALANCE) {
DCHECK(cluster_ != nullptr);
+ if (type == BalanceThreadType::REPLICA_REBALANCE) {
return cluster_->mini_master(master_idx)->master()->catalog_manager()->
auto_rebalancer()->moves_scheduled_this_round_for_test_;
+ }
+ return cluster_->mini_master(master_idx)->master()->catalog_manager()->
+ auto_leader_rebalancer()->moves_scheduled_this_round_for_test_;
}
void CreateWorkloadTable(int num_tablets, int num_replicas) {
@@ -255,7 +292,8 @@ class AutoRebalancerTest : public KuduTest {
unique_ptr<TestWorkload> workload_;
};
-// Make sure that only the leader master is doing auto-rebalancing.
+// Make sure that only the leader master is doing auto-rebalancing
+// and auto leader-rebalancing.
TEST_F(AutoRebalancerTest, OnlyLeaderDoesAutoRebalancing) {
const int kNumMasters = 3;
const int kNumTservers = 3;
@@ -264,6 +302,7 @@ TEST_F(AutoRebalancerTest, OnlyLeaderDoesAutoRebalancing) {
cluster_opts_.num_tablet_servers = kNumTservers;
ASSERT_OK(CreateAndStartCluster());
NO_FATALS(CheckAutoRebalancerStarted());
+ NO_FATALS(CheckAutoRebalancerStarted(BalanceThreadType::LEADER_REBALANCE));
CreateWorkloadTable(kNumTablets, /*num_replicas*/1);
@@ -272,11 +311,13 @@ TEST_F(AutoRebalancerTest, OnlyLeaderDoesAutoRebalancing)
{
for (int i = 0; i < kNumMasters; i++) {
if (i == leader_idx) {
ASSERT_EVENTUALLY([&] {
- ASSERT_LT(0, NumLoopIterations(i));
+ ASSERT_LT(0, NumLoopIterations(i,
BalanceThreadType::REPLICA_REBALANCE));
+ ASSERT_LT(0, NumLoopIterations(i,
BalanceThreadType::LEADER_REBALANCE));
});
} else {
ASSERT_EVENTUALLY([&] {
- ASSERT_EQ(0, NumMovesScheduled(i));
+ ASSERT_EQ(0, NumLoopIterations(i,
BalanceThreadType::REPLICA_REBALANCE));
+ ASSERT_EQ(0, NumLoopIterations(i,
BalanceThreadType::LEADER_REBALANCE));
});
}
}
@@ -319,6 +360,7 @@ TEST_F(AutoRebalancerTest,
NextLeaderResumesAutoRebalancing) {
cluster_opts_.num_tablet_servers = kNumTservers;
ASSERT_OK(CreateAndStartCluster());
NO_FATALS(CheckAutoRebalancerStarted());
+ NO_FATALS(CheckAutoRebalancerStarted(BalanceThreadType::LEADER_REBALANCE));
CreateWorkloadTable(kNumTablets, /*num_replicas*/1);
@@ -329,6 +371,7 @@ TEST_F(AutoRebalancerTest,
NextLeaderResumesAutoRebalancing) {
for (int i = 0; i < kNumMasters; i++) {
if (i != leader_idx) {
ASSERT_EQ(0, NumLoopIterations(i));
+ ASSERT_EQ(0, NumLoopIterations(i, BalanceThreadType::LEADER_REBALANCE));
}
}
cluster_->mini_master(leader_idx)->Shutdown();
@@ -341,10 +384,13 @@ TEST_F(AutoRebalancerTest,
NextLeaderResumesAutoRebalancing) {
ASSERT_NE(leader_idx, new_leader_idx);
auto iterations = NumLoopIterations(new_leader_idx);
ASSERT_LT(0, iterations);
+
+ auto iterations_leader = NumLoopIterations(new_leader_idx,
BalanceThreadType::LEADER_REBALANCE);
+ ASSERT_LT(0, iterations_leader);
});
}
-// Create a cluster that is initially balanced.
+// Create a cluster that is initially balanced and leader balanced.
// Bring up another tserver, and verify that moves are scheduled,
// since the cluster is no longer balanced.
TEST_F(AutoRebalancerTest, MovesScheduledIfAddTserver) {
@@ -378,6 +424,8 @@ TEST_F(AutoRebalancerTest, MovesScheduledIfAddTserver) {
kNumTServers + 1);
ASSERT_GT(bytes_fetched_in_new_tservers, 0);
});
+
+ NO_FATALS(CheckNoMovesScheduled());
}
// A cluster with no tservers is balanced.
@@ -523,6 +571,8 @@ TEST_F(AutoRebalancerTest, TestMaxMovesPerServer) {
// The average number of moves per tablet server should not exceed that
specified.
ASSERT_GE(FLAGS_auto_rebalancing_max_moves_per_server *
cluster_->num_tablet_servers(),
AggregateMetricCounts(open_copy_clients_by_uuid, 0,
cluster_->num_tablet_servers()));
+
+ NO_FATALS(CheckNoLeaderMovesScheduled());
}
// Attempt rebalancing a cluster with unstable Raft configurations.
@@ -565,6 +615,8 @@ TEST_F(AutoRebalancerTest, AutoRebalancingUnstableCluster) {
cluster_->num_tablet_servers());
ASSERT_GT(bytes_fetched_in_new_tservers, 0);
});
+
+ NO_FATALS(CheckNoLeaderMovesScheduled());
}
// A cluster that cannot become healthy and meet the replication factor
@@ -598,7 +650,8 @@ TEST_F(AutoRebalancerTest,
NoRebalancingIfReplicasRecovering) {
const int kNumTablets = 4;
cluster_opts_.num_tablet_servers = kNumTservers;
- ASSERT_OK(CreateAndStartCluster());
+ bool auto_leader_rebalancing_enabled = false;
+ ASSERT_OK(CreateAndStartCluster(auto_leader_rebalancing_enabled));
NO_FATALS(CheckAutoRebalancerStarted());
CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
@@ -620,12 +673,13 @@ TEST_F(AutoRebalancerTest, TestHandlingFailedTservers) {
// Set a high timeout for an unresponsive tserver to be presumed dead,
// so the TSManager believes it is still available.
FLAGS_tserver_unresponsive_timeout_ms = 120 * 1000;
+ bool auto_leader_rebalancing_enabled = false; // Disable for testing.
const int kNumTservers = 3;
const int kNumTablets = 4;
cluster_opts_.num_tablet_servers = kNumTservers;
- ASSERT_OK(CreateAndStartCluster());
+ ASSERT_OK(CreateAndStartCluster(auto_leader_rebalancing_enabled));
NO_FATALS(CheckAutoRebalancerStarted());
CreateWorkloadTable(kNumTablets, /*num_replicas*/3);
@@ -675,6 +729,7 @@ TEST_F(AutoRebalancerTest, TestHandlingFailedTservers) {
for (const auto& str : post_capture_logs.logged_msgs()) {
ASSERT_STR_NOT_CONTAINS(str, "scheduled replica move failed to complete:
Network error");
}
+ NO_FATALS(CheckNoLeaderMovesScheduled());
}
// Test that we schedule moves even if some tablets belong to deleted tables.
@@ -687,7 +742,8 @@ TEST_F(AutoRebalancerTest, TestDeletedTables) {
const int kNumTServers = 3;
const int kNumTablets = 4;
cluster_opts_.num_tablet_servers = kNumTServers;
- ASSERT_OK(CreateAndStartCluster());
+ bool auto_leader_rebalancing_enabled = false; // Disable for testing.
+ ASSERT_OK(CreateAndStartCluster(auto_leader_rebalancing_enabled));
NO_FATALS(CheckAutoRebalancerStarted());
// Create some tablets across multiple tables, so we can test rebalancing
@@ -753,6 +809,7 @@ TEST_F(AutoRebalancerTest, TestDeletedTables) {
JoinStrings(deleted_tablet_ids, ","),
JoinStrings(tablets_on_new_ts, ","));
}
+ NO_FATALS(CheckNoLeaderMovesScheduled());
}
} // namespace master
diff --git a/src/kudu/master/auto_rebalancer.h
b/src/kudu/master/auto_rebalancer.h
index e85377639..ecd7d3ca2 100644
--- a/src/kudu/master/auto_rebalancer.h
+++ b/src/kudu/master/auto_rebalancer.h
@@ -78,6 +78,7 @@ class AutoRebalancerTask {
private:
friend class AutoRebalancerTest;
+ friend class LeaderRebalancerTest;
// Runs the main loop of the auto-rebalancing thread.
void RunLoop();
diff --git a/src/kudu/master/catalog_manager.cc
b/src/kudu/master/catalog_manager.cc
index 916880a88..48f5924cf 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -97,6 +97,7 @@
#include "kudu/gutil/walltime.h"
#include "kudu/hms/hms_catalog.h"
#include "kudu/master/authz_provider.h"
+#include "kudu/master/auto_leader_rebalancer.h"
#include "kudu/master/auto_rebalancer.h"
#include "kudu/master/default_authz_provider.h"
#include "kudu/master/hms_notification_log_listener.h"
@@ -348,6 +349,12 @@ TAG_FLAG(auto_rebalancing_enabled, advanced);
TAG_FLAG(auto_rebalancing_enabled, experimental);
TAG_FLAG(auto_rebalancing_enabled, runtime);
+// Runtime-settable toggle for automatic leader rebalancing.
+// NOTE(review): presumably consulted by AutoLeaderRebalancerTask on each loop
+// iteration; the task object itself is created in CatalogManager::Init()
+// regardless of this flag's value.
+DEFINE_bool(auto_leader_rebalancing_enabled,
+ false,
+ "Whether automatic leader rebalancing is enabled.");
+TAG_FLAG(auto_leader_rebalancing_enabled, advanced);
+TAG_FLAG(auto_leader_rebalancing_enabled, experimental);
+TAG_FLAG(auto_leader_rebalancing_enabled, runtime);
+
DEFINE_uint32(table_locations_cache_capacity_mb, 0,
"Capacity for the table locations cache (in MiB); a value "
"of 0 means table locations are not be cached");
@@ -1052,6 +1059,21 @@ Status CatalogManager::Init(bool is_first_run) {
RETURN_NOT_OK_PREPEND(task->Init(), "failed to initialize auto-rebalancing
task");
auto_rebalancer_ = std::move(task);
+ // The leader rebalancer works best on top of a well-balanced replica
+ // distribution, so auto-rebalancing should preferably be enabled too.
+ // The leader rebalancer still works when auto-rebalancing is disabled,
+ // because it tries to keep a leader-to-follower proportion of
+ // 1 : (replication_factor - 1) on every tablet server for every table;
+ // however, without a balanced replica distribution the cluster may not
+ // reach the best achievable leader balance.
+
+ unique_ptr<AutoLeaderRebalancerTask> leader_task(
+ new AutoLeaderRebalancerTask(this, master_->ts_manager()));
+ RETURN_NOT_OK_PREPEND(leader_task->Init(),
+ "failed to initialize auto-leader-rebalancing task");
+ auto_leader_rebalancer_ = std::move(leader_task);
+
vector<HostPort> master_addresses;
RETURN_NOT_OK(master_->GetMasterHostPorts(&master_addresses));
if (hms::HmsCatalog::IsEnabled()) {
@@ -4109,6 +4131,14 @@ Status CatalogManager::GetTableInfo(const string&
table_id, scoped_refptr<TableI
return Status::OK();
}
+// Looks up a table by name and stores it in '*table', or null if no such
+// table exists. Caller must hold leader_lock_ (at least for reading).
+//
+// normalized_table_names_map_ is keyed by normalized table names, so the
+// lookup key must be normalized the same way; otherwise a name that differs
+// only in its normalized form (e.g. case, with HMS integration) would miss.
+void CatalogManager::GetTableInfoByName(const string& table_name,
+ scoped_refptr<TableInfo>* table) {
+ leader_lock_.AssertAcquiredForReading();
+
+ // Normalization does not touch catalog state, so do it before taking lock_.
+ const string normalized_name = NormalizeTableName(table_name);
+ shared_lock<LockType> l(lock_);
+ *table = FindPtrOrNull(normalized_table_names_map_, normalized_name);
+}
+
void CatalogManager::GetAllTables(vector<scoped_refptr<TableInfo>>* tables) {
leader_lock_.AssertAcquiredForReading();
diff --git a/src/kudu/master/catalog_manager.h
b/src/kudu/master/catalog_manager.h
index 340d3996d..3baa8bfdf 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -116,6 +116,7 @@ class TabletReplica;
namespace master {
class AuthzProvider;
+class AutoLeaderRebalancerTask;
class AutoRebalancerTask;
class CatalogManagerBgTasks;
class HmsNotificationLogListenerTask;
@@ -824,8 +825,13 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
// Retrieve a table by ID, or null if no such table exists. May fail if the
// catalog manager is not yet running. Caller must hold leader_lock_.
//
- // NOTE: This should only be used by tests or web-ui
- Status GetTableInfo(const std::string& table_id, scoped_refptr<TableInfo> *table);
+ // NOTE: This should only be used by tests or web-ui.
+ Status GetTableInfo(const std::string& table_id, scoped_refptr<TableInfo>* table);
+
+ // Retrieve a table by name, or null if no such table exists. Caller must
+ // hold leader_lock_.
+ //
+ // NOTE: This should only be used by tests.
+ void GetTableInfoByName(const std::string& table_name,
+ scoped_refptr<TableInfo>* table);
// Retrieve all known tables, even those that are not running. May fail if
// the catalog manager is not yet running. Caller must hold leader_lock_.
@@ -879,6 +885,10 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
return auto_rebalancer_.get();
}
+ // Returns a non-owning pointer to the leader rebalancing task; ownership
+ // stays with 'auto_leader_rebalancer_'.
+ master::AutoLeaderRebalancerTask* auto_leader_rebalancer() const {
+ return auto_leader_rebalancer_.get();
+ }
+
// Returns the normalized form of the provided table name.
//
// If the HMS integration is configured and the table name is a valid HMS
@@ -1329,6 +1339,8 @@ class CatalogManager : public
tserver::TabletReplicaLookupIf {
std::unique_ptr<AutoRebalancerTask> auto_rebalancer_;
+ // Task that rebalances tablet leaders across tablet servers; created and
+ // initialized in CatalogManager::Init().
+ std::unique_ptr<AutoLeaderRebalancerTask> auto_leader_rebalancer_;
+
enum State {
kConstructed,
kStarting,