This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new d8467c571 KUDU-3497 optimize leader rebalancer algorithm
d8467c571 is described below

commit d8467c571a5e7eecaf689c9c9647851ce9bf0fd1
Author: 宋家成 <[email protected]>
AuthorDate: Fri Jul 28 18:19:17 2023 +0800

    KUDU-3497 optimize leader rebalancer algorithm
    
    Leader rebalancing might not work well for now, especially for
    tables with a small number of hash partitions.
    For instance, consider a table consisting of 9 tablets, RF = 3, in a
    3-tserver cluster.
    
    Its leader distribution is as follows:
    
    Tablet server A : 4
    Tablet server B : 4
    Tablet server C : 1
    
    With the current algorithm, no rebalance operation is scheduled
    for this distribution.
    
    Therefore, this patch changes the algorithm so that it always finds
    the best leader distribution.
    
    The formula is:
    expected leader num = (tablets sum) % (tablet server num) == 0 ?
    (tablets sum) / (tablet server num) :
    ceil((tablets sum) / (tablet server num))
    A tserver whose leader num is more than the expected value needs
    to transfer the excess leaderships.
    
    So the leader skew will never be more than 1.
    
    Change-Id: I0f1fe796fd98da2d8764da793b7e254319e6348a
    Reviewed-on: http://gerrit.cloudera.org:8080/20310
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/master/auto_leader_rebalancer-test.cc | 252 ++++++++++++++++++++++++-
 src/kudu/master/auto_leader_rebalancer.cc      |  34 +++-
 src/kudu/mini-cluster/external_mini_cluster.cc |   9 +
 src/kudu/mini-cluster/external_mini_cluster.h  |   6 +
 src/kudu/mini-cluster/internal_mini_cluster.cc |   8 +
 src/kudu/mini-cluster/internal_mini_cluster.h  |   6 +
 src/kudu/mini-cluster/mini_cluster.h           |   9 +
 7 files changed, 314 insertions(+), 10 deletions(-)

diff --git a/src/kudu/master/auto_leader_rebalancer-test.cc 
b/src/kudu/master/auto_leader_rebalancer-test.cc
index 5357bdd15..234a6a8da 100644
--- a/src/kudu/master/auto_leader_rebalancer-test.cc
+++ b/src/kudu/master/auto_leader_rebalancer-test.cc
@@ -17,17 +17,23 @@
 #include "kudu/master/auto_leader_rebalancer.h"
 
 #include <algorithm>
+#include <cmath>
 #include <cstdint>
+#include <iostream>
+#include <map>
 #include <memory>
-#include <ostream>
+#include <numeric>
 #include <string>
 #include <type_traits>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/client/client.h"
+#include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/integration-tests/test_workload.h"
@@ -42,6 +48,7 @@
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
+#include "kudu/consensus/consensus.proxy.h"// IWYU pragma: keep
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -53,10 +60,13 @@ class AutoRebalancerTask;
 }  // namespace master
 }  // namespace kudu
 
-
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
 using kudu::tserver::ListTabletsResponsePB;
+using kudu::tserver::MiniTabletServer;
+using kudu::consensus::LeaderStepDownRequestPB;
+using kudu::consensus::LeaderStepDownResponsePB;
+using kudu::rpc::RpcController;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -129,12 +139,250 @@ class LeaderRebalancerTest : public KuduTest {
         table_info, tserver_uuids, {}, 
AutoLeaderRebalancerTask::ExecuteMode::TEST);
   }
 
+  // Get the leader numbers of each tablet server.
+  void GetLeaderDistribution(std::map<string, int32_t>* leader_map) {
+    leader_map->clear();
+    scoped_refptr<TableInfo> table;
+    master::Master* master = cluster_->mini_master()->master();
+    master::CatalogManager* catalog_manager = master->catalog_manager();
+    {
+      CatalogManager::ScopedLeaderSharedLock leaderlock(catalog_manager);
+      catalog_manager->GetTableInfoByName(table_name(), &table);
+    }
+    std::vector<string> leader_list;
+    for (const auto& tablet : table->tablet_map()) {
+      client::KuduTablet* ptr;
+      workload_->client()->GetTablet(tablet.second->id(), &ptr);
+      unique_ptr<client::KuduTablet> tablet_ptr(ptr);
+      for (const auto* replica : tablet_ptr->replicas()) {
+        if (replica->is_leader()) {
+          leader_list.push_back(replica->ts().uuid());
+        }
+      }
+    }
+    TSDescriptorVector descriptors;
+    master->ts_manager()->GetAllDescriptors(&descriptors);
+    for (const auto& e : descriptors) {
+      if (e->PresumedDead()) {
+        continue;
+      }
+      leader_map->emplace(e->permanent_uuid(), count(
+          leader_list.begin(), leader_list.end(), e->permanent_uuid()));
+    }
+  }
+
+  // Make the leader distribution as the vector passed in.
+  Status MakeLeaderDistribution(std::vector<int32_t> leader_distribution) {
+    master::Master* master = cluster_->mini_master()->master();
+    TSDescriptorVector descriptors;
+    master->ts_manager()->GetAllDescriptors(&descriptors);
+    if (descriptors.size() != leader_distribution.size()) {
+      return Status::IllegalState("The size of leader distribution vector 
should "
+                                  "be the number of tablet servers.");
+    }
+
+    scoped_refptr<TableInfo> table;
+    master::CatalogManager* catalog_manager = master->catalog_manager();
+    {
+      CatalogManager::ScopedLeaderSharedLock leaderlock(catalog_manager);
+      catalog_manager->GetTableInfoByName(table_name(), &table);
+    }
+
+    if (std::accumulate(leader_distribution.begin(), 
leader_distribution.end(), 0) !=
+        table->num_tablets()) {
+      return Status::IllegalState("The sum of leader distribution should "
+                                  "be the tablet number of the table.");
+    }
+
+    int32_t index = 0;
+    int32_t tmp_distribution = 0;
+    MiniTabletServer* tserver = cluster_->mini_tablet_server(0);
+    for (const auto& tablet : table->tablet_map()) {
+      if (tmp_distribution >= leader_distribution.at(index)) {
+        index++;
+        tmp_distribution = 0;
+        tserver = cluster_->mini_tablet_server(index);
+      }
+      unique_ptr<client::KuduTablet> tablet_copy;
+      {
+        client::KuduTablet* ptr;
+        workload_->client()->GetTablet(tablet.second->id(), &ptr);
+        tablet_copy.reset(ptr);
+      }
+      for (const auto* replica: tablet_copy->replicas()) {
+        if (replica->is_leader()) {
+          if (replica->ts().uuid() == tserver->uuid()) {
+            break;
+          }
+          LeaderStepDownRequestPB req;
+          req.set_dest_uuid(replica->ts().uuid());
+          req.set_tablet_id(tablet.second->id());
+          req.set_new_leader_uuid(tserver->uuid());
+          req.set_mode(consensus::GRACEFUL);
+          LeaderStepDownResponsePB resp;
+          RpcController rpc;
+          RETURN_NOT_OK(cluster_->tserver_consensus_proxy(cluster_
+                        ->tablet_server_index_by_uuid(replica->ts().uuid()))
+                        ->LeaderStepDown(req, &resp, &rpc));
+          break;
+        }
+      }
+      tmp_distribution++;
+    }
+    return Status::OK();
+  }
+
  protected:
   unique_ptr<InternalMiniCluster> cluster_;
   InternalMiniClusterOptions cluster_opts_;
   unique_ptr<TestWorkload> workload_;
 };
 
+// Verify if the leader rebalancing is able to balance the leaders in various
+// workloads.
+// We need to make sure that the function RunLeaderRebalanceForTable is
+// correct. After that we could use it to check leader balance by passing
+// TEST mode.
+TEST_F(LeaderRebalancerTest, FunctionalTestForDivided) {
+  const int kNumTServers = 3;
+  const int kNumTablets = 9;
+  cluster_opts_.num_tablet_servers = kNumTServers;
+
+  ASSERT_OK(CreateAndStartCluster());
+  CreateWorkloadTable(kNumTablets, /*num_replicas*/ 3);
+
+  // Simulate the leader distribution.
+  std::vector<int32_t> leader_distribution = {4, 4, 1};
+  MakeLeaderDistribution(leader_distribution);
+
+  SleepFor(MonoDelta::FromMilliseconds(3000));
+  std::map<string, int32_t> leader_map;
+  GetLeaderDistribution(&leader_map);
+  LOG(INFO) << "The leader distribution is " << '\n';
+  for (const auto& leader : leader_map) {
+    std::cout << leader.first << "  " << leader.second << '\n';
+  }
+
+  // Try to do rebalance 10 times.
+  master::Master* master = cluster_->mini_master()->master();
+  int32_t retries = 10;
+  master::AutoLeaderRebalancerTask* leader_rebalancer =
+      master->catalog_manager()->auto_leader_rebalancer();
+  for (int i = 0; i < retries; i++) {
+    leader_rebalancer->RunLeaderRebalancer();
+    SleepFor(MonoDelta::FromMilliseconds(FLAGS_heartbeat_interval_ms));
+  }
+
+  // Check the leader numbers of each tablet server. It should always be 
floor(avg)
+  // or ceil(avg), where the parameter avg is (tablet num) / (tablet server 
num).
+  double expected_leader_num = static_cast<double>(kNumTablets) / 3;
+  GetLeaderDistribution(&leader_map);
+  LOG(INFO) << "The leader distribution is " << '\n';
+  for (const auto& leader : leader_map) {
+    std::cout << leader.first << "  " << leader.second << '\n';
+  }
+  for (const auto& leader: leader_map) {
+    ASSERT_GE(leader.second, std::floor(expected_leader_num));
+    ASSERT_LE(leader.second, std::ceil(expected_leader_num));
+  }
+
+  // Try different leader distribution.
+  std::vector<int32_t> leader_distribution2 = {0, 8, 1};
+  MakeLeaderDistribution(leader_distribution2);
+
+  SleepFor(MonoDelta::FromMilliseconds(3000));
+  GetLeaderDistribution(&leader_map);
+  LOG(INFO) << "The leader distribution is " << '\n';
+  for (const auto& leader : leader_map) {
+    std::cout << leader.first << "  " << leader.second << '\n';
+  }
+
+  for (int i = 0; i < retries; i++) {
+    leader_rebalancer->RunLeaderRebalancer();
+    SleepFor(MonoDelta::FromMilliseconds(FLAGS_heartbeat_interval_ms));
+  }
+
+  GetLeaderDistribution(&leader_map);
+  LOG(INFO) << "The leader distribution is " << '\n';
+  for (const auto& leader : leader_map) {
+    std::cout << leader.first << "  " << leader.second << '\n';
+  }
+  for (const auto& leader: leader_map) {
+    ASSERT_GE(leader.second, std::floor(expected_leader_num));
+    ASSERT_LE(leader.second, std::ceil(expected_leader_num));
+  }
+}
+
+TEST_F(LeaderRebalancerTest, FunctionalTestForNotDivided) {
+  const int kNumTServers = 3;
+  const int kNumTablets = 10;
+  cluster_opts_.num_tablet_servers = kNumTServers;
+
+  ASSERT_OK(CreateAndStartCluster());
+  CreateWorkloadTable(kNumTablets, /*num_replicas*/ 3);
+
+  // Simulate the leader distribution.
+  std::vector<int32_t> leader_distribution = {5, 4, 1};
+  MakeLeaderDistribution(leader_distribution);
+
+  SleepFor(MonoDelta::FromMilliseconds(3000));
+  std::map<string, int32_t> leader_map;
+  GetLeaderDistribution(&leader_map);
+  LOG(INFO) << "The leader distribution is " << '\n';
+  for (const auto& leader : leader_map) {
+    std::cout << leader.first << "  " << leader.second << '\n';
+  }
+
+  // Try to do rebalance 10 times.
+  master::Master* master = cluster_->mini_master()->master();
+  int32_t retries = 10;
+  master::AutoLeaderRebalancerTask* leader_rebalancer =
+    master->catalog_manager()->auto_leader_rebalancer();
+  for (int i = 0; i < retries; i++) {
+    leader_rebalancer->RunLeaderRebalancer();
+    SleepFor(MonoDelta::FromMilliseconds(FLAGS_heartbeat_interval_ms));
+  }
+
+  // Check the leader numbers of each tablet server. It should always be 
floor(avg)
+  // or ceil(avg), where the parameter avg is (tablet num) / (tablet server 
num).
+  double expected_leader_num = static_cast<double>(kNumTablets) / 3;
+  GetLeaderDistribution(&leader_map);
+  LOG(INFO) << "The leader distribution is " << '\n';
+  for (const auto& leader : leader_map) {
+    std::cout << leader.first << "  " << leader.second << '\n';
+  }
+  for (const auto& leader: leader_map) {
+    ASSERT_GE(leader.second, std::floor(expected_leader_num));
+    ASSERT_LE(leader.second, std::ceil(expected_leader_num));
+  }
+
+  // Try different leader distribution.
+  std::vector<int32_t> leader_distribution2 = {8, 1, 1};
+  MakeLeaderDistribution(leader_distribution2);
+
+  SleepFor(MonoDelta::FromMilliseconds(3000));
+  GetLeaderDistribution(&leader_map);
+  LOG(INFO) << "The leader distribution is " << '\n';
+  for (const auto& leader : leader_map) {
+    std::cout << leader.first << "  " << leader.second << '\n';
+  }
+
+  for (int i = 0; i < retries; i++) {
+    leader_rebalancer->RunLeaderRebalancer();
+    SleepFor(MonoDelta::FromMilliseconds(FLAGS_heartbeat_interval_ms));
+  }
+
+  GetLeaderDistribution(&leader_map);
+  LOG(INFO) << "The leader distribution is " << '\n';
+  for (const auto& leader : leader_map) {
+    std::cout << leader.first << "  " << leader.second << '\n';
+  }
+  for (const auto& leader: leader_map) {
+    ASSERT_GE(leader.second, std::floor(expected_leader_num));
+    ASSERT_LE(leader.second, std::ceil(expected_leader_num));
+  }
+}
+
 // Create a cluster, and create a table,
 // whether tablets is balanced, which decided by creating table process.
 // Bring up another tserver, the table is not balanced and leaders is also
diff --git a/src/kudu/master/auto_leader_rebalancer.cc 
b/src/kudu/master/auto_leader_rebalancer.cc
index 3f9030c9c..1ee94e5cb 100644
--- a/src/kudu/master/auto_leader_rebalancer.cc
+++ b/src/kudu/master/auto_leader_rebalancer.cc
@@ -18,6 +18,7 @@
 #include "kudu/master/auto_leader_rebalancer.h"
 
 #include <algorithm>
+#include <cstddef>
 #include <cstdint>
 #include <functional>
 #include <map>
@@ -201,29 +202,43 @@ Status 
AutoLeaderRebalancerTask::RunLeaderRebalanceForTable(
   map<string, std::pair<int32_t, int32_t>> replica_and_leader_count_by_ts_uuid;
   // uuid->leader should transfer count
   map<string, int32_t> leader_transfer_source;
+  size_t remaining_tablets = tablet_infos.size();
+  size_t remaining_tservers = tserver_uuids.size();
   for (const auto& uuid : tserver_uuids) {
     auto* tablet_ids_ptr = FindOrNull(tablet_ids_by_ts_uuid, uuid);
-    uint32_t replica_count = tablet_ids_ptr ? tablet_ids_ptr->size() : 0;
+    int32_t replica_count = tablet_ids_ptr ? tablet_ids_ptr->size() : 0;
     if (replica_count == 0) {
       // means no replicas (and no leaders), maybe a tserver joined kudu 
cluster just now, skip it
+      remaining_tservers--;
       continue;
     }
     auto* leader_tablet_ids_ptr = FindOrNull(leader_tablet_ids_by_ts_uuid, 
uuid);
-    uint32_t leader_count = leader_tablet_ids_ptr ? 
leader_tablet_ids_ptr->size() : 0;
+    int32_t leader_count = leader_tablet_ids_ptr ? 
leader_tablet_ids_ptr->size() : 0;
     replica_and_leader_count_by_ts_uuid.insert(
         {uuid, std::pair<int32_t, int32_t>(replica_count, leader_count)});
     VLOG(1) << Substitute(
         "uuid: $0, replica_count: $1, leader_count: $2", uuid, replica_count, 
leader_count);
 
-    // Our target is every tserver' replicas, number of leader : number of 
follower is
-    // 1 : (replica_refactor -1). The constant 1 is a coarse-grained 
correction factor to help
-    // leader rebalancer to converge stable.
-    int32_t should_transfer_count = static_cast<int32_t>(leader_count) -
-                                    (static_cast<int32_t>(replica_count) / 
replication_factor + 1);
+    // If the number of remaining tablets is divisible by the number of 
remaining tablet
+    // servers, the leader num of all the remaining tablet servers should be 
the division
+    // result.
+    // Else, the maximum number of leader replicas per tablet server should be 
the ceil value
+    // of the average leaders num. Transfer the excess leaders if a tablet 
server's
+    // leader num is more than that.
+    const uint32_t target_leader_count =
+        (remaining_tablets + remaining_tservers - 1) / remaining_tservers;
+    int32_t should_transfer_count = leader_count - 
static_cast<int32_t>(target_leader_count);
     if (should_transfer_count > 0) {
       leader_transfer_source.insert({uuid, should_transfer_count});
       VLOG(1) << Substitute("$0 should transfer leader count: $1", uuid, 
should_transfer_count);
     }
+    if (remaining_tablets % remaining_tservers == 0) {
+      remaining_tablets -= target_leader_count;
+    } else {
+      remaining_tablets -= (should_transfer_count >= 0 ? target_leader_count :
+                            (target_leader_count - 1));
+    }
+    remaining_tservers--;
   }
 
   // Step 3.
@@ -383,7 +398,10 @@ Status AutoLeaderRebalancerTask::RunLeaderRebalancer() {
   // we get all tserver uuids
   TSDescriptorVector descriptors;
   ts_manager_->GetAllDescriptors(&descriptors);
-
+  if (PREDICT_FALSE(descriptors.empty())) {
+    VLOG(1) << "No tserver registered for now, skipping this leader 
rebalancing round.";
+    return Status::OK();
+  }
   vector<string> tserver_uuids;
   for (const auto& e : descriptors) {
     if (e->PresumedDead()) {
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc 
b/src/kudu/mini-cluster/external_mini_cluster.cc
index 19e4efd58..ea621f44e 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -68,6 +68,7 @@
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/consensus/consensus.proxy.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
@@ -104,6 +105,7 @@ using kudu::tserver::ListTabletsRequestPB;
 using kudu::tserver::ListTabletsResponsePB;
 using kudu::tserver::TabletServerAdminServiceProxy;
 using kudu::tserver::TabletServerServiceProxy;
+using kudu::consensus::ConsensusServiceProxy;
 using std::back_inserter;
 using std::copy;
 using std::map;
@@ -1123,6 +1125,13 @@ std::shared_ptr<TabletServerAdminServiceProxy> 
ExternalMiniCluster::tserver_admi
   return std::make_shared<TabletServerAdminServiceProxy>(messenger_, addr, 
addr.host());
 }
 
+std::shared_ptr<ConsensusServiceProxy> 
ExternalMiniCluster::tserver_consensus_proxy(
+    int idx) const {
+  CHECK_LT(idx, tablet_servers_.size());
+  const auto& addr = CHECK_NOTNULL(tablet_server(idx))->bound_rpc_addr();
+  return std::make_shared<consensus::ConsensusServiceProxy>(messenger_, addr, 
addr.host());
+}
+
 Status ExternalMiniCluster::CreateClient(client::KuduClientBuilder* builder,
                                          
client::sp::shared_ptr<client::KuduClient>* client) const {
   client::KuduClientBuilder defaults;
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h 
b/src/kudu/mini-cluster/external_mini_cluster.h
index 5d82c55da..47cdfe1cb 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -99,6 +99,10 @@ class TabletServerAdminServiceProxy;
 class TabletServerServiceProxy;
 } // namespace tserver
 
+namespace consensus {
+class ConsensusServiceProxy;
+} // namespace consensus
+
 namespace cluster {
 
 class ExternalDaemon;
@@ -507,6 +511,8 @@ class ExternalMiniCluster : public MiniCluster {
   std::shared_ptr<tserver::TabletServerServiceProxy> tserver_proxy(int idx) 
const override;
   std::shared_ptr<tserver::TabletServerAdminServiceProxy> tserver_admin_proxy(
       int idx) const override;
+  std::shared_ptr<consensus::ConsensusServiceProxy> tserver_consensus_proxy(
+      int idx) const override;
 
   std::string block_manager_type() const {
     return opts_.block_manager_type;
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.cc 
b/src/kudu/mini-cluster/internal_mini_cluster.cc
index 0c146398f..0bfaeb5b6 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.cc
+++ b/src/kudu/mini-cluster/internal_mini_cluster.cc
@@ -45,6 +45,7 @@
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/consensus/consensus.proxy.h"
 #include "kudu/util/env.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -74,6 +75,7 @@ using tserver::MiniTabletServer;
 using tserver::TabletServer;
 using tserver::TabletServerAdminServiceProxy;
 using tserver::TabletServerServiceProxy;
+using consensus::ConsensusServiceProxy;
 
 InternalMiniClusterOptions::InternalMiniClusterOptions()
   : num_masters(1),
@@ -449,6 +451,12 @@ std::shared_ptr<TabletServerAdminServiceProxy> 
InternalMiniCluster::tserver_admi
   return std::make_shared<TabletServerAdminServiceProxy>(messenger_, addr, 
addr.host());
 }
 
+std::shared_ptr<ConsensusServiceProxy> 
InternalMiniCluster::tserver_consensus_proxy(
+    int idx) const {
+  const auto& addr = CHECK_NOTNULL(mini_tablet_server(idx))->bound_rpc_addr();
+  return std::make_shared<ConsensusServiceProxy>(messenger_, addr, 
addr.host());
+}
+
 string InternalMiniCluster::WalRootForTS(int ts_idx) const {
   return mini_tablet_server(ts_idx)->options()->fs_opts.wal_root;
 }
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.h 
b/src/kudu/mini-cluster/internal_mini_cluster.h
index 59a2184d2..db8fcaec1 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.h
+++ b/src/kudu/mini-cluster/internal_mini_cluster.h
@@ -60,6 +60,10 @@ class TabletServerAdminServiceProxy;
 class TabletServerServiceProxy;
 } // namespace tserver
 
+namespace consensus {
+class ConsensusServiceProxy;
+}  // namespace consensus
+
 namespace cluster {
 
 struct InternalMiniClusterOptions {
@@ -232,6 +236,8 @@ class InternalMiniCluster : public MiniCluster {
   std::shared_ptr<tserver::TabletServerServiceProxy> tserver_proxy(int idx) 
const override;
   std::shared_ptr<tserver::TabletServerAdminServiceProxy> tserver_admin_proxy(
       int idx) const override;
+  std::shared_ptr<consensus::ConsensusServiceProxy> tserver_consensus_proxy(
+      int idx) const override;
 
  private:
   FRIEND_TEST(kudu::tools::ToolTest, TestRebuildTserverByLocalReplicaCopy);
diff --git a/src/kudu/mini-cluster/mini_cluster.h 
b/src/kudu/mini-cluster/mini_cluster.h
index 232c9ed62..74f8a7405 100644
--- a/src/kudu/mini-cluster/mini_cluster.h
+++ b/src/kudu/mini-cluster/mini_cluster.h
@@ -48,6 +48,10 @@ class TabletServerAdminServiceProxy;
 class TabletServerServiceProxy;
 } // namespace tserver
 
+namespace consensus {
+class ConsensusServiceProxy;
+} // namespace consensus
+
 namespace cluster {
 
 // Mode to which node types a certain action (like Shutdown()) should apply.
@@ -125,6 +129,11 @@ class MiniCluster {
   virtual std::shared_ptr<tserver::TabletServerAdminServiceProxy> 
tserver_admin_proxy(
       int idx) const = 0;
 
+  // Returns an RPC proxy to the tserver consensus service at 'idx'. Requires 
that
+  // the tserver at 'idx' is running.
+  virtual std::shared_ptr<consensus::ConsensusServiceProxy> 
tserver_consensus_proxy(
+      int idx) const = 0;
+
   // Returns the UUID for the tablet server 'ts_idx'
   virtual std::string UuidForTS(int ts_idx) const = 0;
 

Reply via email to