This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e66534  [master] AddMaster and copy sys catalog automatically
7e66534 is described below

commit 7e66534d0e62fb850bf300d52da4a0a76889f4b8
Author: Andrew Wong <[email protected]>
AuthorDate: Tue Jun 22 18:52:21 2021 -0700

    [master] AddMaster and copy sys catalog automatically
    
    This patch updates the masters to ping all other masters, collect their
    UUIDs and Raft configs, and detect whether the master being started
    meant to join the cluster via AddMaster RPC. If so, it sends the RPC,
    performs a tablet copy, and bootstraps the master from the copy.
    
    This patch is fairly conservative about when it will perform the
    automatic operations -- the existing master Raft config  must be in good
    health, with all masters agreeing on members and the current term.
    Otherwise, the new master will fail to come up, as it's not a part of
    the existing cluster.
    
    This patch includes a gflag with which to disable this behavior. It is
    disabled in the following cases:
    - when running the `kudu master add` tooling
    - tests for the existing tooling
    
    Some test utility updates come along with this patch as well, used in
    newly introduced tests:
    - the ExternalMiniCluster can now add a master
    - ExternalMiniCluster::WaitForCatalogManager() is made more robust to
      the shutting down and starting up of a master
    
    In order to get this to work in a secured environment, this patch also
    updates the coarse grained authz requirements of the AddMaster endpoint
    to allow the service user to send RPCs.
    
    Sample usage (from build/latest):
     $ ../../src/kudu/scripts/start_kudu.sh
     $ mkdir -p master-1/log
     $ ./bin/kudu-master --master_addresses=127.0.0.1:8764,127.0.0.1:8766 
--fs_data_dirs=$PWD/master-1/data --fs_wal_dir=$PWD/master-1/wal 
--log_dir=$PWD/master-1/log --rpc_bind_addresses=127.0.0.1:8766 
--time_source=system_unsync --unlock_unsafe_flags --webserver_port=8767 
--webserver_interface=127.0.0.1 --webserver_doc_root=$PWD/../../www 
--logtostderr
     ...
     $ ../../src/kudu/scripts/stop_kudu.sh
     $ ../../src/kudu/scripts/start_kudu.sh --num-masters 2
    
    Change-Id: Ic7635723f30317a45799ad7b9c9b725eafbd735b
    Reviewed-on: http://gerrit.cloudera.org:8080/17528
    Reviewed-by: Alexey Serbin <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
---
 src/kudu/master/CMakeLists.txt                     |   2 +-
 src/kudu/master/dynamic_multi_master-test.cc       | 631 ++++++++++++++++-----
 src/kudu/master/master.proto                       |   5 +-
 src/kudu/master/master_options.h                   |   7 +-
 src/kudu/master/master_runner.cc                   | 338 ++++++++++-
 .../mini-cluster/external_mini_cluster-test.cc     |  40 +-
 src/kudu/mini-cluster/external_mini_cluster.cc     | 240 +++++---
 src/kudu/mini-cluster/external_mini_cluster.h      |  22 +-
 src/kudu/tools/tool_action_master.cc               |   1 +
 9 files changed, 1043 insertions(+), 243 deletions(-)

diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index dafeea6..5dd8bae 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -108,7 +108,7 @@ SET_KUDU_TEST_LINK_LIBS(
 
 ADD_KUDU_TEST(auto_rebalancer-test)
 ADD_KUDU_TEST(catalog_manager-test)
-ADD_KUDU_TEST(dynamic_multi_master-test NUM_SHARDS 4)
+ADD_KUDU_TEST(dynamic_multi_master-test NUM_SHARDS 6)
 ADD_KUDU_TEST(hms_notification_log_listener-test)
 ADD_KUDU_TEST(location_cache-test DATA_FILES ../scripts/first_argument.sh)
 ADD_KUDU_TEST(master_options-test)
diff --git a/src/kudu/master/dynamic_multi_master-test.cc 
b/src/kudu/master/dynamic_multi_master-test.cc
index e473a93..e6d345d 100644
--- a/src/kudu/master/dynamic_multi_master-test.cc
+++ b/src/kudu/master/dynamic_multi_master-test.cc
@@ -16,13 +16,16 @@
 // under the License.
 
 #include <algorithm>
+#include <atomic>
 #include <cstdint>
 #include <functional>
 #include <iterator>
 #include <map>
 #include <memory>
+#include <mutex>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <tuple>
 #include <unordered_set>
 #include <utility>
@@ -49,6 +52,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/test_workload.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
@@ -60,12 +64,15 @@
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -93,9 +100,12 @@ using kudu::consensus::LeaderStepDownRequestPB;
 using kudu::consensus::LeaderStepDownResponsePB;
 using kudu::consensus::RaftConfigPB;
 using kudu::consensus::RaftPeerPB;
+using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::RpcController;
+using std::atomic;
 using std::map;
 using std::string;
+using std::thread;
 using std::tuple;
 using std::unique_ptr;
 using std::unordered_set;
@@ -105,14 +115,12 @@ using strings::Substitute;
 namespace kudu {
 namespace master {
 
-static Status CreateTable(ExternalMiniCluster* cluster,
-                          const std::string& table_name) {
-  shared_ptr<KuduClient> client;
-      RETURN_NOT_OK(cluster->CreateClient(nullptr, &client));
+namespace {
+Status CreateTableWithClient(KuduClient* client, const std::string& 
table_name) {
   KuduSchema schema;
   KuduSchemaBuilder b;
   b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
-      RETURN_NOT_OK(b.Build(&schema));
+  RETURN_NOT_OK(b.Build(&schema));
   unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
   return table_creator->table_name(table_name)
       .schema(&schema)
@@ -120,6 +128,153 @@ static Status CreateTable(ExternalMiniCluster* cluster,
       .num_replicas(1)
       .Create();
 }
+Status CreateTable(ExternalMiniCluster* cluster,
+                   const std::string& table_name) {
+  shared_ptr<KuduClient> client;
+  RETURN_NOT_OK(cluster->CreateClient(nullptr, &client));
+  return CreateTableWithClient(client.get(), table_name);
+}
+
+Status ReserveSocketForMaster(int master_idx, unique_ptr<Socket>* socket,
+                              Sockaddr* addr, HostPort* hp) {
+  unique_ptr<Socket> s;
+  Sockaddr a;
+  RETURN_NOT_OK(MiniCluster::ReserveDaemonSocket(MiniCluster::MASTER, 
master_idx,
+                                                 kDefaultBindMode, &s));
+  RETURN_NOT_OK(s->GetSocketAddress(&a));
+  *socket = std::move(s);
+  *addr = a;
+  *hp = HostPort(a);
+  return Status::OK();
+}
+
+// Functor that takes a leader_master_idx and runs the desired master RPC 
against
+// the leader master returning the RPC status and the optional 
MasterErrorPB::Code.
+typedef std::function<
+    std::pair<Status, boost::optional<MasterErrorPB::Code>>(int 
leader_master_idx)> MasterRPC;
+
+// Helper function that runs the master RPC against the leader master and 
retries the RPC
+// if the expected leader master returns NOT_THE_LEADER error due to 
leadership change.
+// Returns a single combined Status:
+//   - RPC return status if not OK.
+//   - IllegalState for a master response error other than NOT_THE_LEADER 
error.
+//   - TimedOut if all attempts to run the RPC against leader master are 
exhausted.
+//   - OK if the master RPC is successful.
+Status RunLeaderMasterRPC(const MasterRPC& master_rpc, ExternalMiniCluster* 
cluster) {
+  int64_t time_left_to_sleep_msecs = 2000;
+  while (time_left_to_sleep_msecs > 0) {
+    int leader_master_idx;
+    RETURN_NOT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx));
+    const auto& rpc_result = master_rpc(leader_master_idx);
+    RETURN_NOT_OK(rpc_result.first);
+    const auto& master_error = rpc_result.second;
+    if (!master_error) {
+      return Status::OK();
+    }
+    if (master_error != MasterErrorPB::NOT_THE_LEADER) {
+      // Some other master error.
+      return Status::IllegalState(Substitute("Master error: $0"),
+                                  MasterErrorPB_Code_Name(*master_error));
+    }
+    // NOT_THE_LEADER error, so retry after some duration.
+    static const MonoDelta kSleepDuration = MonoDelta::FromMilliseconds(100);
+    SleepFor(kSleepDuration);
+    time_left_to_sleep_msecs -= kSleepDuration.ToMilliseconds();
+  }
+  return Status::TimedOut("Failed contacting the right leader master after 
multiple attempts");
+}
+
+// Run ListMasters RPC, retrying on leadership change, returning the response
+// in 'resp'.
+Status RunListMasters(ListMastersResponsePB* resp, ExternalMiniCluster* 
cluster) {
+  auto list_masters = [&] (int leader_master_idx) {
+    ListMastersRequestPB req;
+    RpcController rpc;
+    Status s = cluster->master_proxy(leader_master_idx)->ListMasters(req, 
resp, &rpc);
+    boost::optional<MasterErrorPB::Code> err_code(resp->has_error(), 
resp->error().code());
+    return std::make_pair(s, err_code);
+  };
+  return RunLeaderMasterRPC(list_masters, cluster);
+}
+
+// Verify the ExternalMiniCluster 'cluster' contains 'num_masters' overall and
+// are all VOTERS. Populates the new master addresses in 'master_hps', if not
+// nullptr. Returns an error if the expected state is not present.
+Status VerifyVoterMastersForCluster(int num_masters, vector<HostPort>* 
master_hps,
+                                    ExternalMiniCluster* cluster) {
+  ListMastersResponsePB resp;
+  RETURN_NOT_OK(RunListMasters(&resp, cluster));
+  if (num_masters != resp.masters_size()) {
+    return Status::IllegalState(Substitute("expected $0 masters but got $1",
+                                           num_masters, resp.masters_size()));
+  }
+  vector<HostPort> hps;
+  for (const auto& master : resp.masters()) {
+    if ((master.role() != RaftPeerPB::LEADER && master.role() != 
RaftPeerPB::FOLLOWER) ||
+        master.member_type() != RaftPeerPB::VOTER ||
+        master.registration().rpc_addresses_size() != 1) {
+      return Status::IllegalState(Substitute("bad master: $0", 
SecureShortDebugString(master)));
+    }
+    hps.emplace_back(HostPortFromPB(master.registration().rpc_addresses(0)));
+  }
+  if (master_hps) {
+    *master_hps = std::move(hps);
+  }
+  return Status::OK();
+}
+
+// Initiates leadership transfer to the specified master returning status of
+// the request. The request is performed synchronously, though the transfer of
+// leadership is asynchronous -- callers need to wait to ensure leadership is
+// actually transferred.
+Status TransferMasterLeadershipAsync(ExternalMiniCluster* cluster, const 
string& master_uuid) {
+  int leader_master_idx;
+  RETURN_NOT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx));
+  auto leader_master_addr = 
cluster->master(leader_master_idx)->bound_rpc_addr();
+  ConsensusServiceProxy consensus_proxy(cluster->messenger(), 
leader_master_addr,
+                                        leader_master_addr.host());
+  LeaderStepDownRequestPB req;
+  req.set_dest_uuid(cluster->master(leader_master_idx)->uuid());
+  req.set_tablet_id(master::SysCatalogTable::kSysCatalogTabletId);
+  req.set_new_leader_uuid(master_uuid);
+  req.set_mode(consensus::GRACEFUL);
+  LeaderStepDownResponsePB resp;
+  RpcController rpc;
+  RETURN_NOT_OK(consensus_proxy.LeaderStepDown(req, &resp, &rpc));
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  return Status::OK();
+}
+
+// Transfers leadership among masters in the 'cluster' to the specified 
'new_master_uuid'
+// verifies the transfer is successful.
+void TransferMasterLeadership(ExternalMiniCluster* cluster, const string& 
new_master_uuid) {
+  ASSERT_OK(TransferMasterLeadershipAsync(cluster, new_master_uuid));
+  // It takes some time for the leadership transfer to complete, hence the
+  // ASSERT_EVENTUALLY.
+  ASSERT_EVENTUALLY([&] {
+    int leader_master_idx = -1;
+    ASSERT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx));
+    ASSERT_EQ(new_master_uuid, cluster->master(leader_master_idx)->uuid());
+  });
+}
+
+// Fetch uuid of the specified 'fs_wal_dir' and 'fs_data_dirs'.
+Status GetFsUuid(const string& fs_wal_dir, const vector<string>& fs_data_dirs, 
string* uuid) {
+  google::FlagSaver saver;
+  FLAGS_fs_wal_dir = fs_wal_dir;
+  FLAGS_fs_data_dirs = JoinStrings(fs_data_dirs, ",");
+  FsManagerOpts fs_opts;
+  fs_opts.read_only = true;
+  fs_opts.update_instances = fs::UpdateInstanceBehavior::DONT_UPDATE;
+  FsManager fs_manager(Env::Default(), std::move(fs_opts));
+  RETURN_NOT_OK(fs_manager.PartialOpen());
+  *uuid = fs_manager.uuid();
+  return Status::OK();
+}
+
+} // anonymous namespace
 
 // Test class for testing addition/removal of masters to a Kudu cluster.
 class DynamicMultiMasterTest : public KuduTest {
@@ -131,11 +286,8 @@ class DynamicMultiMasterTest : public KuduTest {
     orig_num_masters_ = num_masters;
 
     // Reserving a port upfront for the new master that'll be added to the 
cluster.
-    ASSERT_OK(MiniCluster::ReserveDaemonSocket(MiniCluster::MASTER, 
orig_num_masters_ /* index */,
-                                               kDefaultBindMode, 
&reserved_socket_));
-
-    ASSERT_OK(reserved_socket_->GetSocketAddress(&reserved_addr_));
-    reserved_hp_ = HostPort(reserved_addr_);
+    ASSERT_OK(ReserveSocketForMaster(/*index*/orig_num_masters_, 
&reserved_socket_,
+                                     &reserved_addr_, &reserved_hp_));
   }
 
   void StartCluster(const vector<string>& extra_master_flags = {},
@@ -143,6 +295,7 @@ class DynamicMultiMasterTest : public KuduTest {
     opts_.num_masters = orig_num_masters_;
     opts_.supply_single_master_addr = supply_single_master_addr;
     opts_.extra_master_flags = extra_master_flags;
+    opts_.extra_master_flags.emplace_back("--master_auto_join_cluster=false");
 
     cluster_.reset(new ExternalMiniCluster(opts_));
     ASSERT_OK(cluster_->Start());
@@ -225,80 +378,6 @@ class DynamicMultiMasterTest : public KuduTest {
     ASSERT_TRUE(wal_gc_counts_updated) << "Timed out waiting for system 
catalog WAL to be GC'ed";
   }
 
-  // Functor that takes a leader_master_idx and runs the desired master RPC 
against
-  // the leader master returning the RPC status and the optional 
MasterErrorPB::Code.
-  typedef std::function<
-      std::pair<Status, boost::optional<MasterErrorPB::Code>>(int 
leader_master_idx)> MasterRPC;
-
-  // Helper function that runs the master RPC against the leader master and 
retries the RPC
-  // if the expected leader master returns NOT_THE_LEADER error due to 
leadership change.
-  // Returns a single combined Status:
-  //   - RPC return status if not OK.
-  //   - IllegalState for a master response error other than NOT_THE_LEADER 
error.
-  //   - TimedOut if all attempts to run the RPC against leader master are 
exhausted.
-  //   - OK if the master RPC is successful.
-  Status RunLeaderMasterRPC(const MasterRPC& master_rpc, ExternalMiniCluster* 
cluster = nullptr) {
-    if (cluster == nullptr) {
-      cluster = cluster_.get();
-    }
-
-    int64_t time_left_to_sleep_msecs = 2000;
-    while (time_left_to_sleep_msecs > 0) {
-      int leader_master_idx;
-      RETURN_NOT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx));
-      const auto& rpc_result = master_rpc(leader_master_idx);
-      RETURN_NOT_OK(rpc_result.first);
-      const auto& master_error = rpc_result.second;
-      if (!master_error) {
-        return Status::OK();
-      }
-      if (master_error != MasterErrorPB::NOT_THE_LEADER) {
-        // Some other master error.
-        return Status::IllegalState(Substitute("Master error: $0"),
-                                    MasterErrorPB_Code_Name(*master_error));
-      }
-      // NOT_THE_LEADER error, so retry after some duration.
-      static const MonoDelta kSleepDuration = MonoDelta::FromMilliseconds(100);
-      SleepFor(kSleepDuration);
-      time_left_to_sleep_msecs -= kSleepDuration.ToMilliseconds();
-    }
-    return Status::TimedOut("Failed contacting the right leader master after 
multiple attempts");
-  }
-
-  // Run ListMasters RPC, retrying on leadership change, returning the 
response in 'resp'.
-  void RunListMasters(ListMastersResponsePB* resp, ExternalMiniCluster* 
cluster = nullptr) {
-    if (cluster == nullptr) {
-      cluster = cluster_.get();
-    }
-    auto list_masters = [&] (int leader_master_idx) {
-      ListMastersRequestPB req;
-      RpcController rpc;
-      Status s = cluster->master_proxy(leader_master_idx)->ListMasters(req, 
resp, &rpc);
-      boost::optional<MasterErrorPB::Code> err_code(resp->has_error(), 
resp->error().code());
-      return std::make_pair(s, err_code);
-    };
-    ASSERT_OK(RunLeaderMasterRPC(list_masters, cluster));
-  }
-
-  // Verify the ExternalMiniCluster 'cluster' contains 'num_masters' overall
-  // and are all VOTERS.
-  // Return master addresses in 'master_hps', if not nullptr.
-  void VerifyVoterMasters(int num_masters, vector<HostPort>* master_hps = 
nullptr,
-                          ExternalMiniCluster* cluster = nullptr) {
-    ListMastersResponsePB resp;
-    NO_FATALS(RunListMasters(&resp, cluster));
-    ASSERT_EQ(num_masters, resp.masters_size());
-    if (master_hps) master_hps->clear();
-    for (const auto& master : resp.masters()) {
-      ASSERT_TRUE(master.role() == RaftPeerPB::LEADER || master.role() == 
RaftPeerPB::FOLLOWER);
-      ASSERT_EQ(RaftPeerPB::VOTER, master.member_type());
-      ASSERT_EQ(1, master.registration().rpc_addresses_size());
-      if (master_hps) {
-        
master_hps->emplace_back(HostPortFromPB(master.registration().rpc_addresses(0)));
-      }
-    }
-  }
-
   // Brings up a new master 'new_master_hp' where 'master_hps' contains master 
addresses including
   // the new master to be added at the index 'new_master_idx' in the 
ExternalMiniCluster.
   void StartNewMaster(const vector<HostPort>& master_hps,
@@ -314,9 +393,11 @@ class DynamicMultiMasterTest : public KuduTest {
     ExternalDaemonOptions new_master_opts;
     ASSERT_OK(BuildMasterOpts(new_master_idx, new_master_hp, 
&new_master_opts));
     auto& flags = new_master_opts.extra_flags;
-    flags.insert(flags.end(),
-                 {"--master_addresses=" + JoinStrings(master_addresses, ","),
-                  "--master_address_add_new_master=" + 
new_master_hp.ToString()});
+    flags.insert(flags.end(), {
+        "--master_addresses=" + JoinStrings(master_addresses, ","),
+        "--master_address_add_new_master=" + new_master_hp.ToString(),
+        "--master_auto_join_cluster=false",
+    });
 
     LOG(INFO) << "Bringing up the new master at: " << new_master_hp.ToString();
     scoped_refptr<ExternalMaster> master = new ExternalMaster(new_master_opts);
@@ -338,6 +419,14 @@ class DynamicMultiMasterTest : public KuduTest {
     *new_master_out = std::move(master);
   }
 
+  void VerifyVoterMasters(int num_masters, vector<HostPort>* master_hps = 
nullptr,
+                          ExternalMiniCluster* cluster = nullptr) {
+    if (cluster == nullptr) {
+      cluster = cluster_.get();
+    }
+    NO_FATALS(VerifyVoterMastersForCluster(num_masters, master_hps, cluster));
+  }
+
   // Fetch a follower (non-leader) master index from the cluster.
   Status GetFollowerMasterIndex(int* follower_master_idx) {
     int leader_master_idx;
@@ -492,59 +581,6 @@ class DynamicMultiMasterTest : public KuduTest {
     ASSERT_TRUE(dead_master_found);
   }
 
-  // Initiates leadership transfer to the specified master returning status of 
the asynchronous
-  // request.
-  static Status TransferMasterLeadershipAsync(ExternalMiniCluster* cluster,
-                                              const string& master_uuid) {
-    LOG(INFO) << "Transferring leadership to master: " << master_uuid;
-
-    int leader_master_idx;
-    RETURN_NOT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx));
-    auto leader_master_addr = 
cluster->master(leader_master_idx)->bound_rpc_addr();
-    ConsensusServiceProxy consensus_proxy(cluster->messenger(), 
leader_master_addr,
-                                          leader_master_addr.host());
-    LeaderStepDownRequestPB req;
-    req.set_dest_uuid(cluster->master(leader_master_idx)->uuid());
-    req.set_tablet_id(master::SysCatalogTable::kSysCatalogTabletId);
-    req.set_new_leader_uuid(master_uuid);
-    req.set_mode(consensus::GRACEFUL);
-    LeaderStepDownResponsePB resp;
-    RpcController rpc;
-    RETURN_NOT_OK(consensus_proxy.LeaderStepDown(req, &resp, &rpc));
-    if (resp.has_error()) {
-      return StatusFromPB(resp.error().status());
-    }
-    return Status::OK();
-  }
-
-  // Transfers leadership among masters in the 'cluster' to the specified 
'new_master_uuid'
-  // verifies the transfer is successful.
-  static void TransferMasterLeadership(ExternalMiniCluster* cluster,
-                                       const string& new_master_uuid) {
-    ASSERT_OK(TransferMasterLeadershipAsync(cluster, new_master_uuid));
-    // LeaderStepDown request is asynchronous, hence using ASSERT_EVENTUALLY.
-    ASSERT_EVENTUALLY([&] {
-      int leader_master_idx = -1;
-      ASSERT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx));
-      ASSERT_EQ(new_master_uuid, cluster->master(leader_master_idx)->uuid());
-    });
-  }
-
-  // Fetch uuid of the specified 'fs_wal_dir' and 'fs_data_dirs'.
-  static Status GetFsUuid(const string& fs_wal_dir, const vector<string>& 
fs_data_dirs,
-                          string* uuid) {
-    google::FlagSaver saver;
-    FLAGS_fs_wal_dir = fs_wal_dir;
-    FLAGS_fs_data_dirs = JoinStrings(fs_data_dirs, ",");
-    FsManagerOpts fs_opts;
-    fs_opts.read_only = true;
-    fs_opts.update_instances = fs::UpdateInstanceBehavior::DONT_UPDATE;
-    FsManager fs_manager(Env::Default(), std::move(fs_opts));
-    RETURN_NOT_OK(fs_manager.PartialOpen());
-    *uuid = fs_manager.uuid();
-    return Status::OK();
-  }
-
   // Verification steps after the new master has been added successfully and 
it's promoted
   // as VOTER. The supplied 'master_hps' includes the new_master as well.
   void VerifyClusterAfterMasterAddition(const vector<HostPort>& master_hps,
@@ -580,7 +616,7 @@ class DynamicMultiMasterTest : public KuduTest {
     // Verify the cluster still has the same masters.
     {
       ListMastersResponsePB resp;
-      NO_FATALS(RunListMasters(&resp, &migrated_cluster));
+      ASSERT_OK(RunListMasters(&resp, &migrated_cluster));
       ASSERT_EQ(expected_num_masters, resp.masters_size());
 
       UnorderedHostPortSet hps_found;
@@ -600,6 +636,7 @@ class DynamicMultiMasterTest : public KuduTest {
     }
 
     // Transfer leadership to the new master.
+    LOG(INFO) << "Transferring leadership to master: " << new_master_uuid;
     NO_FATALS(TransferMasterLeadership(&migrated_cluster, new_master_uuid));
 
     shared_ptr<KuduClient> client;
@@ -891,6 +928,7 @@ TEST_P(ParameterizedRemoveMasterTest, TestRemoveMaster) {
   if (leader_master_idx == non_leader_master_idx) {
     // Move the leader to the first master index
     auto first_master_uuid = cluster_->master(0)->uuid();
+    LOG(INFO) << "Transferring leadership to master: " << first_master_uuid;
     NO_FATALS(TransferMasterLeadership(cluster_.get(), first_master_uuid));
   }
   ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx));
@@ -939,6 +977,7 @@ TEST_P(ParameterizedRemoveMasterTest, TestRemoveMaster) {
   ASSERT_STR_CONTAINS(err, Substitute("Master $0 not found", 
master_to_remove.ToString()));
 
   // Attempt transferring leadership to the removed master
+  LOG(INFO) << "Transferring leadership to master: " << master_to_remove_uuid;
   s = TransferMasterLeadershipAsync(cluster_.get(), master_to_remove_uuid);
   ASSERT_TRUE(s.IsInvalidArgument());
   ASSERT_STR_CONTAINS(s.ToString(),
@@ -1368,5 +1407,315 @@ TEST_P(ParameterizedRemoveLeaderMasterTest, 
TestRemoveLeaderMaster) {
   NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps));
 }
 
+struct MultiMasterClusterArgs {
+  int orig_num_masters;
+  bool is_secure;
+};
+
+class AutoAddMasterTest : public KuduTest {
+ public:
+  Status SetUpWithTestArgs(const MultiMasterClusterArgs& args) {
+    opts_.num_masters = args.orig_num_masters;
+    opts_.enable_kerberos = args.is_secure;
+    args_ = args;
+    cluster_.reset(new ExternalMiniCluster(opts_));
+    RETURN_NOT_OK(cluster_->Start());
+    return cluster_->WaitForTabletServerCount(cluster_->num_tablet_servers(),
+                                              MonoDelta::FromSeconds(10));
+  }
+  void SetUp() override {
+    ASSERT_OK(SetUpWithTestArgs({ /*orig_num_masters*/2, /*is_secure*/false 
}));
+    TestWorkload w(cluster_.get());
+    w.set_num_replicas(1);
+    w.Setup();
+  }
+ protected:
+  MultiMasterClusterArgs args_;
+  ExternalMiniClusterOptions opts_;
+  unique_ptr<ExternalMiniCluster> cluster_;
+};
+
+// Test that nothing goes wrong when starting up masters but the entire cluster
+// isn't fully healthy. The auto-add checks should still run, but should be
+// inconsequential if they fail because the entire cluster isn't healthy.
+TEST_F(AutoAddMasterTest, TestRestartMastersWhileSomeDown) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  // We'll start with three masters, and then restart two, leaving one down.
+  ASSERT_OK(cluster_->AddMaster());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, 
cluster_.get()));
+  });
+
+  // Emulate one of the masters going down by only restarting two.
+  cluster_->Shutdown();
+  for (int i = 1; i < cluster_->num_masters(); i++) {
+    ASSERT_OK(cluster_->master(i)->Restart());
+  }
+  int table_idx = 0;
+  constexpr const char* kTablePrefix = "default.table";
+  const auto& deadline = MonoTime::Now() + MonoDelta::FromSeconds(30);
+  while (MonoTime::Now() > deadline) {
+    SleepFor(MonoDelta::FromSeconds(1));
+    // Nothing sinister should happen despite one master being down. The
+    // remaining masters should be operable and alive.
+    ASSERT_OK(CreateTable(cluster_.get(), Substitute("$0-$1", kTablePrefix, 
++table_idx)));
+    for (int i = 1; i < cluster_->num_masters(); i++) {
+      ASSERT_TRUE(cluster_->master(i)->IsProcessAlive());
+    }
+  }
+  ASSERT_OK(cluster_->master(0)->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, 
cluster_.get()));
+  });
+}
+
+// Test the procedure when some masters aren't reachable.
+TEST_F(AutoAddMasterTest, TestSomeMastersUnreachable) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  auto* stopped_master = cluster_->master(0);
+  ASSERT_OK(stopped_master->Pause());
+  // Adding a master to a cluster wherein a master is already down will fail.
+  // This is similar behavior to starting a new master while some are down
+  // since the new master can't resolve all peers' UUIDs. Shorten the time
+  // masters will wait to communicate to all peers to speed up this test.
+  ASSERT_OK(cluster_->AddMaster({ "--raft_get_node_instance_timeout_ms=3000" 
}));
+  auto* new_master = cluster_->master(args_.orig_num_masters);
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_FALSE(new_master->IsProcessAlive());
+  });
+  ASSERT_OK(stopped_master->Resume());
+
+  // Even after restarting, we still won't be quite able to start healthily
+  // because our previous crashes will have left an unusable set of metadata
+  // (i.e. no consensus metadata).
+  new_master->Shutdown();
+  ASSERT_OK(new_master->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_FALSE(new_master->IsProcessAlive());
+  });
+  // If we blow away our new master and start anew, we should be able to
+  // proceed.
+  new_master->Shutdown();
+  ASSERT_OK(new_master->DeleteFromDisk());
+  ASSERT_OK(new_master->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, 
cluster_.get()));
+  });
+  // Ensure that even after waiting a bit, our cluster is stable.
+  SleepFor(MonoDelta::FromSeconds(3));
+  ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, 
cluster_.get()));
+}
+
+// Test if we fail to replicate the AddMaster request.
+TEST_F(AutoAddMasterTest, TestFailWithoutReplicatingAddMaster) {
+  // Make master followers unable to accept updates, including config changes.
+  // We'll set this for all masters including leaders for simplicity.
+  for (int i = 0; i < cluster_->num_masters(); i++) {
+    ASSERT_OK(cluster_->SetFlag(cluster_->master(i),
+                                "follower_reject_update_consensus_requests", 
"true"));
+  }
+  // Upon starting, the master will attempt to add itself, but fail to do so
+  // and exit early.
+  ASSERT_OK(cluster_->AddMaster());
+  auto* new_master = cluster_->master(args_.orig_num_masters);
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_FALSE(new_master->IsProcessAlive());
+  });
+  // Since nothing was successfully replicated, it shouldn't be a problem to
+  // start up again and re-add.
+  new_master->Shutdown();
+  for (int i = 0; i < cluster_->num_masters() - 1; i++) {
+    ASSERT_OK(cluster_->SetFlag(cluster_->master(i),
+                                "follower_reject_update_consensus_requests", 
"false"));
+  }
+  ASSERT_OK(new_master->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, 
cluster_.get()));
+  });
+}
+
+// Test when the new master fails to copy.
+TEST_F(AutoAddMasterTest, TestFailTabletCopy) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  ASSERT_OK(cluster_->AddMaster({ 
"--tablet_copy_fault_crash_during_download_wal=1" }));
+  auto* new_master = cluster_->master(args_.orig_num_masters);
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_FALSE(new_master->IsProcessAlive());
+  });
+  // We should have been able to add the master to the Raft quorum, but been
+  // able to copy. Upon doing so, the new master should fail to come up.
+  new_master->Shutdown();
+  
new_master->mutable_flags()->emplace_back("--tablet_copy_fault_crash_during_download_wal=0");
+  ASSERT_OK(new_master->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_FALSE(new_master->IsProcessAlive());
+  });
+
+  // Even blowing the new master away entirely will result in a new master
+  // being unable to join. The cluster already believes there to be a new
+  // master, but no live majority, so we're unable to add _another_ master.
+  ASSERT_OK(new_master->DeleteFromDisk());
+  new_master->Shutdown();
+  ASSERT_OK(new_master->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_FALSE(new_master->IsProcessAlive());
+  });
+  new_master->Shutdown();
+
+  // So, we first need to remove the master from the quorum, and then restart,
+  // at which point the new master should be able to join the cluster.
+  vector<string> addresses;
+  for (const auto& hp : cluster_->master_rpc_addrs()) {
+    addresses.emplace_back(hp.ToString());
+  }
+  // TODO(awong): we should really consider automating this step from the
+  // leader master.
+  ASSERT_OK(tools::RunKuduTool({ "master", "remove", JoinStrings(addresses, 
","),
+                                 new_master->bound_rpc_hostport().ToString() 
}));
+  ASSERT_OK(new_master->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, 
cluster_.get()));
+  });
+  SleepFor(MonoDelta::FromSeconds(3));
+  ASSERT_OK(VerifyVoterMastersForCluster(cluster_->num_masters(), nullptr, 
cluster_.get()));
+}
+
+TEST_F(AutoAddMasterTest, TestAddWithOnGoingDdl) {
+  simple_spinlock master_addrs_lock;
+  vector<string> master_addrs_unlocked;
+  for (const auto& hp : cluster_->master_rpc_addrs()) {
+    master_addrs_unlocked.emplace_back(hp.ToString());
+  }
+
+  // Start a thread that creates a client and tries to create tables.
+  const auto generate_client = [&] (shared_ptr<KuduClient>* c) {
+    vector<string> master_addrs;
+    {
+      std::lock_guard<simple_spinlock> l(master_addrs_lock);
+      master_addrs = master_addrs_unlocked;
+    }
+    shared_ptr<KuduClient> client;
+    RETURN_NOT_OK(client::KuduClientBuilder()
+        .master_server_addrs(master_addrs)
+        .Build(&client));
+    *c = std::move(client);
+    return Status::OK();
+  };
+
+  atomic<bool> proceed = true;
+  constexpr const int kNumThreads = 2;
+  vector<thread> threads;
+  threads.reserve(kNumThreads);
+  vector<Status> errors(kNumThreads);
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&, i] {
+      int idx = 0;
+      while (proceed) {
+        client::sp::shared_ptr<KuduClient> c;
+        Status s = generate_client(&c).AndThen([&] {
+          return CreateTableWithClient(c.get(), Substitute("default.$0_$1", i, 
++idx));
+        });
+        if (!s.ok()) {
+          errors[i] = s;
+        }
+        SleepFor(MonoDelta::FromSeconds(1));
+      }
+    });
+  }
+  auto thread_joiner = MakeScopedCleanup([&] {
+    proceed = false;
+    for (auto& t : threads) {
+      t.join();
+    }
+  });
+
+  int num_masters = args_.orig_num_masters;
+  for (int i = 0; i < 3; i++) {
+    ASSERT_OK(cluster_->AddMaster());
+    auto* new_master = cluster_->master(args_.orig_num_masters);
+    ASSERT_OK(new_master->WaitForCatalogManager());
+    num_masters++;
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_OK(VerifyVoterMastersForCluster(num_masters, nullptr, 
cluster_.get()));
+    });
+    {
+      std::lock_guard<simple_spinlock> l(master_addrs_lock);
+      
master_addrs_unlocked.emplace_back(new_master->bound_rpc_hostport().ToString());
+    }
+    cluster_->Shutdown();
+    ASSERT_OK(cluster_->Restart());
+    
ASSERT_OK(cluster_->WaitForTabletServerCount(cluster_->num_tablet_servers(),
+                                                 MonoDelta::FromSeconds(5)));
+  }
+  proceed = false;
+  thread_joiner.cancel();
+  for (auto& t : threads) {
+    t.join();
+  }
+  for (const auto& e : errors) {
+    if (e.ok() || e.IsTimedOut()) {
+      continue;
+    }
+    // TODO(awong): we should relax the need for clients to have the precise
+    // list of masters.
+    if (e.IsConfigurationError()) {
+      ASSERT_STR_CONTAINS(e.ToString(), "cluster indicates it expects");
+      continue;
+    }
+    // TODO(KUDU-1358): we should probably allow clients to retry if the RF is
+    // within some normal-looking range.
+    ASSERT_TRUE(e.IsInvalidArgument()) << e.ToString();
+    ASSERT_STR_CONTAINS(e.ToString(), "not enough live tablet servers");
+  }
+}
+
+class ParameterizedAutoAddMasterTest : public AutoAddMasterTest,
+                                       public 
::testing::WithParamInterface<tuple<int, bool>> {
+ public:
+  void SetUp() override {
+    ASSERT_OK(SetUpWithTestArgs({ /*orig_num_masters*/std::get<0>(GetParam()),
+                                  /*is_secure*/std::get<1>(GetParam()) }));
+  }
+};
+
+TEST_P(ParameterizedAutoAddMasterTest, TestBasicAddition) {
+  TestWorkload w(cluster_.get());
+  w.set_num_replicas(1);
+  w.Setup();
+  w.Start();
+  int num_masters = args_.orig_num_masters;
+  for (int i = 0; i < 3; i++) {
+    ASSERT_OK(cluster_->AddMaster());
+    auto* new_master = cluster_->master(args_.orig_num_masters);
+    ASSERT_OK(new_master->WaitForCatalogManager());
+    num_masters++;
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_OK(VerifyVoterMastersForCluster(num_masters, nullptr, 
cluster_.get()));
+    });
+  }
+  w.StopAndJoin();
+  ClusterVerifier cv(cluster_.get());
+  NO_FATALS(cv.CheckCluster());
+  NO_FATALS(cv.CheckRowCount(w.kDefaultTableName, ClusterVerifier::EXACTLY, 
w.rows_inserted()));
+
+  cluster_->Shutdown();
+  ASSERT_OK(cluster_->Restart());
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(VerifyVoterMastersForCluster(num_masters, nullptr, 
cluster_.get()));
+  });
+
+  NO_FATALS(cv.CheckCluster());
+  NO_FATALS(cv.CheckRowCount(w.kDefaultTableName, ClusterVerifier::EXACTLY, 
w.rows_inserted()));
+}
+
+INSTANTIATE_TEST_SUITE_P(,
+    ParameterizedAutoAddMasterTest, ::testing::Combine(
+                                        ::testing::Values(1, 2),
+                                        ::testing::Bool()),
+    [] (const 
::testing::TestParamInfo<ParameterizedAutoAddMasterTest::ParamType>& info) {
+      return Substitute("$0_orig_masters_$1secure", std::get<0>(info.param),
+                        std::get<1>(info.param) ? "" : "not_");
+    });
+
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index c1dee96..850cba9 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -1140,7 +1140,10 @@ service MasterService {
 
   // Add a new master to the existing cluster.
   rpc AddMaster(AddMasterRequestPB) returns (AddMasterResponsePB) {
-    option (kudu.rpc.authz_method) = "AuthorizeSuperUser";
+    // NOTE: the master service may send this RPC as a part of its automated
+    // addition of masters to existing clusters, hence the inclusion of the
+    // service user.
+    option (kudu.rpc.authz_method) = "AuthorizeClientOrServiceUser";
   }
 
   // Remove a master from the existing cluster.
diff --git a/src/kudu/master/master_options.h b/src/kudu/master/master_options.h
index b9de110..be11f00 100644
--- a/src/kudu/master/master_options.h
+++ b/src/kudu/master/master_options.h
@@ -34,9 +34,9 @@ namespace master {
 struct MasterOptions : public kserver::KuduServerOptions {
   MasterOptions();
 
-  // Fetch master addresses from the user supplied gflags which may be empty 
for single
-  // master configuration.
-  // Note: Only to be used during master init time as masters can be 
added/removed dynamically.
+  // Fetch master addresses from the user supplied gflags which may be empty
+  // for single master configuration.
+  // NOTE: Only to be used during master init time as masters can be 
added/removed dynamically.
   // Use Master::GetMasterHostPorts() instead after initializing the master at 
runtime.
   const std::vector<HostPort>& master_addresses() const {
     return master_addresses_;
@@ -65,6 +65,7 @@ struct MasterOptions : public kserver::KuduServerOptions {
   }
 
  private:
+  // The list of deduplicated masters, as specified by --master_addresses.
   std::vector<HostPort> master_addresses_;
   Cache::ExistingMetricsPolicy block_cache_metrics_policy_;
 };
diff --git a/src/kudu/master/master_runner.cc b/src/kudu/master/master_runner.cc
index 8e9e0f9..77bb386 100644
--- a/src/kudu/master/master_runner.cc
+++ b/src/kudu/master/master_runner.cc
@@ -18,25 +18,97 @@
 #include "kudu/master/master_runner.h"
 
 #include <cstdint>
+#include <functional>
 #include <iostream>
+#include <memory>
+#include <set>
 #include <string>
+#include <utility>
+#include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/client/client.h"
+#include "kudu/client/master_proxy_rpc.h"
+#include "kudu/common/common.pb.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/consensus/consensus_meta_manager.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/master/master_options.h"
+#include "kudu/master/sys_catalog.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet_metadata.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/tablet_copy_client.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/async_util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/flags.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/version_info.h"
 
 using gflags::SET_FLAGS_DEFAULT;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::consensus::ConsensusServiceProxy;
+using kudu::consensus::ConsensusMetadataManager;
+using kudu::consensus::GetConsensusStateRequestPB;
+using kudu::consensus::GetConsensusStateResponsePB;
+using kudu::master::GetMasterRegistrationRequestPB;
+using kudu::master::GetMasterRegistrationResponsePB;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::RpcController;
+using kudu::tablet::TabletDataState;
+using kudu::tablet::TabletMetadata;
+using kudu::tserver::TabletCopyClient;
+using kudu::tserver::TSTabletManager;
+using std::set;
 using std::string;
+using std::unique_ptr;
+using std::vector;
 using std::to_string;
+using strings::Substitute;
+
+DEFINE_bool(master_auto_join_cluster, true, "Whether Kudu Masters should 
automatically attempt "
+            "to join existing an existing cluster, specified by the master 
addresses");
+TAG_FLAG(master_auto_join_cluster, runtime);
+
+DEFINE_int64(master_auto_join_rpc_timeout_secs, 30, "The amount of time, in 
seconds, to use as a "
+             "timeout when sending RPCs to add this Master to a cluster.");
+TAG_FLAG(master_auto_join_rpc_timeout_secs, runtime);
+
+DEFINE_int64(master_auto_join_retry_interval_secs, 5, "The interval, in 
seconds, with which to "
+             "retry checks to determine whether this Master should be added to 
a cluster.");
+TAG_FLAG(master_auto_join_retry_interval_secs, runtime);
 
 DECLARE_bool(evict_failed_followers);
 
 DECLARE_bool(hive_metastore_sasl_enabled);
+DECLARE_string(master_addresses);
 DECLARE_string(keytab_file);
 
 DECLARE_bool(auto_rebalancing_enabled);
@@ -45,11 +117,13 @@ DECLARE_bool(raft_prepare_replacement_before_eviction);
 namespace kudu {
 namespace master {
 
+namespace {
+
 // Validates that if the HMS is configured with SASL enabled, the server has a
 // keytab available. This doesn't use a GROUP_FLAG_VALIDATOR because this check
 // only needs to be run on a server. E.g. tools that run with the HMS don't 
need
 // to pass in a keytab.
-static Status ValidateHiveMetastoreSaslEnabled() {
+Status ValidateHiveMetastoreSaslEnabled() {
     if (FLAGS_hive_metastore_sasl_enabled &&
         FLAGS_keytab_file.empty()) {
         return Status::ConfigurationError("When the Hive Metastore has SASL 
enabled "
@@ -59,6 +133,190 @@ static Status ValidateHiveMetastoreSaslEnabled() {
     return Status::OK();
 }
 
+// Verifies that:
+// - all masters are reachable,
+// - all masters agree on the term,
+// - there are no pending Raft configs,
+// - all masters agree on the Raft config index,
+// - there is a leader, and
+// - if we need to add 'local_uuid' to the Raft config, it is the only change
+//   required to get the Raft config to match the UUIDs corresponding to
+//   'master_addrs'.
+//
+// Returns an error if there is a non-retriable error (e.g. there might be a
+// misconfiguration) and we should not attempt to retry this method.
+//
+// If OK is returned, sets 'needs_retry' to true if there is a potentially
+// transient error, in which case calling the method again may help. If no
+// retriable error was encountered, 'needs_retry' is set to false.
+//
+// Sets 'leader_hp' and 'local_hp' to the current master leader's hostport and
+// the local hostport respectively. Sets 'needs_add' to whether 'local_uuid'
+// should be added to the Raft config. These out-parameters should only be
+// used if OK is returned and 'needs_retry' is set to false.
+Status VerifyMastersGetHostPorts(const vector<HostPort>& master_addrs,
+                                 const string& local_uuid,
+                                 const std::shared_ptr<rpc::Messenger>& 
messenger,
+                                 HostPort* leader_hp,
+                                 HostPort* local_hp,
+                                 bool* needs_retry,
+                                 bool* needs_add) {
+  vector<set<string>> each_remote_masters_master_uuids;
+  set<string> fetched_uuids;
+  int64_t current_term = -1;
+  int64_t committed_config_index = -1;
+  for (const auto& hp : master_addrs) {
+    Sockaddr master_addr;
+    RETURN_NOT_OK(SockaddrFromHostPort(hp, &master_addr));
+
+    // First, get the UUID of the remote master.
+    GetMasterRegistrationRequestPB reg_req;
+    GetMasterRegistrationResponsePB reg_resp;
+    RpcController reg_rpc;
+    MasterServiceProxy proxy(messenger, master_addr, master_addr.host());
+    Status s = proxy.GetMasterRegistration(reg_req, &reg_resp, &reg_rpc);
+    if (!s.ok() || reg_resp.has_error()) {
+      LOG(INFO) << Substitute("Error getting master registration for $0: $1, 
$2",
+                              master_addr.ToString(), s.ToString(),
+                              SecureShortDebugString(reg_resp));
+      *needs_retry = true;
+      return Status::OK();
+    }
+    const bool is_leader = reg_resp.role() == consensus::RaftPeerPB::LEADER;
+    if (is_leader) {
+      *leader_hp = hp;
+    }
+    // Skip the local master -- we only care about what the other masters
+    // think, in case we should be trying to join their quorum.
+    const auto& uuid = reg_resp.instance_id().permanent_uuid();
+    EmplaceIfNotPresent(&fetched_uuids, uuid);
+    if (local_uuid == uuid) {
+      *local_hp = hp;
+      continue;
+    }
+
+    // Get the Raft config from the remote master to get their quorum's
+    // UUIDs.
+    RpcController rpc;
+    GetConsensusStateRequestPB req;
+    req.add_tablet_ids(SysCatalogTable::kSysCatalogTabletId);
+    req.set_dest_uuid(uuid);
+    req.set_report_health(consensus::INCLUDE_HEALTH_REPORT);
+    GetConsensusStateResponsePB resp;
+    ConsensusServiceProxy consensus_proxy(messenger, master_addr, 
master_addr.host());
+    s = consensus_proxy.GetConsensusState(req, &resp, &rpc);
+    if (!s.ok() || resp.has_error()) {
+      LOG(INFO) << Substitute("Error getting master consensus for $0: $1",
+                              master_addr.ToString(), s.ToString());
+      *needs_retry = true;
+      return Status::OK();
+    }
+    if (resp.tablets_size() != 1) {
+      return Status::Corruption(
+          Substitute("Error getting master consensus, expected one tablet but 
got $0: $1",
+                     resp.tablets_size(), SecureShortDebugString(resp)));
+    }
+    // Retry if the the masters don't agree on the current term.
+    const auto& cstate = resp.tablets(0).cstate();
+    if (current_term == -1) {
+      current_term = cstate.current_term();
+    }
+    if (cstate.current_term() != current_term) {
+      LOG(INFO) << Substitute("Existing masters have differing terms: $0 vs 
$1",
+                              current_term, cstate.current_term());
+      *needs_retry = true;
+      return Status::OK();
+    }
+    // Retry if there's a pending config -- presumably pending means it's
+    // transient.
+    if (cstate.has_pending_config()) {
+      LOG(INFO) << Substitute("Existing masters have pending config: $0",
+                              SecureShortDebugString(cstate.pending_config()));
+      *needs_retry = true;
+      return Status::OK();
+    }
+    // Retry if the masters don't agree on the current Raft config's index.
+    if (committed_config_index == -1) {
+      committed_config_index = cstate.committed_config().opid_index();
+    }
+    if (cstate.committed_config().opid_index() != committed_config_index) {
+      LOG(INFO) << Substitute("Existing masters have differing Raft config 
indexes: $0 vs $1",
+                              committed_config_index, 
cstate.committed_config().opid_index());
+      *needs_retry = true;
+    }
+    const auto& config = cstate.committed_config();
+    set<string> uuids;
+    for (const auto& p : config.peers()) {
+      EmplaceIfNotPresent(&uuids, p.permanent_uuid());
+    }
+    each_remote_masters_master_uuids.emplace_back(std::move(uuids));
+
+  }
+  if (!leader_hp->Initialized()) {
+    LOG(INFO) << Substitute("No leader master found from master $0", 
local_uuid);
+    *needs_retry = true;
+    return Status::OK();
+  }
+  // Ensure the Raft configs from each master match. If not, presumably it's
+  // transient and should be retried.
+  auto& raft_config_uuids = each_remote_masters_master_uuids[0];
+  for (int i = 1; i < each_remote_masters_master_uuids.size(); i++) {
+    const auto& cur_uuids = each_remote_masters_master_uuids[i];
+    if (cur_uuids != raft_config_uuids) {
+      set<string> set_diff;
+      STLSetDifference(cur_uuids, raft_config_uuids, &set_diff);
+      LOG(INFO) << Substitute("Remote masters have differing Raft 
configurations:"
+                              "[$0] vs [$1] (diff: [$2])", 
JoinStrings(cur_uuids, ","),
+                              JoinStrings(raft_config_uuids, ","), 
JoinStrings(set_diff, ","));
+      *needs_retry = true;
+      return Status::OK();
+    }
+  }
+  // Ensure that if we need to add this master to the Raft config, it's the
+  // only one we need to add.
+  if (!ContainsKey(raft_config_uuids, local_uuid)) {
+    EmplaceIfNotPresent(&raft_config_uuids, local_uuid);
+    if (raft_config_uuids != fetched_uuids) {
+      set<string> set_diff;
+      STLSetDifference(fetched_uuids, raft_config_uuids, &set_diff);
+      return Status::NotSupported(Substitute("Kudu only supports adding one 
master at a time; "
+          "tentative Raft config doesn't match the UUIDs fetched from 
--master_addresses. "
+          "Raft config + local UUID: [$0] vs fetched UUIDs: [$1], diff: [$2]",
+          JoinStrings(raft_config_uuids, ","), JoinStrings(fetched_uuids, ","),
+          JoinStrings(set_diff, ",")));
+    }
+    *needs_add = true;
+  } else {
+    *needs_add = false;
+  }
+  *needs_retry = false;
+  return Status::OK();
+}
+
+// Deletes the local system catalog tablet and performs a copy from 'src_hp'.
+Status ClearLocalSystemCatalogAndCopy(const HostPort& src_hp) {
+  LOG(INFO) << "Clearing existing system tablet";
+  FsManager fs_manager(Env::Default(), FsManagerOpts());
+  RETURN_NOT_OK(fs_manager.Open());
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager(
+      new ConsensusMetadataManager(&fs_manager));
+  scoped_refptr<TabletMetadata> meta;
+  RETURN_NOT_OK(TabletMetadata::Load(&fs_manager, 
SysCatalogTable::kSysCatalogTabletId, &meta));
+  RETURN_NOT_OK(TSTabletManager::DeleteTabletData(
+      meta, cmeta_manager, TabletDataState::TABLET_DATA_DELETED,
+      /*last_logged_opid*/boost::none));
+  LOG(INFO) << "Copying system tablet from " << src_hp.ToString();
+  std::shared_ptr<rpc::Messenger> messenger;
+  RETURN_NOT_OK(rpc::MessengerBuilder("tablet_copy_client").Build(&messenger));
+  TabletCopyClient copy_client(SysCatalogTable::kSysCatalogTabletId, 
&fs_manager, cmeta_manager,
+                               messenger, nullptr /* no metrics */);
+  RETURN_NOT_OK(copy_client.Start(src_hp, /*meta*/nullptr));
+  RETURN_NOT_OK(copy_client.FetchAll(/*tablet_replica*/nullptr));
+  return copy_client.Finish();
+}
+
+} // anonymous namespace
+
 void SetMasterFlagDefaults() {
   constexpr int32_t kDefaultRpcServiceQueueLength = 100;
 
@@ -108,9 +366,81 @@ Status RunMasterServer() {
 
   RETURN_NOT_OK(ValidateHiveMetastoreSaslEnabled());
 
-  Master server({});
-  RETURN_NOT_OK(server.Init());
-  RETURN_NOT_OK(server.Start());
+  MasterOptions opts;
+  unique_ptr<Master> server(new Master(opts));
+  RETURN_NOT_OK(server->Init());
+  RETURN_NOT_OK(server->Start());
+
+  const auto local_uuid = server->fs_manager()->uuid();
+  while (FLAGS_master_auto_join_cluster) {
+    if (!opts.IsDistributed()) {
+      // We definitely don't have to add a master if there's only one master
+      // specified.
+      break;
+    }
+    // Keep checking to see if the other masters think we are a part of their
+    // Raft group. If so, we don't need to do anything.
+    bool try_again = false;
+    bool needs_add = false;
+    HostPort leader_hp;
+    HostPort local_hp;
+    RETURN_NOT_OK(VerifyMastersGetHostPorts(opts.master_addresses(),
+                                            local_uuid, server->messenger(),
+                                            &leader_hp, &local_hp,
+                                            &try_again, &needs_add));
+
+    // Something went wrong. Sleep for a bit and try later.
+    if (try_again) {
+      LOG(INFO) << "Couldn't verify the masters in the cluster. Trying 
again...";
+      
SleepFor(MonoDelta::FromSeconds(FLAGS_master_auto_join_retry_interval_secs));
+      continue;
+    }
+    if (!needs_add) {
+      // All the other masters think this master is a part of the quorum.
+      // There's nothing left to do but run!
+      break;
+    }
+    // The other masters don't see this master in their quorum. Add it now.
+    LOG(INFO) << Substitute(
+        "Detected that this master $0 is joining an existing cluster", 
local_uuid);
+
+    // Send an add master RPC to the leader master.
+    LOG(INFO) << Substitute("Initiating AddMaster RPC to add $0", 
local_hp.ToString());
+    vector<string> master_addrs;
+    for (const auto& hp : opts.master_addresses()) {
+      master_addrs.emplace_back(hp.ToString());
+    }
+    client::sp::shared_ptr<KuduClient> client;
+    RETURN_NOT_OK(KuduClientBuilder()
+        .master_server_addrs(master_addrs)
+        .Build(&client));
+    AddMasterRequestPB add_req;
+    *add_req.mutable_rpc_addr() = HostPortToPB(local_hp);
+    AddMasterResponsePB add_resp;;
+    Synchronizer sync;
+    client::internal::AsyncLeaderMasterRpc<AddMasterRequestPB, 
AddMasterResponsePB> add_rpc(
+        MonoTime::Now() + 
MonoDelta::FromSeconds(FLAGS_master_auto_join_rpc_timeout_secs),
+        client.get(), rpc::BackoffType::LINEAR,
+        add_req, &add_resp, &MasterServiceProxy::AddMasterAsync,
+        "AddMaster", sync.AsStatusCallback(), 
{MasterFeatures::DYNAMIC_MULTI_MASTER});
+    add_rpc.SendRpc();
+    Status s = sync.Wait();
+    bool master_already_present =
+        add_resp.has_error() &&
+        add_resp.error().code() == 
master::MasterErrorPB::MASTER_ALREADY_PRESENT;
+    if (!s.ok() && !master_already_present) {
+      RETURN_NOT_OK_PREPEND(s, "Failed to perform AddMaster RPC");
+    }
+    server->Shutdown();
+
+    // If we succeeded, wipe the system catalog on this node and initiate a
+    // copy from another node.
+    RETURN_NOT_OK(ClearLocalSystemCatalogAndCopy(leader_hp));
+    server.reset(new Master(opts));
+    RETURN_NOT_OK(server->Init());
+    RETURN_NOT_OK(server->Start());
+    break;
+  }
 
   while (true) {
     SleepFor(MonoDelta::FromSeconds(60));
diff --git a/src/kudu/mini-cluster/external_mini_cluster-test.cc 
b/src/kudu/mini-cluster/external_mini_cluster-test.cc
index 74dd860..50e6a69 100644
--- a/src/kudu/mini-cluster/external_mini_cluster-test.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster-test.cc
@@ -154,8 +154,6 @@ void SmokeExternalMiniCluster(const 
ExternalMiniClusterOptions& opts,
   CHECK(master_rpc_addresses);
   master_rpc_addresses->clear();
 
-  ASSERT_OK(cluster->Start());
-
   // Verify each of the masters.
   for (int i = 0; i < opts.num_masters; i++) {
     SCOPED_TRACE(i);
@@ -281,6 +279,7 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) {
   opts.num_tablet_servers = 3;
 
   unique_ptr<ExternalMiniCluster> cluster(new ExternalMiniCluster(opts));
+  ASSERT_OK(cluster->Start());
   vector<HostPort> master_rpc_addresses;
   NO_FATALS(SmokeExternalMiniCluster(opts, cluster.get(), 
&master_rpc_addresses));
 
@@ -296,6 +295,7 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) {
   // the prior run.
   opts.master_rpc_addresses = master_rpc_addresses;
   cluster.reset(new ExternalMiniCluster(opts));
+  ASSERT_OK(cluster->Start());
   NO_FATALS(SmokeExternalMiniCluster(opts, cluster.get(), 
&master_rpc_addresses));
   ASSERT_EQ(opts.master_rpc_addresses, master_rpc_addresses);
 
@@ -307,5 +307,41 @@ TEST_P(ExternalMiniClusterTest, TestBasicOperation) {
   cluster->Shutdown();
 }
 
+TEST_P(ExternalMiniClusterTest, TestAddMaster) {
+  ExternalMiniClusterOptions opts;
+  const auto& param = GetParam();
+  opts.enable_kerberos = std::get<0>(param) == Kerberos::ENABLED;
+  if (std::get<1>(param) == HiveMetastore::ENABLED) {
+    opts.hms_mode = HmsMode::ENABLE_HIVE_METASTORE;
+  }
+#if !defined(NO_CHRONY)
+  opts.num_ntp_servers = std::get<2>(param);
+#endif
+
+  opts.num_masters = 3;
+  opts.num_tablet_servers = 1;
+
+  unique_ptr<ExternalMiniCluster> cluster(new ExternalMiniCluster(opts));
+  ASSERT_OK(cluster->Start());
+
+  // Add a master and wait for it to start up and get reported to.
+  cluster->AddMaster();
+  ASSERT_OK(cluster->master(opts.num_masters)->WaitForCatalogManager());
+  cluster->tablet_server(0)->Shutdown();
+  ASSERT_OK(cluster->tablet_server(0)->Restart());
+  ASSERT_OK(cluster->WaitForTabletServerCount(
+      opts.num_tablet_servers, MonoDelta::FromSeconds(30), 
/*master_idx*/opts.num_masters));
+
+  // Smoke the cluster using an updated 'opts' that expects the new number of
+  // masters.
+  opts.num_masters++;
+  vector<HostPort> master_rpc_addresses;
+  NO_FATALS(SmokeExternalMiniCluster(opts, cluster.get(), 
&master_rpc_addresses));
+
+  // Shutdown the cluster explicitly on top of the one in the cluster's
+  // destructor to test repeated calls to Shutdown().
+  cluster->Shutdown();
+}
+
 } // namespace cluster
 } // namespace kudu
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc 
b/src/kudu/mini-cluster/external_mini_cluster.cc
index 6a59811..cd666f9 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -34,6 +34,7 @@
 
 #include "kudu/client/client.h"
 #include "kudu/client/master_rpc.h"
+#include "kudu/rpc/rpc_header.pb.h"
 #if !defined(NO_CHRONY)
 #include "kudu/clock/test/mini_chronyd.h"
 #endif
@@ -502,94 +503,13 @@ Status ExternalMiniCluster::StartMasters() {
       reserved_sockets.emplace_back(std::move(reserved_socket));
     }
   }
-
-  vector<string> flags;
-  flags.emplace_back("--rpc_reuseport=true");
-  // Setting --master_addresses flag for a single master configuration is now 
supported but not
-  // mandatory. Not setting the flag helps test existing kudu deployments that 
don't specify
-  // the --master_addresses flag for single master configuration.
-  if (num_masters > 1 || opts_.supply_single_master_addr) {
-    flags.emplace_back(Substitute("--master_addresses=$0",
-                                  
HostPort::ToCommaSeparatedString(master_rpc_addrs)));
-  }
-  if (!opts_.location_info.empty()) {
-    string bin_path;
-    RETURN_NOT_OK(DeduceBinRoot(&bin_path));
-    const auto mapping_script_path =
-        JoinPathSegments(bin_path, "testdata/assign-location.py");
-    const auto state_store_fpath =
-        JoinPathSegments(opts_.cluster_root, "location-assignment.state");
-    auto location_cmd = Substitute("$0 --state_store=$1",
-                                   mapping_script_path, state_store_fpath);
-    for (const auto& elem : opts_.location_info) {
-      // Per-location mapping rule specified as a pair 'location:num_servers',
-      // where 'location' is the location string and 'num_servers' is the 
number
-      // of tablet servers to be assigned the location.
-      location_cmd += Substitute(" --map $0:$1", elem.first, elem.second);
-    }
-    flags.emplace_back(Substitute("--location_mapping_cmd=$0", location_cmd));
-#   if defined(__APPLE__)
-    // On macOS, it's not possible to have unique loopback interfaces. To make
-    // location mapping working, a tablet server is identified by its UUID
-    // instead of IP address of its RPC end-point.
-    flags.emplace_back("--location_mapping_by_uuid");
-#   endif
-  }
-
-  // Add custom master flags.
-  copy(opts_.extra_master_flags.begin(), opts_.extra_master_flags.end(),
-       std::back_inserter(flags));
-
   // Start the masters.
-  const string& exe = GetBinaryPath(kKuduBinaryName);
   for (int i = 0; i < num_masters; i++) {
-    string daemon_id = Substitute("master-$0", i);
-
-    ExternalDaemonOptions opts;
-    opts.messenger = messenger_;
-    opts.block_manager_type = opts_.block_manager_type;
-    opts.exe = exe;
-    opts.wal_dir = GetWalPath(daemon_id);
-    opts.data_dirs = GetDataPaths(daemon_id);
-    opts.log_dir = GetLogPath(daemon_id);
-    if (FLAGS_perf_record) {
-      opts.perf_record_filename =
-          Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
-    }
-
-    vector<string> time_source_flags;
-    RETURN_NOT_OK(AddTimeSourceFlags(i, &time_source_flags));
-    // Custom flags come last because they can contain overrides.
-    flags.insert(flags.begin(),
-                 time_source_flags.begin(), time_source_flags.end());
-
-    opts.extra_flags = SubstituteInFlags(flags, i);
-    opts.start_process_timeout = opts_.start_process_timeout;
-    opts.rpc_bind_address = master_rpc_addrs[i];
-    if (opts_.hms_mode == HmsMode::ENABLE_METASTORE_INTEGRATION) {
-      opts.extra_flags.emplace_back(Substitute("--hive_metastore_uris=$0", 
hms_->uris()));
-      if (opts_.enable_kerberos) {
-        opts.extra_flags.emplace_back("--hive_metastore_sasl_enabled=true");
-      }
-    }
-    if (opts_.enable_ranger) {
-      opts.extra_flags.emplace_back(Substitute("--ranger_config_path=$0",
-                                               JoinPathSegments(cluster_root(),
-                                                                
"ranger-client")));
-    }
-    opts.logtostderr = opts_.logtostderr;
-
-    scoped_refptr<ExternalMaster> peer = new ExternalMaster(opts);
-    if (opts_.enable_kerberos) {
-      RETURN_NOT_OK_PREPEND(
-          peer->EnableKerberos(kdc_.get(), opts_.principal, 
master_rpc_addrs[i].host()),
-          "could not enable Kerberos");
-    }
-    RETURN_NOT_OK_PREPEND(peer->Start(),
-                          Substitute("Unable to start Master at index $0", i));
+    scoped_refptr<ExternalMaster> peer;
+    RETURN_NOT_OK(CreateMaster(master_rpc_addrs, i, &peer));
+    RETURN_NOT_OK_PREPEND(peer->Start(), Substitute("Unable to start Master at 
index $0", i));
     masters_.emplace_back(std::move(peer));
   }
-
   return Status::OK();
 }
 
@@ -648,6 +568,129 @@ Status ExternalMiniCluster::AddTabletServer() {
   return Status::OK();
 }
 
+Status ExternalMiniCluster::CreateMaster(const vector<HostPort>& 
master_rpc_addrs, int idx,
+                                         scoped_refptr<ExternalMaster>* 
master) {
+  DCHECK_LT(idx, master_rpc_addrs.size());
+  vector<string> flags;
+  // We expect that callers have reserved a socket for the master we're
+  // creating, and we'll thus have to have the daemon reuse the bound port.
+  flags.emplace_back("--rpc_reuseport=true");
+  // Setting --master_addresses flag for a single master configuration is now
+  // supported but not mandatory. Not setting the flag helps test existing kudu
+  // deployments that don't specify the --master_addresses flag for single
+  // master configuration.
+  if (master_rpc_addrs.size() > 1 || opts_.supply_single_master_addr) {
+    flags.emplace_back(Substitute("--master_addresses=$0",
+                                  
HostPort::ToCommaSeparatedString(master_rpc_addrs)));
+  }
+  if (!opts_.location_info.empty()) {
+    string bin_path;
+    RETURN_NOT_OK(DeduceBinRoot(&bin_path));
+    const auto mapping_script_path =
+        JoinPathSegments(bin_path, "testdata/assign-location.py");
+    const auto state_store_fpath =
+        JoinPathSegments(opts_.cluster_root, "location-assignment.state");
+    auto location_cmd = Substitute("$0 --state_store=$1",
+                                   mapping_script_path, state_store_fpath);
+    for (const auto& elem : opts_.location_info) {
+      // Per-location mapping rule specified as a pair 'location:num_servers',
+      // where 'location' is the location string and 'num_servers' is the 
number
+      // of tablet servers to be assigned the location.
+      location_cmd += Substitute(" --map $0:$1", elem.first, elem.second);
+    }
+    flags.emplace_back(Substitute("--location_mapping_cmd=$0", location_cmd));
+#   if defined(__APPLE__)
+    // On macOS, it's not possible to have unique loopback interfaces. To make
+    // location mapping working, a tablet server is identified by its UUID
+    // instead of IP address of its RPC end-point.
+    flags.emplace_back("--location_mapping_by_uuid");
+#   endif
+  }
+  if (opts_.hms_mode == HmsMode::ENABLE_METASTORE_INTEGRATION) {
+    flags.emplace_back(Substitute("--hive_metastore_uris=$0", hms_->uris()));
+    if (opts_.enable_kerberos) {
+      flags.emplace_back("--hive_metastore_sasl_enabled=true");
+    }
+  }
+  if (opts_.enable_ranger) {
+    flags.emplace_back(Substitute("--ranger_config_path=$0",
+                                  JoinPathSegments(cluster_root(),
+                                                   "ranger-client")));
+  }
+  // Add custom master flags.
+  copy(opts_.extra_master_flags.begin(), opts_.extra_master_flags.end(),
+       std::back_inserter(flags));
+
+  string daemon_id = Substitute("master-$0", idx);
+
+  ExternalDaemonOptions opts;
+  opts.messenger = messenger_;
+  opts.block_manager_type = opts_.block_manager_type;
+  opts.exe = GetBinaryPath(kKuduBinaryName);
+  opts.wal_dir = GetWalPath(daemon_id);
+  opts.data_dirs = GetDataPaths(daemon_id);
+  opts.log_dir = GetLogPath(daemon_id);
+  if (FLAGS_perf_record) {
+    opts.perf_record_filename =
+        Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
+  }
+
+  vector<string> time_source_flags;
+  RETURN_NOT_OK(AddTimeSourceFlags(idx, &time_source_flags));
+  // Custom flags set above come last because they can contain overrides.
+  flags.insert(flags.begin(), time_source_flags.begin(), 
time_source_flags.end());
+
+  opts.extra_flags = SubstituteInFlags(flags, idx);
+  opts.start_process_timeout = opts_.start_process_timeout;
+  opts.rpc_bind_address = master_rpc_addrs[idx];
+  opts.logtostderr = opts_.logtostderr;
+
+  scoped_refptr<ExternalMaster> peer(new ExternalMaster(opts));
+  if (opts_.enable_kerberos) {
+    RETURN_NOT_OK_PREPEND(
+        peer->EnableKerberos(kdc_.get(), opts_.principal, 
master_rpc_addrs[idx].host()),
+        "could not enable Kerberos");
+  }
+  *master = std::move(peer);
+  return Status::OK();
+}
+
+Status ExternalMiniCluster::AddMaster(const vector<string>& extra_flags) {
+  const int idx = masters_.size();
+  const string daemon_id = Substitute("master-$0", idx);
+
+  unique_ptr<Socket> reserved_socket;
+  RETURN_NOT_OK_PREPEND(ReserveDaemonSocket(DaemonType::MASTER, idx, 
opts_.bind_mode,
+                                            &reserved_socket),
+                        "failed to reserve master socket address");
+  Sockaddr addr;
+  RETURN_NOT_OK(reserved_socket->GetSocketAddress(&addr));
+  vector<HostPort> master_rpc_addrs = this->master_rpc_addrs();
+  master_rpc_addrs.emplace_back(addr.host(), addr.port());
+  scoped_refptr<ExternalMaster> peer;
+  RETURN_NOT_OK(CreateMaster(master_rpc_addrs, idx, &peer));
+  peer->mutable_flags()->insert(peer->mutable_flags()->end(),
+                                extra_flags.begin(), extra_flags.end());
+  RETURN_NOT_OK_PREPEND(peer->Start(),
+                        Substitute("unable to start master at index $0", idx));
+  // Update the existing servers' gflags so the new master is accounted for the
+  // next time they restart.
+  // NOTE: the new master already has the correct list set for this flag, from
+  // the call to CreateMaster().
+  const auto& new_master_addrs_list = 
HostPort::ToCommaSeparatedString(master_rpc_addrs);
+  for (auto& master : masters_) {
+    master->mutable_flags()->emplace_back(
+        Substitute("--master_addresses=$0", new_master_addrs_list));
+  }
+  for (auto& ts : tablet_servers_) {
+    ts->mutable_flags()->emplace_back(
+        Substitute("--tserver_master_addrs=$0", new_master_addrs_list));
+  }
+  masters_.emplace_back(std::move(peer));
+  ++opts_.num_masters;
+  return Status::OK();
+}
+
 #if !defined(NO_CHRONY)
 Status ExternalMiniCluster::AddNtpServer(const Sockaddr& addr) {
   clock::MiniChronydOptions options;
@@ -663,13 +706,21 @@ Status ExternalMiniCluster::AddNtpServer(const Sockaddr& 
addr) {
 }
 #endif // #if !defined(NO_CHRONY) ...
 
-Status ExternalMiniCluster::WaitForTabletServerCount(int count, const 
MonoDelta& timeout) {
+Status ExternalMiniCluster::WaitForTabletServerCount(int count, const 
MonoDelta& timeout,
+                                                     int master_idx) {
+  CHECK_LT(master_idx, opts_.num_masters);
   MonoTime deadline = MonoTime::Now() + timeout;
 
   unordered_set<int> masters_to_search;
-  for (int i = 0; i < masters_.size(); i++) {
-    if (!masters_[i]->IsShutdown()) {
-      masters_to_search.insert(i);
+  if (master_idx == -1) {
+    for (int i = 0; i < masters_.size(); i++) {
+      if (!masters_[i]->IsShutdown()) {
+        masters_to_search.insert(i);
+      }
+    }
+  } else {
+    if (!masters_[master_idx]->IsShutdown()) {
+      masters_to_search.insert(master_idx);
     }
   }
 
@@ -1454,6 +1505,19 @@ Status ExternalMaster::WaitForCatalogManager(WaitMode 
wait_mode) {
     ListTablesResponsePB resp;
     RpcController rpc;
     Status s = proxy->ListTables(req, &resp, &rpc);
+    if (s.IsRemoteError()) {
+      auto* err = rpc.error_response();
+      if (err && err->has_code()) {
+        switch (err->code()) {
+          case rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY:
+          case rpc::ErrorStatusPB::ERROR_UNAVAILABLE:
+            continue;
+          default:
+            return s;
+        }
+      }
+      return s;
+    }
     if (s.ok()) {
       if (!resp.has_error()) {
         // This master is the leader and is up and running.
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h 
b/src/kudu/mini-cluster/external_mini_cluster.h
index 3057edf..7c00de0 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -279,6 +279,9 @@ class ExternalMiniCluster : public MiniCluster {
   // Requires that the master is already running.
   Status AddTabletServer();
 
+  // Add a new master to the cluster. The new master is started.
+  Status AddMaster(const std::vector<std::string>& extra_flags = {});
+
 #if !defined(NO_CHRONY)
   // Add a new NTP server to the cluster. The new NTP server is started upon
   // adding, bind to the address and port specified by 'addr'.
@@ -411,9 +414,12 @@ class ExternalMiniCluster : public MiniCluster {
   }
 
   // Wait until the number of registered tablet servers reaches the given count
-  // on all of the running masters. Returns Status::TimedOut if the desired
-  // count is not achieved with the given timeout.
-  Status WaitForTabletServerCount(int count, const MonoDelta& timeout);
+  // on running masters. Returns Status::TimedOut if the desired count is not
+  // achieved with the given timeout.
+  // If 'master_idx' is specified, only examines the given master if it's
+  // running. Otherwise, checks all running masters.
+  Status WaitForTabletServerCount(int count, const MonoDelta& timeout,
+                                  int master_idx = -1);
 
   // Runs gtest assertions that no servers have crashed.
   void AssertNoCrashes();
@@ -495,6 +501,16 @@ class ExternalMiniCluster : public MiniCluster {
  private:
   Status StartMasters();
 
+  // Constructs an ExternalMaster based on 'opts_' but with the given set of
+  // master addresses, giving the new master the address in the list
+  // corresponding to 'idx'. Callers are expected to call Start() with the
+  // output 'master'.
+  //
+  // It's expected that the port for the master at 'idx' is reserved, and that
+  // the master can be run with the --rpc_reuseport flag.
+  Status CreateMaster(const std::vector<HostPort>& master_rpc_addrs, int idx,
+                      scoped_refptr<ExternalMaster>* master);
+
   Status DeduceBinRoot(std::string* ret);
   Status HandleOptions();
 
diff --git a/src/kudu/tools/tool_action_master.cc 
b/src/kudu/tools/tool_action_master.cc
index 710903c..0c308ef 100644
--- a/src/kudu/tools/tool_action_master.cc
+++ b/src/kudu/tools/tool_action_master.cc
@@ -396,6 +396,7 @@ Status AddMaster(const RunnerContext& context) {
     const auto& flag = name_flag_pair.second;
     new_master_flags.emplace_back(Substitute("--$0=$1", flag.name, 
flag.current_value));
   }
+  new_master_flags.emplace_back("--master_auto_join_cluster=false");
 
   // Bring up the new master that includes master addresses of the cluster and 
itself.
   // It's possible this is a retry in which case the new master is already 
part of

Reply via email to