This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f29e0423 [tools] update schema if needed when rebuilding master
5f29e0423 is described below

commit 5f29e04230dadd840b12887423fb76ac62d96538
Author: Yingchun Lai <[email protected]>
AuthorDate: Fri May 6 00:47:36 2022 +0800

    [tools] update schema if needed when rebuilding master
    
    Before this patch, if tservers had outdated schemas with a lower
    version for some tables (e.g. when the cluster wasn't healthy), the
    'kudu master unsafe_rebuild' tool might rebuild the system catalog
    with those outdated schemas for the unhealthy tables.
    
    This patch improves the rebuild logic, so the tool now picks the
    schema with the highest version for an unhealthy table when
    rebuilding the system catalog.
    
    Change-Id: Iec99d57115228b521ba645b8e19c7057a4bb5d3d
    Reviewed-on: http://gerrit.cloudera.org:8080/18496
    Tested-by: Yingchun Lai <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/tools/kudu-admin-test.cc  | 102 ++++++++++++++++++++++++++++---------
 src/kudu/tools/master_rebuilder.cc |  67 +++++++++++++++++-------
 2 files changed, 125 insertions(+), 44 deletions(-)

diff --git a/src/kudu/tools/kudu-admin-test.cc 
b/src/kudu/tools/kudu-admin-test.cc
index f5314ca3e..120b0da84 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -3037,7 +3037,10 @@ namespace {
 constexpr const char* kPrincipal = "oryx";
 
 vector<string> RebuildMasterCmd(const ExternalMiniCluster& cluster,
+                                int tserver_num,
                                 bool is_secure, bool log_to_stderr = false) {
+  CHECK_GT(tserver_num, 0);
+  CHECK_LE(tserver_num, cluster.num_tablet_servers());
   vector<string> command = {
     "master",
     "unsafe_rebuild",
@@ -3052,7 +3055,7 @@ vector<string> RebuildMasterCmd(const 
ExternalMiniCluster& cluster,
   if (is_secure) {
     command.emplace_back(Substitute("--sasl_protocol_name=$0", kPrincipal));
   }
-  for (int i = 0; i < cluster.num_tablet_servers(); i++) {
+  for (int i = 0; i < tserver_num; i++) {
     auto* ts = cluster.tablet_server(i);
     command.emplace_back(ts->bound_rpc_hostport().ToString());
   }
@@ -3085,7 +3088,8 @@ TEST_F(AdminCliTest, TestRebuildMasterWhenNonEmpty) {
   NO_FATALS(cluster_->master()->Shutdown());
   string stdout;
   string stderr;
-  Status s = RunKuduTool(RebuildMasterCmd(*cluster_, /*is_secure*/false, 
/*log_to_stderr*/true),
+  Status s = RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers,
+                                          /*is_secure*/false, 
/*log_to_stderr*/true),
                          &stdout, &stderr);
   ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
   ASSERT_STR_CONTAINS(stderr, "must be empty");
@@ -3093,7 +3097,8 @@ TEST_F(AdminCliTest, TestRebuildMasterWhenNonEmpty) {
   // Delete the contents of the old master from disk. This should allow the
   // tool to run.
   ASSERT_OK(cluster_->master()->DeleteFromDisk());
-  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, /*is_secure*/false, 
/*log_to_stderr*/true),
+  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers,
+                                         /*is_secure*/false, 
/*log_to_stderr*/true),
                         &stdout, &stderr));
   ASSERT_STR_NOT_CONTAINS(stderr, "must be empty");
   ASSERT_STR_CONTAINS(stdout,
@@ -3136,7 +3141,8 @@ TEST_F(AdminCliTest, TestRebuildMasterWithTombstones) {
   ASSERT_OK(cluster_->master()->DeleteFromDisk());
   string stdout;
   string stderr;
-  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, /*is_secure*/false, 
/*log_to_stderr*/true),
+  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers,
+                                         /*is_secure*/false, 
/*log_to_stderr*/true),
                                          &stdout, &stderr));
   ASSERT_STR_CONTAINS(stderr, Substitute("Skipping replica of tablet $0 of 
table $1",
                                          tablet_id, kTable));
@@ -3165,39 +3171,56 @@ TEST_F(AdminCliTest, TestRebuildMasterWithTombstones) {
   NO_FATALS(cv.CheckCluster());
 }
 
-TEST_F(AdminCliTest, TestRebuildMasterAndAddColumns) {
-  FLAGS_num_tablet_servers = 1;
-  FLAGS_num_replicas = 1;
+TEST_F(AdminCliTest, TestAddColumnsAndRebuildMaster) {
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = 3;
 
   NO_FATALS(BuildAndStart());
 
+  // Add a column and shutdown a tserver, the tserver holds a schema with a 
lower version.
   {
     unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer(kTableId));
     table_alterer->AddColumn("old_column_0")->Type(KuduColumnSchema::INT32);
     ASSERT_OK(table_alterer->Alter());
   }
+  NO_FATALS(cluster_->tablet_server(0)->Shutdown());
+
+  // Add another column, the latest schema has a higher version.
+  {
+    unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer(kTableId));
+    table_alterer->AddColumn("old_column_1")->Type(KuduColumnSchema::INT32);
+    ASSERT_OK(table_alterer->Alter());
+  }
 
   // Shut down the master and wipe out its data.
   NO_FATALS(cluster_->master()->Shutdown());
   ASSERT_OK(cluster_->master()->DeleteFromDisk());
 
+  // Restart the shutdown tserver, which holds a lower version schema.
+  ASSERT_OK(cluster_->tablet_server(0)->Restart());
+
   // Rebuild the master with the tool.
+  // The tool will firstly use schema on tserver-0 which holds an outdated 
schema, then
+  // use the newer schema on tserver-1 to rebuild master.
   string stdout;
-  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, /*is_secure*/false, 
/*log_to_stderr*/true),
+  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, 2, /*is_secure*/false, 
/*log_to_stderr*/true),
                         &stdout));
-  ASSERT_STR_CONTAINS(stdout, "Rebuilt from 1 tablet servers, of which 0 had 
errors");
-  ASSERT_STR_CONTAINS(stdout, "Rebuilt from 1 replicas, of which 0 had 
errors");
+  ASSERT_STR_CONTAINS(stdout, "Rebuilt from 2 tablet servers, of which 0 had 
errors");
+  ASSERT_STR_CONTAINS(stdout, "Rebuilt from 2 replicas, of which 0 had 
errors");
 
   // Restart the master and the tablet servers.
   // The tablet servers must be restarted so they accept the new master's 
certs.
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+  for (int i = 0; i < FLAGS_num_tablet_servers; i++) {
     cluster_->tablet_server(i)->Shutdown();
   }
   ASSERT_OK(cluster_->Restart());
+  WaitForTSAndReplicas();
 
-  ASSERT_OK(cluster_->WaitForTabletsRunning(cluster_->tablet_server(0),
-                                            1,
-                                            MonoDelta::FromSeconds(10)));
+  // Wait the cluster to become healthy.
+  string master_address = cluster_->master()->bound_rpc_addr().ToString();
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_OK(RunKuduTool({"cluster", "ksck", master_address}, nullptr, 
nullptr));
+  });
 
   // The client has to be rebuilt since there's a new master.
   KuduClientBuilder builder;
@@ -3210,19 +3233,46 @@ TEST_F(AdminCliTest, TestRebuildMasterAndAddColumns) {
     ASSERT_OK(table_alterer->Alter());
   }
 
-  const string& ts_addr = 
cluster_->tablet_server(0)->bound_rpc_addr().ToString();
-  ASSERT_EVENTUALLY([&]() {
-    string stdout;
-    ASSERT_OK(RunKuduTool({"remote_replica", "list", ts_addr, 
"-include_schema"} , &stdout));
-    ASSERT_STR_CONTAINS(stdout, "new_column_0");
-  });
+  // Check master and all tservers have the latest schema.
+  ASSERT_OK(RunKuduTool({"table", "describe", master_address, kTableId}, 
&stdout));
+  ASSERT_STR_MATCHES(stdout, "old_column_0.*old_column_1.*new_column_0");
+  for (int i = 0; i < FLAGS_num_tablet_servers; i++) {
+    const string& ts_addr = 
cluster_->tablet_server(i)->bound_rpc_addr().ToString();
+    ASSERT_OK(RunKuduTool({"remote_replica", "list", ts_addr, 
"-include_schema"}, &stdout));
+    ASSERT_STR_MATCHES(stdout, "old_column_0.*old_column_1.*new_column_0");
+  }
 
-  // Check the altered table is readable.
+  // Check the altered table schema in client view, and check it is writable 
and readable.
   {
-    vector<string> rows;
+    KuduSchema schema;
+    ASSERT_OK(client_->GetTableSchema(kTableId, &schema));
+    ASSERT_EQ(6, schema.num_columns());
+    // Here we use the first column to initialize an object of KuduColumnSchema
+    // for there is no default constructor for it.
+    KuduColumnSchema col_schema = schema.Column(0);
+    ASSERT_TRUE(schema.HasColumn("old_column_0", &col_schema));
+    ASSERT_TRUE(schema.HasColumn("old_column_1", &col_schema));
+    ASSERT_TRUE(schema.HasColumn("new_column_0", &col_schema));
+
     client::sp::shared_ptr<KuduTable> table;
     ASSERT_OK(client_->OpenTable(kTableId, &table));
+
+    TestWorkload workload(cluster_.get());
+    workload.set_table_name(kTableId);
+    workload.set_num_write_threads(1);
+    workload.set_schema(table->schema());
+    workload.Setup();
+    workload.Start();
+    while (workload.rows_inserted() < 1000) {
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+    workload.StopAndJoin();
+
+    vector<string> rows;
     ScanTableToStrings(table.get(), &rows);
+    for (const auto& row : rows) {
+      ASSERT_STR_MATCHES(row, "old_column_0.*old_column_1.*new_column_0");
+    }
   }
 }
 
@@ -3292,7 +3342,8 @@ TEST_P(SecureClusterAdminCliParamTest, TestRebuildMaster) 
{
 
   // Rebuild the master with the tool.
   string stdout;
-  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, is_secure), &stdout));
+  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers, 
is_secure),
+                        &stdout));
   ASSERT_STR_CONTAINS(stdout,
                       "Rebuilt from 3 tablet servers, of which 0 had errors");
   ASSERT_STR_CONTAINS(stdout, "Rebuilt from 3 replicas, of which 0 had 
errors");
@@ -3340,7 +3391,7 @@ TEST_P(SecureClusterAdminCliParamTest, 
TestRebuildMasterAndAddColumns) {
   FLAGS_num_tablet_servers = 3;
   FLAGS_num_replicas = 3;
 
-  // Create a table and insert some rows
+  // Create a table and insert some rows.
   NO_FATALS(MakeTestTable(kTableName, kNumRows, /*num_replicas*/3, 
cluster_.get()));
 
   // Shut down the master and wipe out its data.
@@ -3349,7 +3400,8 @@ TEST_P(SecureClusterAdminCliParamTest, 
TestRebuildMasterAndAddColumns) {
 
   // Rebuild the master with the tool.
   string stdout;
-  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, is_secure), &stdout));
+  ASSERT_OK(RunKuduTool(RebuildMasterCmd(*cluster_, FLAGS_num_tablet_servers, 
is_secure),
+                        &stdout));
   ASSERT_STR_CONTAINS(stdout,
                       "Rebuilt from 3 tablet servers, of which 0 had errors");
   ASSERT_STR_CONTAINS(stdout, "Rebuilt from 3 replicas, of which 0 had 
errors");
diff --git a/src/kudu/tools/master_rebuilder.cc 
b/src/kudu/tools/master_rebuilder.cc
index d6267b500..2a21d99b9 100644
--- a/src/kudu/tools/master_rebuilder.cc
+++ b/src/kudu/tools/master_rebuilder.cc
@@ -135,6 +135,7 @@ Status MasterRebuilder::RebuildMaster() {
       }
       Status s = CheckTableAndTabletConsistency(replica);
       if (!s.ok()) {
+        // TODO(yingchun): should abort rebuilding master if any fatal error 
happened.
         LOG(WARNING) << Substitute("Failed to process metadata for replica of 
tablet $0 "
                                    "of table $1 on tablet server $2 in state 
$3: $4",
                                    tablet_id, table_name, tserver_addr, 
state_str, s.ToString());
@@ -220,52 +221,80 @@ Status MasterRebuilder::CheckTableConsistency(
     const ListTabletsResponsePB::StatusAndSchemaPB& replica) {
   const string& tablet_id = replica.tablet_status().tablet_id();
   const string& table_name = replica.tablet_status().table_name();
-  scoped_refptr<TableInfo> table = FindOrDie(tables_by_name_, table_name);
+  Schema schema_from_replica;
+  RETURN_NOT_OK(SchemaFromPB(replica.schema(), &schema_from_replica));
 
-  TableMetadataLock l_table(table.get(), LockMode::WRITE);
+  scoped_refptr<TableInfo> table = FindOrDie(tables_by_name_, table_name);
+  table->mutable_metadata()->StartMutation();
   SysTablesEntryPB* metadata = &table->mutable_metadata()->mutable_dirty()->pb;
 
   // Update next_column_id if needed.
-  Schema schema_from_replica;
-  RETURN_NOT_OK(SchemaFromPB(replica.schema(), &schema_from_replica));
   if (schema_from_replica.max_col_id() + 1 > metadata->next_column_id()) {
     metadata->set_next_column_id(schema_from_replica.max_col_id() + 1);
   }
 
-  // Update schema_version if needed.
+  // Update schema if needed.
+  // Suppose the schema with a higher version would be newer, we will replace
+  // the schema_version, SchemaPB and PartitionSchemaPB.
   if (replica.has_schema_version() &&
       replica.schema_version() > metadata->version()) {
     metadata->set_version(replica.schema_version());
+    metadata->mutable_schema()->CopyFrom(replica.schema());
+    metadata->mutable_partition_schema()->CopyFrom(replica.partition_schema());
   }
 
-  // Check the schemas match.
+  // Obtain Schema and PartitionSchema before CommitMutation, since
+  // CommitMutation will reset metadata.
+  int table_version = metadata->version();
   Schema schema_from_table;
   RETURN_NOT_OK(SchemaFromPB(metadata->schema(), &schema_from_table));
+  PartitionSchema pschema_from_table;
+  RETURN_NOT_OK(PartitionSchema::FromPB(
+      metadata->partition_schema(), schema_from_table, &pschema_from_table));
+  table->mutable_metadata()->CommitMutation();
+
+  // Check for the consistency.
+  // Only matched when they have the same version.
+  if (replica.has_schema_version() &&
+      replica.schema_version() != table_version) {
+    LOG(WARNING) << Substitute(
+        "Ignoring mismatched schema version for tablet $0 of table $1", 
tablet_id, table_name);
+    LOG(WARNING) << Substitute("Table schema version: $0", table_version);
+    LOG(WARNING) << Substitute("Mismatched schema version: $0", 
replica.schema_version());
+    return Status::OK();
+  }
+
+  string error_message;
+  // Check the schemas match.
   if (schema_from_table != schema_from_replica) {
-    LOG(WARNING) << Substitute("Schema mismatch for tablet $0 of table $1", 
tablet_id, table_name);
+    error_message = "schema mismatch";
+    LOG(WARNING) << Substitute(
+        "Schema mismatch for tablet $0 of table $1", tablet_id, table_name);
     LOG(WARNING) << Substitute("Table schema: $0", 
schema_from_table.ToString());
     LOG(WARNING) << Substitute("Mismatched schema: $0", 
schema_from_replica.ToString());
-    return Status::Corruption("inconsistent replica: schema mismatch");
   }
 
   // Check the partition schemas match.
-  PartitionSchema pschema_from_table;
   PartitionSchema pschema_from_replica;
-  RETURN_NOT_OK(PartitionSchema::FromPB(metadata->partition_schema(),
-                                        schema_from_table,
-                                        &pschema_from_table));
-  RETURN_NOT_OK(PartitionSchema::FromPB(replica.partition_schema(),
-                                        schema_from_replica,
-                                        &pschema_from_replica));
+  RETURN_NOT_OK(PartitionSchema::FromPB(
+      replica.partition_schema(), schema_from_replica, &pschema_from_replica));
   if (pschema_from_table != pschema_from_replica) {
-    LOG(WARNING) << Substitute("Partition schema mismatch for tablet $0 of 
table $1",
-                               tablet_id, table_name);
+    if (!error_message.empty()) {
+      error_message += ", ";
+    }
+    error_message += "partition schema mismatch";
+    LOG(WARNING) << Substitute(
+        "Partition schema mismatch for tablet $0 of table $1", tablet_id, 
table_name);
     LOG(WARNING) << Substitute("First seen partition schema: $0",
                                
pschema_from_table.DebugString(schema_from_table));
     LOG(WARNING) << Substitute("Mismatched partition schema $0",
                                
pschema_from_replica.DebugString(schema_from_replica));
-    return Status::Corruption("inconsistent replica: partition schema 
mismatch");
   }
+
+  if (!error_message.empty()) {
+    return Status::Corruption(Substitute("inconsistent replica: $0", 
error_message));
+  }
+
   return Status::OK();
 }
 
@@ -318,7 +347,7 @@ Status MasterRebuilder::CheckTabletConsistency(
     LOG(WARNING) << Substitute("First seen partition: $0",
                                metadata.partition().DebugString());
     LOG(WARNING) << Substitute("Mismatched partition $0",
-                               
replica.tablet_status().partition().DebugString());;
+                               
replica.tablet_status().partition().DebugString());
     return Status::Corruption("inconsistent replica: partition mismatch");
   }
 

Reply via email to