This is an automated email from the ASF dual-hosted git repository.

bankim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 47075a1  [master] KUDU-2181 Procedure for copying sys catalog on 
adding master
47075a1 is described below

commit 47075a180ca46366a56af2ff97f65b199b4a1f4e
Author: Bankim Bhavsar <[email protected]>
AuthorDate: Tue Dec 22 11:52:17 2020 -0800

    [master] KUDU-2181 Procedure for copying sys catalog on adding master
    
    This change outlines procedure to copy system catalog for the newly
    added master using existing CLI tools and the master ChangeConfig RPC.
    
    Only functional change is to hookup the master runtime flag
    --master_consensus_allow_status_msg_for_failed_peer to Raft consensus.
    New master could go into a FAILED_RECOVERABLE state if the leader
    master's system catalog WAL has been GC'ed. This change allows the
    new master to be promoted after copying the system catalog externally.
    
    Outline of the test procedure:
    1) Runtime flag --master_consensus_allow_status_msg_for_failed_peer
    must be turned on for existing masters.
    2) Start the new master with
    --master_address_add_new_master=<new-master-hostport> and
    --master_addresses that contains itself and existing masters.
    3) Invoke ChangeConfig to add the master.
    4) Verify the new master is part of the Raft config even if it's a
    LEARNER/NON_VOTER or goes into FAILED_RECOVERABLE state. If not,
    above steps will have to be repeated.
    5) If the new master is promoted to being a VOTER then following tablet
    copy steps can be skipped.
    6) Shutdown the new master.
    7) Delete the system catalog on the new master.
    8) Copy the system catalog from the leader master to the new master.
    9) Bring up the new master.
    10) Verify the new master is promoted as VOTER.
      If the new master doesn't get promoted to a VOTER then double check
      whether the new master is part of the Raft config for masters by
      running "kudu master list".
        - If yes, repeat procedure from step 6.
        - Else repeat the entire procedure
    
    Change-Id: I142c1dec442ec72c38c5be9d62cdf270e441d6e3
    Reviewed-on: http://gerrit.cloudera.org:8080/16830
    Tested-by: Kudu Jenkins
    Reviewed-by: Bankim Bhavsar <[email protected]>
---
 src/kudu/master/dynamic_multi_master-test.cc | 421 ++++++++++++++++++---------
 src/kudu/master/sys_catalog.cc               |  18 +-
 2 files changed, 299 insertions(+), 140 deletions(-)

diff --git a/src/kudu/master/dynamic_multi_master-test.cc 
b/src/kudu/master/dynamic_multi_master-test.cc
index b8db27f..4c05eaf 100644
--- a/src/kudu/master/dynamic_multi_master-test.cc
+++ b/src/kudu/master/dynamic_multi_master-test.cc
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <algorithm>
 #include <cstdint>
 #include <functional>
 #include <memory>
 #include <ostream>
 #include <set>
 #include <string>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -29,7 +29,6 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/client/client-test-util.h"
 #include "kudu/client/client.h"
 #include "kudu/client/schema.h"
 #include "kudu/common/common.pb.h"
@@ -43,6 +42,7 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/cluster_verifier.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
@@ -50,6 +50,8 @@
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/tools/tool_test_util.h"
+#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -67,7 +69,6 @@ using kudu::client::KuduClient;
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaBuilder;
-using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::ExternalDaemonOptions;
@@ -75,10 +76,13 @@ using kudu::cluster::ExternalMaster;
 using kudu::cluster::ExternalMiniCluster;
 using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::cluster::MiniCluster;
+using kudu::consensus::LeaderStepDownRequestPB;
+using kudu::consensus::LeaderStepDownResponsePB;
 using kudu::rpc::RpcController;
 using std::set;
 using std::string;
 using std::unique_ptr;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -103,7 +107,7 @@ static Status CreateTable(ExternalMiniCluster* cluster,
 
 // Test class for testing addition/removal of masters to a Kudu cluster.
 class DynamicMultiMasterTest : public KuduTest {
- public:
+ protected:
   void SetUpWithNumMasters(int num_masters) {
     // Initial number of masters in the cluster before adding a master.
     orig_num_masters_ = num_masters;
@@ -129,6 +133,61 @@ class DynamicMultiMasterTest : public KuduTest {
                                                  MonoDelta::FromSeconds(5)));
   }
 
+  // Bring up a cluster with bunch of tables and ensure the system catalog WAL
+  // has been GC'ed.
+  // Out parameter 'master_hps' returns the HostPort of the masters in the 
original
+  // cluster.
+  void StartClusterWithSysCatalogGCed(vector<HostPort>* master_hps,
+                                      const vector<string>& extra_flags = {}) {
+    // Using low values of log flush threshold and segment size to trigger GC 
of the
+    // sys catalog WAL
+    vector<string> flags = {"--master_support_change_config", 
"--flush_threshold_secs=0",
+                            "--log_segment_size_mb=1"};
+    flags.insert(flags.end(), extra_flags.begin(), extra_flags.end());
+    NO_FATALS(StartCluster(flags));
+
+    // Verify that masters are running as VOTERs and collect their addresses 
to be used
+    // for starting the new master.
+    NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, master_hps));
+
+    auto get_sys_catalog_wal_gc_count = [&] {
+      int64_t sys_catalog_wal_gc_count = 0;
+      
CHECK_OK(itest::GetInt64Metric(cluster_->master(0)->bound_http_hostport(),
+                                     &METRIC_ENTITY_tablet,
+                                     
master::SysCatalogTable::kSysCatalogTabletId,
+                                     &METRIC_log_gc_duration,
+                                     "total_count",
+                                     &sys_catalog_wal_gc_count));
+      return sys_catalog_wal_gc_count;
+    };
+    const int64_t orig_gc_count = get_sys_catalog_wal_gc_count();
+    int64_t updated_gc_count;
+
+    // Create a bunch of tables to ensure sys catalog WAL gets GC'ed.
+    // Need to create around 1k tables even with lowest flush threshold and 
log segment size.
+    int i;
+    for (i = 1; i < 1000; i++) {
+      updated_gc_count = get_sys_catalog_wal_gc_count();
+      if (updated_gc_count > orig_gc_count) {
+        break;
+      }
+      string table_name = Substitute("Table.$0.$1", CURRENT_TEST_NAME(), 
std::to_string(i));
+      ASSERT_OK(CreateTable(cluster_.get(), table_name));
+    }
+    LOG(INFO) << "Number of tables created: " << i - 1;
+
+    MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(2);
+    while (MonoTime::Now() < deadline) {
+      updated_gc_count = get_sys_catalog_wal_gc_count();
+      if (updated_gc_count > orig_gc_count) {
+        break;
+      }
+      SleepFor(MonoDelta::FromMilliseconds(100));
+    }
+    ASSERT_GT(updated_gc_count, orig_gc_count)
+      << "Timed out waiting for system catalog WAL to be GC'ed";
+  }
+
   // Functor that takes a leader_master_idx and runs the desired master RPC 
against
   // the leader master returning the RPC status and the optional 
MasterErrorPB::Code.
   typedef std::function<
@@ -286,7 +345,160 @@ class DynamicMultiMasterTest : public KuduTest {
     ASSERT_EQ(expected_member_type, resp.member_type());
   }
 
- protected:
+  // Verify the newly added master is in FAILED_UNRECOVERABLE state and can't 
be caught up
+  // from WAL.
+  void VerifyNewMasterInFailedUnrecoverableState() {
+    // GetConsensusState() RPC can be made to any master and not necessarily 
the leader master.
+    int leader_master_idx;
+    ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx));
+    auto leader_master_addr = 
cluster_->master(leader_master_idx)->bound_rpc_addr();
+    consensus::ConsensusServiceProxy consensus_proxy(cluster_->messenger(), 
leader_master_addr,
+                                                     
leader_master_addr.host());
+    consensus::GetConsensusStateRequestPB req;
+    consensus::GetConsensusStateResponsePB resp;
+    RpcController rpc;
+    req.set_dest_uuid(cluster_->master(leader_master_idx)->uuid());
+    req.set_report_health(consensus::INCLUDE_HEALTH_REPORT);
+    ASSERT_OK(consensus_proxy.GetConsensusState(req, &resp, &rpc));
+    ASSERT_FALSE(resp.has_error());
+    ASSERT_EQ(1, resp.tablets_size());
+
+    // Lookup the new_master from the consensus state of the system catalog.
+    const auto& sys_catalog = resp.tablets(0);
+    ASSERT_EQ(master::SysCatalogTable::kSysCatalogTabletId, 
sys_catalog.tablet_id());
+    const auto& cstate = sys_catalog.cstate();
+    const auto& config = cstate.has_pending_config() ?
+                         cstate.pending_config() : cstate.committed_config();
+    ASSERT_EQ(orig_num_masters_ + 1, config.peers_size());
+    int num_new_masters_found = 0;
+    for (const auto& peer : config.peers()) {
+      if (peer.permanent_uuid() == new_master_->uuid()) {
+        ASSERT_EQ(consensus::HealthReportPB::FAILED_UNRECOVERABLE,
+                  peer.health_report().overall_health());
+        num_new_masters_found++;
+      }
+    }
+    ASSERT_EQ(1, num_new_masters_found);
+  }
+
+  // Transfers leadership among masters in the 'cluster' to the specified 
'new_master_uuid'.
+  static void TransferMasterLeadership(ExternalMiniCluster* cluster,
+                                       const string& new_master_uuid) {
+    LOG(INFO) << "Transferring leadership to new master: " << new_master_uuid;
+
+    int leader_master_idx;
+    ASSERT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx));
+    auto leader_master_addr = 
cluster->master(leader_master_idx)->bound_rpc_addr();
+    consensus::ConsensusServiceProxy consensus_proxy(cluster->messenger(), 
leader_master_addr,
+                                                     
leader_master_addr.host());
+    LeaderStepDownRequestPB req;
+    req.set_dest_uuid(cluster->master(leader_master_idx)->uuid());
+    req.set_tablet_id(master::SysCatalogTable::kSysCatalogTabletId);
+    req.set_new_leader_uuid(new_master_uuid);
+    req.set_mode(consensus::GRACEFUL);
+    LeaderStepDownResponsePB resp;
+    RpcController rpc;
+    ASSERT_OK(consensus_proxy.LeaderStepDown(req, &resp, &rpc));
+    ASSERT_FALSE(resp.has_error())
+      << Substitute("Failed transferring leadership to new master: $0, error: 
$1", new_master_uuid,
+                    
tserver::TabletServerErrorPB::Code_Name(resp.error().code()));
+
+    // LeaderStepDown request is asynchronous, hence using ASSERT_EVENTUALLY.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx));
+      ASSERT_EQ(new_master_uuid, cluster->master(leader_master_idx)->uuid());
+    });
+  }
+
+  // Verification steps after the new master has been added successfully and 
it's promoted
+  // as VOTER. The supplied 'master_hps' includes the new_master as well.
+  void VerifyClusterAfterMasterAddition(const vector<HostPort>& master_hps) {
+    // Collect information about the cluster for verification later before 
shutting
+    // it down.
+    UnorderedHostPortSet master_hps_set(master_hps.begin(), master_hps.end());
+    ASSERT_EQ(master_hps.size(), master_hps_set.size()) << "Duplicates found 
in master_hps";
+
+    unordered_set<string> master_uuids;
+    for (int i = 0; i < cluster_->num_masters(); i++) {
+      master_uuids.emplace(cluster_->master(i)->uuid());
+    }
+    master_uuids.emplace(new_master_->uuid());
+
+    // Shutdown the cluster and the new master daemon process.
+    // This allows ExternalMiniCluster to manage the newly added master and 
allows
+    // client to connect to the new master if it's elected the leader.
+    LOG(INFO) << "Shutting down the old cluster";
+    new_master_->Shutdown();
+    cluster_.reset();
+
+    LOG(INFO) << "Bringing up the migrated cluster";
+    opts_.num_masters = orig_num_masters_ + 1;
+    opts_.master_rpc_addresses = master_hps;
+    ExternalMiniCluster migrated_cluster(opts_);
+    ASSERT_OK(migrated_cluster.Start());
+    for (int i = 0; i < migrated_cluster.num_masters(); i++) {
+      ASSERT_OK(migrated_cluster.master(i)->WaitForCatalogManager());
+    }
+
+    // Verify the cluster still has the same 3 masters.
+    {
+      ListMastersResponsePB resp;
+      NO_FATALS(RunListMasters(&resp, &migrated_cluster));
+      ASSERT_EQ(orig_num_masters_ + 1, resp.masters_size());
+
+      UnorderedHostPortSet hps_found;
+      unordered_set<string> uuids_found;
+      for (const auto& master : resp.masters()) {
+        ASSERT_EQ(consensus::RaftPeerPB::VOTER, master.member_type());
+        ASSERT_TRUE(master.role() == consensus::RaftPeerPB::LEADER ||
+            master.role() == consensus::RaftPeerPB::FOLLOWER);
+        ASSERT_EQ(1, master.registration().rpc_addresses_size());
+        HostPort actual_hp = 
HostPortFromPB(master.registration().rpc_addresses(0));
+        ASSERT_TRUE(ContainsKey(master_hps_set, actual_hp));
+        hps_found.insert(actual_hp);
+        ASSERT_TRUE(ContainsKey(master_uuids, 
master.instance_id().permanent_uuid()));
+        uuids_found.emplace(master.instance_id().permanent_uuid());
+      }
+      ASSERT_EQ(master_hps_set, hps_found);
+      ASSERT_EQ(master_uuids, uuids_found);
+    }
+
+    // Transfer leadership to the new master.
+    NO_FATALS(TransferMasterLeadership(&migrated_cluster, 
new_master_->uuid()));
+
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(migrated_cluster.CreateClient(nullptr, &client));
+
+    ClusterVerifier cv(&migrated_cluster);
+    NO_FATALS(cv.CheckCluster());
+    LOG(INFO) << "Verifying the first table";
+    cv.CheckRowCount(kTableName, ClusterVerifier::EXACTLY, 0);
+
+    LOG(INFO) << "Creating and verifying the second table";
+    // Perform an operation that requires replication to masters.
+    ASSERT_OK(CreateTable(&migrated_cluster, "second_table"));
+    cv.CheckRowCount("second_table", ClusterVerifier::EXACTLY, 0);
+
+    // Pause one master at a time and create table at the same time which will 
allow
+    // new leader to be elected if the paused master is a leader.
+    // Need at least 3 masters to form consensus and elect a new leader.
+    if (orig_num_masters_ + 1 >= 3) {
+      LOG(INFO) << "Pausing and resuming individual masters";
+      string table_name = kTableName;
+      for (int i = 0; i < orig_num_masters_ + 1; i++) {
+        ASSERT_OK(migrated_cluster.master(i)->Pause());
+        cluster::ScopedResumeExternalDaemon 
resume_daemon(migrated_cluster.master(i));
+        cv.CheckRowCount(table_name, ClusterVerifier::EXACTLY, 0);
+
+        // See MasterFailoverTest.TestCreateTableSync to understand why we must
+        // check for IsAlreadyPresent as well.
+        table_name = Substitute("table-$0", i);
+        Status s = CreateTable(&migrated_cluster, table_name);
+        ASSERT_TRUE(s.ok() || s.IsAlreadyPresent());
+      }
+    }
+  }
+
   // Tracks the current number of masters in the cluster
   int orig_num_masters_;
   ExternalMiniClusterOptions opts_;
@@ -299,8 +511,12 @@ class DynamicMultiMasterTest : public KuduTest {
   string reserved_hp_str_;
   unique_ptr<MasterServiceProxy> new_master_proxy_;
   scoped_refptr<ExternalMaster> new_master_;
+
+  static const char* const kTableName;
 };
 
+const char* const DynamicMultiMasterTest::kTableName = "first_table";
+
 // Parameterized DynamicMultiMasterTest class that works with different 
initial number of masters.
 class ParameterizedDynamicMultiMasterTest : public DynamicMultiMasterTest,
                                             public 
::testing::WithParamInterface<int> {
@@ -327,7 +543,6 @@ TEST_P(ParameterizedDynamicMultiMasterTest, 
TestAddMasterCatchupFromWAL) {
   vector<HostPort> master_hps;
   NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
 
-  const string kTableName = "first_table";
   ASSERT_OK(CreateTable(cluster_.get(), kTableName));
 
   // Bring up the new master and add to the cluster.
@@ -337,7 +552,7 @@ TEST_P(ParameterizedDynamicMultiMasterTest, 
TestAddMasterCatchupFromWAL) {
 
   // Newly added master will be caught up from WAL itself without requiring 
tablet copy
   // since the system catalog is fresh with a single table.
-  // Catching up from WAL and promotion to VOTER will not be instantly after 
adding the
+  // Catching up from WAL and promotion to VOTER will not be instant after 
adding the
   // new master. Hence using ASSERT_EVENTUALLY.
   ASSERT_EVENTUALLY([&] {
     ListMastersResponsePB resp;
@@ -374,108 +589,18 @@ TEST_P(ParameterizedDynamicMultiMasterTest, 
TestAddMasterCatchupFromWAL) {
     ASSERT_STR_CONTAINS(s.message().ToString(), "Master already present");
   }
 
-  // Shutdown the cluster and the new master daemon process.
-  // This allows ExternalMiniCluster to manage the newly added master and 
allows
-  // client to connect to the new master if it's elected the leader.
-  new_master_->Shutdown();
-  cluster_.reset();
-
-  opts_.num_masters = orig_num_masters_ + 1;
-  opts_.master_rpc_addresses = master_hps;
-  ExternalMiniCluster migrated_cluster(opts_);
-  ASSERT_OK(migrated_cluster.Start());
-  
ASSERT_OK(migrated_cluster.WaitForTabletServerCount(migrated_cluster.num_tablet_servers(),
-                                                      
MonoDelta::FromSeconds(5)));
-
-  // Verify the cluster still has the same 3 masters.
-  {
-    ListMastersResponsePB resp;
-    NO_FATALS(RunListMasters(&resp, &migrated_cluster));
-    ASSERT_EQ(orig_num_masters_ + 1, resp.masters_size());
-
-    for (const auto& master : resp.masters()) {
-      ASSERT_EQ(consensus::RaftPeerPB::VOTER, master.member_type());
-      ASSERT_TRUE(master.role() == consensus::RaftPeerPB::LEADER ||
-                  master.role() == consensus::RaftPeerPB::FOLLOWER);
-      ASSERT_EQ(1, master.registration().rpc_addresses_size());
-      HostPort actual_hp = 
HostPortFromPB(master.registration().rpc_addresses(0));
-      ASSERT_TRUE(std::find(master_hps.begin(), master_hps.end(), actual_hp) 
!= master_hps.end());
-    }
-  }
-
-  shared_ptr<KuduClient> client;
-  ASSERT_OK(migrated_cluster.CreateClient(nullptr, &client));
-
-  shared_ptr<KuduTable> table;
-  ASSERT_OK(client->OpenTable(kTableName, &table));
-  ASSERT_EQ(0, CountTableRows(table.get()));
-
-  // Perform an operation that requires replication to masters.
-  ASSERT_OK(CreateTable(&migrated_cluster, "second_table"));
-  ASSERT_OK(client->OpenTable("second_table", &table));
-  ASSERT_EQ(0, CountTableRows(table.get()));
-
-  // Pause master one at a time and create table at the same time which will 
allow
-  // new leader to be elected if the paused master is a leader.
-  // Need at least 3 masters to form consensus and elect a new leader.
-  if (orig_num_masters_ + 1 >= 3) {
-    for (int i = 0; i < orig_num_masters_ + 1; i++) {
-      ASSERT_OK(migrated_cluster.master(i)->Pause());
-      cluster::ScopedResumeExternalDaemon 
resume_daemon(migrated_cluster.master(i));
-      ASSERT_OK(client->OpenTable(kTableName, &table));
-      ASSERT_EQ(0, CountTableRows(table.get()));
-
-      // See MasterFailoverTest.TestCreateTableSync to understand why we must
-      // check for IsAlreadyPresent as well.
-      Status s = CreateTable(&migrated_cluster, Substitute("table-$0", i));
-      ASSERT_TRUE(s.ok() || s.IsAlreadyPresent());
-    }
-  }
+  NO_FATALS(VerifyClusterAfterMasterAddition(master_hps));
 }
 
-// This test starts a cluster with low values for log flush and segment size 
to force GC
-// of the system catalog WAL. When a new master is added, test verifies that 
the new master
-// can't be caught up from WAL and as a result the new master, though added to 
the master Raft
-// config, remains a NON_VOTER.
-TEST_P(ParameterizedDynamicMultiMasterTest, 
TestAddMasterCatchupFromWALNotPossible) {
+// This test goes through the workflow required to copy system catalog to the 
newly added master.
+TEST_P(ParameterizedDynamicMultiMasterTest, TestAddMasterSysCatalogCopy) {
   SKIP_IF_SLOW_NOT_ALLOWED();
 
-  // Using low values of log flush threshold and segment size to trigger GC of 
the sys catalog WAL.
-  NO_FATALS(StartCluster({"--master_support_change_config", 
"--flush_threshold_secs=1",
-                          "--log_segment_size_mb=1"}));
-
-  // Verify that masters are running as VOTERs and collect their addresses to 
be used
-  // for starting the new master.
   vector<HostPort> master_hps;
-  NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
-
-  auto get_sys_catalog_wal_gc_count = [&] {
-    int64_t sys_catalog_wal_gc_count = 0;
-    CHECK_OK(itest::GetInt64Metric(cluster_->master(0)->bound_http_hostport(),
-                                   &METRIC_ENTITY_tablet,
-                                   
master::SysCatalogTable::kSysCatalogTabletId,
-                                   &METRIC_log_gc_duration,
-                                   "total_count",
-                                   &sys_catalog_wal_gc_count));
-    return sys_catalog_wal_gc_count;
-  };
-  auto orig_gc_count = get_sys_catalog_wal_gc_count();
-
-  // Create a bunch of tables to ensure sys catalog WAL gets GC'ed.
-  // Need to create around 1k tables even with lowest flush threshold and log 
segment size.
-  for (int i = 1; i < 1000; i++) {
-    string table_name = "Table.TestAddMasterCatchupFromWALNotPossible." + 
std::to_string(i);
-    ASSERT_OK(CreateTable(cluster_.get(), table_name));
-  }
-
-  int64_t time_left_to_sleep_msecs = 2000;
-  while (time_left_to_sleep_msecs > 0 && orig_gc_count == 
get_sys_catalog_wal_gc_count()) {
-    static const MonoDelta kSleepDuration = MonoDelta::FromMilliseconds(100);
-    SleepFor(kSleepDuration);
-    time_left_to_sleep_msecs -= kSleepDuration.ToMilliseconds();
-  }
-  ASSERT_GT(time_left_to_sleep_msecs, 0) << "Timed out waiting for system 
catalog WAL to be GC'ed";
-
+  NO_FATALS(StartClusterWithSysCatalogGCed(
+      &master_hps,
+      {"--master_consensus_allow_status_msg_for_failed_peer"}));
+  ASSERT_OK(CreateTable(cluster_.get(), kTableName));
   // Bring up the new master and add to the cluster.
   master_hps.emplace_back(reserved_hp_);
   NO_FATALS(StartNewMaster(master_hps));
@@ -497,44 +622,66 @@ TEST_P(ParameterizedDynamicMultiMasterTest, 
TestAddMasterCatchupFromWALNotPossib
   }
 
   // Double check by directly contacting the new master.
-  VerifyNewMasterDirectly({ consensus::RaftPeerPB::LEARNER }, 
consensus::RaftPeerPB::NON_VOTER);
+  NO_FATALS(VerifyNewMasterDirectly({ consensus::RaftPeerPB::LEARNER },
+                                    consensus::RaftPeerPB::NON_VOTER));
 
-  // Verify FAILED_UNRECOVERABLE health error about the new master that can't 
be caught up
-  // from WAL. This health state update may take some round trips between the 
masters and
+  // Verify new master is in FAILED_UNRECOVERABLE state.
+  // This health state update may take some round trips between the masters and
   // hence wrapping it under ASSERT_EVENTUALLY.
   ASSERT_EVENTUALLY([&] {
-    // GetConsensusState() RPC can be made to any master and not necessarily 
the leader master.
-    int leader_master_idx;
-    ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx));
-    auto leader_master_addr = 
cluster_->master(leader_master_idx)->bound_rpc_addr();
-    consensus::ConsensusServiceProxy consensus_proxy(cluster_->messenger(), 
leader_master_addr,
-                                                     
leader_master_addr.host());
-    consensus::GetConsensusStateRequestPB req;
-    consensus::GetConsensusStateResponsePB resp;
-    RpcController rpc;
-    req.set_dest_uuid(cluster_->master(leader_master_idx)->uuid());
-    req.set_report_health(consensus::INCLUDE_HEALTH_REPORT);
-    ASSERT_OK(consensus_proxy.GetConsensusState(req, &resp, &rpc));
-    ASSERT_FALSE(resp.has_error());
-    ASSERT_EQ(1, resp.tablets_size());
+    NO_FATALS(VerifyNewMasterInFailedUnrecoverableState());
+  });
 
-    // Lookup the new_master from the consensus state of the system catalog.
-    const auto& sys_catalog = resp.tablets(0);
-    ASSERT_EQ(master::SysCatalogTable::kSysCatalogTabletId, 
sys_catalog.tablet_id());
-    const auto& cstate = sys_catalog.cstate();
-    const auto& config = cstate.has_pending_config() ?
-        cstate.pending_config() : cstate.committed_config();
-    ASSERT_EQ(orig_num_masters_ + 1, config.peers_size());
-    int num_new_masters_found = 0;
-    for (const auto& peer : config.peers()) {
-      if (peer.permanent_uuid() == new_master_->uuid()) {
-        ASSERT_EQ(consensus::HealthReportPB::FAILED_UNRECOVERABLE,
-                  peer.health_report().overall_health());
-        num_new_masters_found++;
-      }
-    }
-    ASSERT_EQ(1, num_new_masters_found);
+  // Without system catalog copy, the new master will remain in the 
FAILED_UNRECOVERABLE state.
+  // So lets proceed with the tablet copy process for system catalog.
+  // Shutdown the new master
+  LOG(INFO) << "Shutting down the new master";
+  new_master_->Shutdown();
+
+  LOG(INFO) << "Deleting the system catalog";
+  // Delete sys catalog on local master
+  ASSERT_OK(tools::RunKuduTool({"local_replica", "delete",
+                                master::SysCatalogTable::kSysCatalogTabletId,
+                                "-fs_data_dirs=" + 
JoinStrings(new_master_->data_dirs(), ","),
+                                "-fs_wal_dir=" + new_master_->wal_dir(),
+                                "-clean_unsafe"}));
+
+
+  // Copy from remote system catalog
+  LOG(INFO) << "Copying from remote master: "
+    << cluster_->master(0)->bound_rpc_hostport().ToString();
+  ASSERT_OK(tools::RunKuduTool({"local_replica", "copy_from_remote",
+                                master::SysCatalogTable::kSysCatalogTabletId,
+                                
cluster_->master(0)->bound_rpc_hostport().ToString(),
+                                "-fs_data_dirs=" + 
JoinStrings(new_master_->data_dirs(), ","),
+                                "-fs_wal_dir=" + new_master_->wal_dir()}));
+
+  LOG(INFO) << "Restarting the new master";
+  ASSERT_OK(new_master_->Restart());
+
+  // Wait for the new master to be up and running and the leader master to 
send status only Raft
+  // message to allow the new master to be considered caught up and promoted 
to be being a VOTER.
+  ASSERT_EVENTUALLY([&] {
+    VerifyNewMasterDirectly({ consensus::RaftPeerPB::FOLLOWER, 
consensus::RaftPeerPB::LEADER },
+                            consensus::RaftPeerPB::VOTER);
   });
+
+  // Verify the same state from the leader master
+  NO_FATALS(RunListMasters(&list_resp));
+  ASSERT_EQ(orig_num_masters_ + 1, list_resp.masters_size());
+
+  for (const auto& master : list_resp.masters()) {
+    ASSERT_EQ(1, master.registration().rpc_addresses_size());
+    auto hp = HostPortFromPB(master.registration().rpc_addresses(0));
+    if (hp == reserved_hp_) {
+      ASSERT_EQ(new_master_->uuid(), master.instance_id().permanent_uuid());
+      ASSERT_EQ(consensus::RaftPeerPB::VOTER, master.member_type());
+      ASSERT_TRUE(master.role() == consensus::RaftPeerPB::FOLLOWER ||
+                  master.role() == consensus::RaftPeerPB::LEADER);
+    }
+  }
+
+  NO_FATALS(VerifyClusterAfterMasterAddition(master_hps));
 }
 
 // Test that brings up a single master cluster with 'last_known_addr' not 
populated by
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 7e4699c..1d5d004 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -88,11 +88,20 @@ DEFINE_double(sys_catalog_fail_during_write, 0.0,
               "Fraction of the time when system table writes will fail");
 TAG_FLAG(sys_catalog_fail_during_write, hidden);
 
+// Following flags related to dynamic multi-master are hidden till the feature 
is ready to
+// be advertised.
 DEFINE_string(master_address_add_new_master, "",
               "Address of master to add as a NON_VOTER on creating a 
distributed master config.");
 TAG_FLAG(master_address_add_new_master, unsafe);
 TAG_FLAG(master_address_add_new_master, hidden);
 
+DEFINE_bool(master_consensus_allow_status_msg_for_failed_peer, false,
+            "Allows status-only Raft messages to be sent to a master peer in 
FAILED_UNRECOVERABLE "
+            "state.");
+TAG_FLAG(master_consensus_allow_status_msg_for_failed_peer, advanced);
+TAG_FLAG(master_consensus_allow_status_msg_for_failed_peer, hidden);
+TAG_FLAG(master_consensus_allow_status_msg_for_failed_peer, runtime);
+
 DECLARE_bool(master_support_change_config);
 DECLARE_int64(rpc_max_message_size);
 
@@ -466,9 +475,12 @@ Status SysCatalogTable::SetupTablet(
       [this, tablet_id](const string& reason) {
         this->SysCatalogStateChanged(tablet_id, reason);
       }));
-  RETURN_NOT_OK_SHUTDOWN(tablet_replica_->Init({ /*quiescing*/nullptr,
-                                                 master_->num_raft_leaders(),
-                                                 master_->raft_pool() }),
+
+  consensus::ServerContext server_ctx{/*quiescing*/nullptr,
+                                      master_->num_raft_leaders(),
+                                      master_->raft_pool(),
+                                      
&FLAGS_master_consensus_allow_status_msg_for_failed_peer};
+  RETURN_NOT_OK_SHUTDOWN(tablet_replica_->Init(std::move(server_ctx)),
                          "failed to initialize system catalog replica");
 
   shared_ptr<Tablet> tablet;

Reply via email to