This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new e5579c9  KUDU-3192: [hms] Leverage the cluster ID in HMS sync
e5579c9 is described below

commit e5579c9b7465f9d9a164d42709f1a9a07acdff59
Author: Grant Henke <[email protected]>
AuthorDate: Tue Sep 29 16:36:08 2020 -0500

    KUDU-3192: [hms] Leverage the cluster ID in HMS sync
    
    This patch propagates the cluster ID to the HMS entries for
    Kudu tables and leverages the cluster ID to ignore irrelevant
    notifications from the HMS.
    
    Additionally the `kudu hms` tool is updated to identify
    and repair tables that do not have the cluster ID set.
    
    Change-Id: I865b418a3cc4e11c889cc4757cd940831c43af17
    Reviewed-on: http://gerrit.cloudera.org:8080/16494
    Reviewed-by: Andrew Wong <[email protected]>
    Tested-by: Grant Henke <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/hms/hms_catalog-test.cc                 |  67 +++++++++-----
 src/kudu/hms/hms_catalog.cc                      |  19 ++--
 src/kudu/hms/hms_catalog.h                       |   4 +
 src/kudu/hms/hms_client-test.cc                  |  23 +++--
 src/kudu/hms/hms_client.cc                       |   1 +
 src/kudu/hms/hms_client.h                        |   1 +
 src/kudu/integration-tests/hms_itest-base.cc     |   1 +
 src/kudu/integration-tests/master_hms-itest.cc   |  63 +++++++++++++
 src/kudu/master/catalog_manager.cc               |  20 +++--
 src/kudu/master/catalog_manager.h                |   6 ++
 src/kudu/master/hms_notification_log_listener.cc |  24 +++++
 src/kudu/master/master.cc                        |   2 +-
 src/kudu/master/master.h                         |   9 +-
 src/kudu/master/master_service.cc                |   4 +-
 src/kudu/master/sys_catalog-test.cc              |   4 +-
 src/kudu/tools/kudu-tool-test.cc                 | 109 +++++++++++------------
 src/kudu/tools/tool_action_hms.cc                |  33 ++++---
 17 files changed, 264 insertions(+), 126 deletions(-)

diff --git a/src/kudu/hms/hms_catalog-test.cc b/src/kudu/hms/hms_catalog-test.cc
index ec9ae62..eac64c7 100644
--- a/src/kudu/hms/hms_catalog-test.cc
+++ b/src/kudu/hms/hms_catalog-test.cc
@@ -211,6 +211,7 @@ class HmsCatalogTest : public KuduTest {
   void CheckTable(const string& database_name,
                   const string& table_name,
                   const string& table_id,
+                  const string& cluster_id,
                   const boost::optional<const string&>& owner,
                   const Schema& schema) {
     hive::Table table;
@@ -222,6 +223,7 @@ class HmsCatalogTest : public KuduTest {
       EXPECT_TRUE(table.owner.empty());
     }
     EXPECT_EQ(table.parameters[HmsClient::kKuduTableIdKey], table_id);
+    EXPECT_EQ(table.parameters[HmsClient::kKuduClusterIdKey], cluster_id);
     EXPECT_EQ(table.parameters[HmsClient::kKuduMasterAddrsKey], kMasterAddrs);
     EXPECT_EQ(table.parameters[HmsClient::kStorageHandlerKey], 
HmsClient::kKuduStorageHandler);
 
@@ -289,6 +291,7 @@ INSTANTIATE_TEST_CASE_P(HmsCatalogTests, 
HmsCatalogTestParameterized,
 // Test creating, altering, and dropping a table with the HMS Catalog.
 TEST_P(HmsCatalogTestParameterized, TestTableLifecycle) {
   const string kTableId = "table-id";
+  const string kClusterId = "cluster-id";
   const string kHmsDatabase = "default";
   const string kHmsTableName = "table_name";
   const string kTableName = Substitute("$0.$1", kHmsDatabase, kHmsTableName);
@@ -299,22 +302,26 @@ TEST_P(HmsCatalogTestParameterized, TestTableLifecycle) {
   Schema schema = AllTypesSchema();
 
   // Create the table.
-  ASSERT_OK(hms_catalog_->CreateTable(kTableId, kTableName, kOwner, schema));
-  NO_FATALS(CheckTable(kHmsDatabase, kHmsTableName, kTableId, kOwner, schema));
+  ASSERT_OK(hms_catalog_->CreateTable(kTableId, kTableName,
+      kClusterId, kOwner, schema));
+  NO_FATALS(CheckTable(kHmsDatabase, kHmsTableName, kTableId,
+      kClusterId, kOwner, schema));
 
   // Create the table again, and check that the expected failure occurs.
-  Status s = hms_catalog_->CreateTable(kTableId, kTableName, kOwner, schema);
+  Status s = hms_catalog_->CreateTable(kTableId, kTableName,
+      kClusterId, kOwner, schema);
   ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
-  NO_FATALS(CheckTable(kHmsDatabase, kHmsTableName, kTableId, kOwner, schema));
+  NO_FATALS(CheckTable(kHmsDatabase, kHmsTableName, kTableId, kClusterId, 
kOwner, schema));
 
   // Alter the table.
   SchemaBuilder b(schema);
   b.AddColumn("new_column", DataType::INT32);
   Schema altered_schema = b.Build();
   ASSERT_OK(hms_catalog_->AlterTable(kTableId, kTableName, kAlteredTableName,
-                                     kOwner, altered_schema));
+      kClusterId, kOwner, altered_schema));
   NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, kHmsTableName));
-  NO_FATALS(CheckTable(kHmsDatabase, kHmsAlteredTableName, kTableId, kOwner, 
altered_schema));
+  NO_FATALS(CheckTable(kHmsDatabase, kHmsAlteredTableName, kTableId, 
kClusterId, kOwner,
+      altered_schema));
 
   // Drop the table.
   ASSERT_OK(hms_catalog_->DropTable(kTableId, kAlteredTableName));
@@ -326,6 +333,7 @@ TEST_P(HmsCatalogTestParameterized, TestTableLifecycle) {
 // belong to external tables from other systems.
 TEST_F(HmsCatalogTest, TestExternalTable) {
   const string kTableId = "table-id";
+  const string kClusterId = "cluster-id";
 
   // Create the external table (default.ext).
   hive::Table external_table;
@@ -345,26 +353,30 @@ TEST_F(HmsCatalogTest, TestExternalTable) {
 
   // Create the Kudu table (default.a).
   Schema schema = AllTypesSchema();
-  ASSERT_OK(hms_catalog_->CreateTable(kTableId, "default.a", boost::none, 
schema));
-  NO_FATALS(CheckTable("default", "a", kTableId, boost::none, schema));
+  ASSERT_OK(hms_catalog_->CreateTable(kTableId, "default.a",
+      kClusterId, boost::none, schema));
+  NO_FATALS(CheckTable("default", "a", kTableId, kClusterId, boost::none, 
schema));
 
   // Try and create a Kudu table with the same name as the external table.
-  Status s = hms_catalog_->CreateTable(kTableId, "default.ext", boost::none, 
schema);
+  Status s = hms_catalog_->CreateTable(kTableId, "default.ext",
+      kClusterId, boost::none, schema);
   EXPECT_TRUE(s.IsAlreadyPresent()) << s.ToString();
   NO_FATALS(CheckExternalTable());
 
   // Try and rename the Kudu table to the external table name.
-  s = hms_catalog_->AlterTable(kTableId, "default.a", "default.ext", 
boost::none, schema);
+  s = hms_catalog_->AlterTable(kTableId, "default.a", "default.ext",
+      kClusterId, boost::none, schema);
   EXPECT_TRUE(s.IsIllegalState()) << s.ToString();
   NO_FATALS(CheckExternalTable());
-  NO_FATALS(CheckTable("default", "a", kTableId, boost::none, schema));
+  NO_FATALS(CheckTable("default", "a", kTableId, kClusterId, boost::none, 
schema));
 
   // Try and rename the external table. This shouldn't succeed because the 
Table
   // ID doesn't match.
-  s = hms_catalog_->AlterTable(kTableId, "default.ext", "default.b", 
boost::none, schema);
+  s = hms_catalog_->AlterTable(kTableId, "default.ext", "default.b",
+      kClusterId, boost::none, schema);
   EXPECT_TRUE(s.IsNotFound()) << s.ToString();
   NO_FATALS(CheckExternalTable());
-  NO_FATALS(CheckTable("default", "a", kTableId, boost::none, schema));
+  NO_FATALS(CheckTable("default", "a", kTableId, kClusterId, boost::none, 
schema));
   NO_FATALS(CheckTableDoesNotExist("default", "b"));
 
   // Try and drop the external table as if it were a Kudu table.
@@ -380,6 +392,7 @@ TEST_F(HmsCatalogTest, TestExternalTable) {
 }
 
 TEST_F(HmsCatalogTest, TestGetKuduTables) {
+  const string kClusterId = "cluster-id";
   const string kHmsDatabase = "db";
   const string kManagedTableName = "managed_table";
   const string kExternalTableName = "external_table";
@@ -398,7 +411,8 @@ TEST_F(HmsCatalogTest, TestGetKuduTables) {
   ASSERT_OK(CreateLegacyTable("db", "external_table", 
HmsClient::kExternalTable));
   ASSERT_OK(hms_client_->GetTable("db", "external_table", &table));
 
-  ASSERT_OK(hms_catalog_->CreateTable("fake-id", "db.table", kOwner, 
Schema()));
+  ASSERT_OK(hms_catalog_->CreateTable("fake-id", "db.table",
+      kClusterId, kOwner, Schema()));
 
   hive::Table non_kudu_table;
   non_kudu_table.dbName = "db";
@@ -421,12 +435,13 @@ TEST_F(HmsCatalogTest, TestReconnect) {
   // MiniHms does not support HA, since it relies on a single-process Derby 
database.
 
   const string kTableId = "table-id";
+  const string kClusterId = "cluster-id";
   const string kHmsDatabase = "default";
   const string kOwner = "alice";
   Schema schema = AllTypesSchema();
-  ASSERT_OK(hms_catalog_->CreateTable(kTableId, "default.a", kOwner, schema));
-  NO_FATALS(CheckTable(kHmsDatabase, "a", kTableId, kOwner, schema));
-
+  ASSERT_OK(hms_catalog_->CreateTable(kTableId, "default.a",
+      kClusterId, kOwner, schema));
+  NO_FATALS(CheckTable(kHmsDatabase, "a", kTableId, kClusterId, kOwner, 
schema));
   // Shutdown the HMS and try a few operations.
   ASSERT_OK(StopHms());
 
@@ -434,24 +449,28 @@ TEST_F(HmsCatalogTest, TestReconnect) {
   // while the HMS is unavailable results in a non-linear number of reconnect
   // attempts.
 
-  Status s = hms_catalog_->CreateTable(kTableId, "default.b", kOwner, schema);
+  Status s = hms_catalog_->CreateTable(kTableId, "default.b",
+      kClusterId, kOwner, schema);
   EXPECT_TRUE(s.IsNetworkError()) << s.ToString();
 
-  s = hms_catalog_->AlterTable(kTableId, "default.a", "default.c", kOwner, 
schema);
+  s = hms_catalog_->AlterTable(kTableId, "default.a", "default.c",
+      kClusterId, kOwner, schema);
   EXPECT_TRUE(s.IsNetworkError()) << s.ToString();
 
   // Start the HMS back up and ensure that the same operations succeed.
   ASSERT_OK(StartHms());
   ASSERT_EVENTUALLY([&] {
     // HmsCatalog throttles reconnections, so it's necessary to wait out the 
backoff.
-    ASSERT_OK(hms_catalog_->CreateTable(kTableId, "default.d", kOwner, 
schema));
+    ASSERT_OK(hms_catalog_->CreateTable(kTableId, "default.d",
+        kClusterId, kOwner, schema));
   });
 
-  NO_FATALS(CheckTable(kHmsDatabase, "a", kTableId, kOwner, schema));
-  NO_FATALS(CheckTable(kHmsDatabase, "d", kTableId, kOwner, schema));
+  NO_FATALS(CheckTable(kHmsDatabase, "a", kTableId, kClusterId, kOwner, 
schema));
+  NO_FATALS(CheckTable(kHmsDatabase, "d", kTableId, kClusterId, kOwner, 
schema));
 
-  EXPECT_OK(hms_catalog_->AlterTable(kTableId, "default.a", "default.c", 
kOwner, schema));
-  NO_FATALS(CheckTable(kHmsDatabase, "c", kTableId, kOwner, schema));
+  EXPECT_OK(hms_catalog_->AlterTable(kTableId, "default.a", "default.c",
+      kClusterId, kOwner, schema));
+  NO_FATALS(CheckTable(kHmsDatabase, "c", kTableId, kClusterId, kOwner, 
schema));
   NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, "a"));
 }
 
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index 96a2c12..8eb8b30 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -145,11 +145,13 @@ void HmsCatalog::Stop() {
 
 Status HmsCatalog::CreateTable(const string& id,
                                const string& name,
+                               const string& cluster_id,
                                const optional<const string&>& owner,
                                const Schema& schema,
                                const string& table_type) {
   hive::Table table;
-  RETURN_NOT_OK(PopulateTable(id, name, owner, schema, master_addresses_, 
table_type, &table));
+  RETURN_NOT_OK(PopulateTable(id, name, owner, schema, cluster_id, 
master_addresses_,
+      table_type, &table));
   return ha_client_.Execute([&] (HmsClient* client) {
       return client->CreateTable(table, EnvironmentContext());
   });
@@ -175,6 +177,7 @@ Status HmsCatalog::DropTable(const string& name, const 
hive::EnvironmentContext&
 }
 
 Status HmsCatalog::UpgradeLegacyImpalaTable(const string& id,
+                                            const string& cluster_id,
                                             const string& db_name,
                                             const string& tb_name,
                                             const Schema& schema) {
@@ -195,8 +198,8 @@ Status HmsCatalog::UpgradeLegacyImpalaTable(const string& 
id,
     if (!HmsClient::IsSynchronized(table)) {
       table.parameters[HmsClient::kStorageHandlerKey] = 
HmsClient::kKuduStorageHandler;
     } else {
-      RETURN_NOT_OK(PopulateTable(id, Substitute("$0.$1", db_name, tb_name),
-                                  table.owner, schema, master_addresses_, 
table.tableType, &table));
+      RETURN_NOT_OK(PopulateTable(id, Substitute("$0.$1", db_name, tb_name), 
table.owner, schema,
+                                  cluster_id, master_addresses_, 
table.tableType, &table));
     }
     return client->AlterTable(db_name, tb_name, table, EnvironmentContext());
   });
@@ -215,8 +218,9 @@ Status HmsCatalog::DowngradeToLegacyImpalaTable(const 
string& name) {
     }
     // Downgrade the storage handler.
     table.parameters[HmsClient::kStorageHandlerKey] = 
HmsClient::kLegacyKuduStorageHandler;
-    // Remove the Kudu-specific field 'kudu.table_id'.
+    // Remove the Kudu-specific fields 'kudu.table_id' and `kudu.cluster_id`
     EraseKeyReturnValuePtr(&table.parameters, HmsClient::kKuduTableIdKey);
+    EraseKeyReturnValuePtr(&table.parameters, HmsClient::kKuduClusterIdKey);
     return client->AlterTable(table.dbName, table.tableName, table, 
EnvironmentContext());
   });
 }
@@ -254,6 +258,7 @@ Status HmsCatalog::GetKuduTables(vector<hive::Table>* 
kudu_tables) {
 Status HmsCatalog::AlterTable(const string& id,
                               const string& name,
                               const string& new_name,
+                              const string& cluster_id,
                               optional<const string&> owner,
                               const Schema& schema,
                               const bool& check_id) {
@@ -289,7 +294,7 @@ Status HmsCatalog::AlterTable(const string& id,
       }
 
       // Overwrite fields in the table that have changed, including the new 
name.
-      RETURN_NOT_OK(PopulateTable(id, new_name, owner, schema, 
master_addresses_,
+      RETURN_NOT_OK(PopulateTable(id, new_name, owner, schema, cluster_id, 
master_addresses_,
           table.tableType, &table));
       return client->AlterTable(hms_database.ToString(), hms_table.ToString(),
                                 table, EnvironmentContext(check_id));
@@ -360,6 +365,7 @@ Status HmsCatalog::PopulateTable(const string& id,
                                  const string& name,
                                  const optional<const string&>& owner,
                                  const Schema& schema,
+                                 const string& cluster_id,
                                  const string& master_addresses,
                                  const string& table_type,
                                  hive::Table* table) {
@@ -387,9 +393,10 @@ Status HmsCatalog::PopulateTable(const string& id,
     EraseKeyReturnValuePtr(&table->parameters, HmsClient::kExternalTableKey);
   }
 
-  // Only set the table id on synchronized tables.
+  // Only set the table id and cluster id on synchronized tables.
   if (HmsClient::IsSynchronized(*table)) {
     table->parameters[HmsClient::kKuduTableIdKey] = id;
+    table->parameters[HmsClient::kKuduClusterIdKey] = cluster_id;
   }
 
   // Add the Kudu-specific parameters. This intentionally avoids overwriting
diff --git a/src/kudu/hms/hms_catalog.h b/src/kudu/hms/hms_catalog.h
index e87ea9d..f162afe 100644
--- a/src/kudu/hms/hms_catalog.h
+++ b/src/kudu/hms/hms_catalog.h
@@ -72,6 +72,7 @@ class HmsCatalog {
   // Fails the HMS is unreachable, or a table with the same name is already 
present.
   Status CreateTable(const std::string& id,
                      const std::string& name,
+                     const std::string& cluster_id,
                      const boost::optional<const std::string&>& owner,
                      const Schema& schema,
                      const std::string& table_type = 
hms::HmsClient::kManagedTable)
@@ -102,6 +103,7 @@ class HmsCatalog {
   Status AlterTable(const std::string& id,
                     const std::string& name,
                     const std::string& new_name,
+                    const std::string& cluster_id,
                     boost::optional<const std::string&> owner,
                     const Schema& schema,
                     const bool& check_id = true) WARN_UNUSED_RESULT;
@@ -111,6 +113,7 @@ class HmsCatalog {
   // This method will fail if the HMS is unreachable, if the table is not a
   // legacy table, or if the table entry in not in the HMS.
   Status UpgradeLegacyImpalaTable(const std::string& id,
+                                  const std::string& cluster_id,
                                   const std::string& db_name,
                                   const std::string& tb_name,
                                   const Schema& schema) WARN_UNUSED_RESULT;
@@ -163,6 +166,7 @@ class HmsCatalog {
                               const std::string& name,
                               const boost::optional<const std::string&>& owner,
                               const Schema& schema,
+                              const std::string& cluster_id,
                               const std::string& master_addresses,
                               const std::string& table_type,
                               hive::Table* table) WARN_UNUSED_RESULT;
diff --git a/src/kudu/hms/hms_client-test.cc b/src/kudu/hms/hms_client-test.cc
index e5e0044..487234c 100644
--- a/src/kudu/hms/hms_client-test.cc
+++ b/src/kudu/hms/hms_client-test.cc
@@ -57,10 +57,11 @@ class HmsClientTest : public KuduTest,
                       public 
::testing::WithParamInterface<optional<SaslProtection::Type>> {
  public:
 
-  Status CreateTable(HmsClient* client,
+  static Status CreateTable(HmsClient* client,
                      const string& database_name,
                      const string& table_name,
-                     const string& table_id) {
+                     const string& table_id,
+                     const string& cluster_id) {
     hive::Table table;
     table.dbName = database_name;
     table.tableName = table_name;
@@ -68,6 +69,7 @@ class HmsClientTest : public KuduTest,
     table.__set_parameters({
         make_pair(HmsClient::kKuduTableIdKey, table_id),
         make_pair(HmsClient::kKuduTableNameKey, table_name),
+        make_pair(HmsClient::kKuduClusterIdKey, cluster_id),
         make_pair(HmsClient::kKuduMasterAddrsKey, string("TODO")),
         make_pair(HmsClient::kStorageHandlerKey, 
HmsClient::kKuduStorageHandler)
     });
@@ -156,14 +158,16 @@ TEST_P(HmsClientTest, TestHmsOperations) {
 
   string table_name = "my_table";
   string table_id = "table-id";
+  string cluster_id = "cluster-id";
 
   // Check that the HMS rejects Kudu tables without a table ID.
-  ASSERT_STR_CONTAINS(CreateTable(&client, database_name, table_name, 
"").ToString(),
+  ASSERT_STR_CONTAINS(CreateTable(&client, database_name, table_name, "", 
cluster_id).ToString(),
                       "Kudu table entry must contain a table ID");
 
   // Create a table.
-  ASSERT_OK(CreateTable(&client, database_name, table_name, table_id));
-  ASSERT_TRUE(CreateTable(&client, database_name, table_name, 
table_id).IsAlreadyPresent());
+  ASSERT_OK(CreateTable(&client, database_name, table_name, table_id, 
cluster_id));
+  ASSERT_TRUE(CreateTable(&client, database_name, table_name,
+      table_id, cluster_id).IsAlreadyPresent());
 
   // Retrieve a table.
   hive::Table my_table;
@@ -171,6 +175,7 @@ TEST_P(HmsClientTest, TestHmsOperations) {
   EXPECT_EQ(database_name, my_table.dbName) << "my_table: " << my_table;
   EXPECT_EQ(table_name, my_table.tableName);
   EXPECT_EQ(table_id, my_table.parameters[HmsClient::kKuduTableIdKey]);
+  EXPECT_EQ(cluster_id, my_table.parameters[HmsClient::kKuduClusterIdKey]);
   EXPECT_EQ(HmsClient::kKuduStorageHandler, 
my_table.parameters[HmsClient::kStorageHandlerKey]);
   EXPECT_EQ(HmsClient::kManagedTable, my_table.tableType);
 
@@ -201,10 +206,12 @@ TEST_P(HmsClientTest, TestHmsOperations) {
 
   // Create a table with an uppercase name.
   string uppercase_table_name = "my_UPPERCASE_Table";
-  ASSERT_OK(CreateTable(&client, database_name, uppercase_table_name, 
"uppercase-table-id"));
+  ASSERT_OK(CreateTable(&client, database_name, uppercase_table_name,
+      "uppercase-table-id", cluster_id));
 
   // Create a table with an illegal utf-8 name.
-  ASSERT_TRUE(CreateTable(&client, database_name, "☃ sculptures 😉", 
table_id).IsInvalidArgument());
+  ASSERT_TRUE(CreateTable(&client, database_name, "☃ sculptures 😉",
+      table_id, cluster_id).IsInvalidArgument());
 
   // Create a non-Kudu table.
   hive::Table non_kudu_table;
@@ -453,7 +460,7 @@ TEST_F(HmsClientTest, TestCaseSensitivity) {
   ASSERT_OK(client.CreateDatabase(db));
 
   // Create a table.
-  ASSERT_OK(CreateTable(&client, "my_db", "Foo", "abc123"));
+  ASSERT_OK(CreateTable(&client, "my_db", "Foo", "abc123", "Bar"));
 
   hive::Table table;
   ASSERT_OK(client.GetTable("my_db", "Foo", &table));
diff --git a/src/kudu/hms/hms_client.cc b/src/kudu/hms/hms_client.cc
index e945861..84eb50f 100644
--- a/src/kudu/hms/hms_client.cc
+++ b/src/kudu/hms/hms_client.cc
@@ -107,6 +107,7 @@ const char* const HmsClient::kLegacyKuduStorageHandler =
   "com.cloudera.kudu.hive.KuduStorageHandler";
 const char* const HmsClient::kLegacyTablePrefix = "impala::";
 const char* const HmsClient::kKuduTableIdKey = "kudu.table_id";
+const char* const HmsClient::kKuduClusterIdKey = "kudu.cluster_id";
 const char* const HmsClient::kKuduTableNameKey = "kudu.table_name";
 const char* const HmsClient::kKuduMasterAddrsKey = "kudu.master_addresses";
 const char* const HmsClient::kKuduMasterEventKey = "kudu.master_event";
diff --git a/src/kudu/hms/hms_client.h b/src/kudu/hms/hms_client.h
index e87082f..59aff12 100644
--- a/src/kudu/hms/hms_client.h
+++ b/src/kudu/hms/hms_client.h
@@ -61,6 +61,7 @@ class HmsClient {
   static const char* const kLegacyKuduStorageHandler;
   static const char* const kLegacyTablePrefix;
   static const char* const kKuduTableIdKey;
+  static const char* const kKuduClusterIdKey;
   static const char* const kKuduTableNameKey;
   static const char* const kKuduMasterAddrsKey;
   static const char* const kKuduMasterEventKey;
diff --git a/src/kudu/integration-tests/hms_itest-base.cc 
b/src/kudu/integration-tests/hms_itest-base.cc
index f4be1db..5cc9e49 100644
--- a/src/kudu/integration-tests/hms_itest-base.cc
+++ b/src/kudu/integration-tests/hms_itest-base.cc
@@ -221,6 +221,7 @@ void HmsITestHarness::CheckTable(const string& 
database_name,
     ASSERT_EQ(schema.Column(idx).comment(), hms_table.sd.cols[idx].comment);
   }
   ASSERT_EQ(table->id(), 
hms_table.parameters[hms::HmsClient::kKuduTableIdKey]);
+  ASSERT_EQ(table->client()->cluster_id(), 
hms_table.parameters[hms::HmsClient::kKuduClusterIdKey]);
   ASSERT_TRUE(boost::iequals(table->name(),
       hms_table.parameters[hms::HmsClient::kKuduTableNameKey]));
   ASSERT_EQ(HostPort::ToCommaSeparatedString(cluster->master_rpc_addrs()),
diff --git a/src/kudu/integration-tests/master_hms-itest.cc 
b/src/kudu/integration-tests/master_hms-itest.cc
index e8ff507..2efa9ec 100644
--- a/src/kudu/integration-tests/master_hms-itest.cc
+++ b/src/kudu/integration-tests/master_hms-itest.cc
@@ -33,6 +33,7 @@
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/common/common.pb.h"
 #include "kudu/common/table_util.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/hms_client.h"
@@ -559,6 +560,68 @@ TEST_F(MasterHmsTest, TestIgnoreExternalTables) {
   ASSERT_EQ(kManagedTableName, ext.parameters[HmsClient::kKuduTableNameKey]);
 }
 
+TEST_F(MasterHmsTest, TestIgnoreOtherClusterIds) {
+  // Set up a few tables.
+  ASSERT_OK(CreateKuduTable("default", "same"));
+  ASSERT_OK(CreateKuduTable("default", "other"));
+  ASSERT_OK(CreateKuduTable("default", "noid"));
+  shared_ptr<KuduTable> kudu_table;
+  hive::Table hive_table;
+
+  // Alter the cluster id for the `other` table so that it no longer matches 
the cluster.
+  // This event and any future events should be ignored.
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "other", &hive_table));
+  hive_table.parameters[HmsClient::kKuduClusterIdKey] = "other_cluster_id";
+  ASSERT_OK(harness_.hms_client()->AlterTable("default", "other", hive_table));
+
+  // Remove the cluster id for the `noid` table to represent a table created 
before
+  // cluster id support was added.
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "noid", &hive_table));
+  EraseKeyReturnValuePtr(&hive_table.parameters, HmsClient::kKuduClusterIdKey);
+  ASSERT_OK(harness_.hms_client()->AlterTable("default", "noid", hive_table));
+
+  // Alter the tables in the HMS, the `same` and `noid` tables should be 
updated in Kudu.
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "same", &hive_table));
+  hive_table.tableName = "same1";
+  ASSERT_OK(harness_.hms_client()->AlterTable("default", "same", hive_table));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "other", &hive_table));
+  hive_table.tableName = "other1";
+  ASSERT_OK(harness_.hms_client()->AlterTable("default", "other", hive_table));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "noid", &hive_table));
+  hive_table.tableName = "noid1";
+  ASSERT_OK(harness_.hms_client()->AlterTable("default", "noid", hive_table));
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(client_->OpenTable("default.same1", &kudu_table));
+    ASSERT_OK(client_->OpenTable("default.other", &kudu_table));
+    ASSERT_OK(client_->OpenTable("default.noid1", &kudu_table));
+    Status s = client_->OpenTable("default.other1", &kudu_table);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  });
+
+  // Alter the tables back.
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "same1", &hive_table));
+  hive_table.tableName = "same";
+  ASSERT_OK(harness_.hms_client()->AlterTable("default", "same1", hive_table));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "other1", &hive_table));
+  hive_table.tableName = "other";
+  ASSERT_OK(harness_.hms_client()->AlterTable("default", "other1", 
hive_table));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "noid1", &hive_table));
+  hive_table.tableName = "noid";
+  ASSERT_OK(harness_.hms_client()->AlterTable("default", "noid1", hive_table));
+
+  // Drop the tables in the HMS, the `same` and `noid` tables should be 
dropped in Kudu.
+  ASSERT_OK(harness_.hms_client()->DropTable("default", "same"));
+  ASSERT_OK(harness_.hms_client()->DropTable("default", "other"));
+  ASSERT_OK(harness_.hms_client()->DropTable("default", "noid"));
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(client_->OpenTable("default.other", &kudu_table));
+    Status s = client_->OpenTable("default.same", &kudu_table);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    s = client_->OpenTable("default.noid", &kudu_table);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  });
+}
+
 TEST_F(MasterHmsTest, TestUppercaseIdentifiers) {
   ASSERT_OK(CreateKuduTable("default", "MyTable"));
   NO_FATALS(CheckTable("default", "MyTable", /*user=*/none));
diff --git a/src/kudu/master/catalog_manager.cc 
b/src/kudu/master/catalog_manager.cc
index ea122f9..5438e41 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -910,7 +910,8 @@ Status CatalogManager::InitClusterId() {
   // Once the cluster ID is loaded or stored, store it in a variable for
   // fast lookup.
   if (s.ok()) {
-     master_->set_cluster_id(cluster_id);
+    std::lock_guard<simple_spinlock> l(cluster_id_lock_);
+    cluster_id_ = cluster_id;
   }
 
   return s;
@@ -1276,7 +1277,8 @@ Status CatalogManager::PrepareFollowerClusterId() {
     LOG_WITH_PREFIX(INFO) << kDescription << ": success";
     // Once the cluster ID is loaded or stored, store it in a variable for
     // fast lookup.
-    master_->set_cluster_id(cluster_id);
+    std::lock_guard<simple_spinlock> l(cluster_id_lock_);
+    cluster_id_ = cluster_id;
   } else {
     LOG_WITH_PREFIX(WARNING) << kDescription << ": " << s.ToString();
   }
@@ -1331,7 +1333,7 @@ Status CatalogManager::PrepareFollowerTokenVerifier() {
 Status CatalogManager::PrepareFollower(MonoTime* last_tspk_run) {
   leader_lock_.AssertAcquiredForReading();
   // Load the cluster ID.
-  if (master_->cluster_id().empty()) {
+  if (GetClusterId().empty()) {
     RETURN_NOT_OK(PrepareFollowerClusterId());
   }
   // Load the CA certificate and CA private key.
@@ -1872,7 +1874,8 @@ Status CatalogManager::CreateTable(const 
CreateTableRequestPB* orig_req,
   // since this step validates that the table name is available in the HMS 
catalog.
   if (hms_catalog_ && is_user_table) {
     CHECK(rpc);
-    Status s = hms_catalog_->CreateTable(table->id(), normalized_table_name, 
req.owner(), schema);
+    Status s = hms_catalog_->CreateTable(table->id(), normalized_table_name, 
GetClusterId(),
+        req.owner(), schema);
     if (!s.ok()) {
       s = s.CloneAndPrepend(Substitute("an error occurred while creating table 
$0 in the HMS",
                                        normalized_table_name));
@@ -2630,7 +2633,7 @@ Status CatalogManager::AlterTableRpc(const 
AlterTableRequestPB& req,
     // Rename the table in the HMS.
     RETURN_NOT_OK(SetupError(hms_catalog_->AlterTable(
             table->id(), l.data().name(), normalized_new_table_name,
-            l.data().owner(), schema),
+            GetClusterId(), l.data().owner(), schema),
         resp, MasterErrorPB::HIVE_METASTORE_ERROR));
 
     // Unlock the table, and wait for the notification log listener to handle
@@ -3001,7 +3004,7 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
     DCHECK(!req.has_new_table_name());
     WARN_NOT_OK(hms_catalog_->AlterTable(
           table->id(), normalized_table_name, normalized_table_name,
-          l.mutable_data()->owner(), new_schema),
+          GetClusterId(), l.mutable_data()->owner(), new_schema),
         Substitute("failed to alter HiveMetastore schema for table $0, "
                    "HMS schema information will be stale", table->ToString()));
   }
@@ -4603,6 +4606,11 @@ Status CatalogManager::ProcessTabletReport(
   return Status::OK();
 }
 
+string CatalogManager::GetClusterId() const {
+  std::lock_guard<simple_spinlock> l(cluster_id_lock_);
+  return cluster_id_;
+}
+
 int64_t CatalogManager::GetLatestNotificationLogEventId() {
   DCHECK(hms_catalog_);
   leader_lock_.AssertAcquiredForReading();
diff --git a/src/kudu/master/catalog_manager.h 
b/src/kudu/master/catalog_manager.h
index 9d19224..86e16ce 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -715,6 +715,8 @@ class CatalogManager : public 
tserver::TabletReplicaLookupIf {
                              TabletReportUpdatesPB* full_report_update,
                              rpc::RpcContext* rpc);
 
+  std::string GetClusterId() const;
+
   // Returns the latest handled Hive Metastore notification log event ID.
   int64_t GetLatestNotificationLogEventId();
 
@@ -1216,6 +1218,10 @@ class CatalogManager : public 
tserver::TabletReplicaLookupIf {
   // LRU cache to store responses generated by GetTableLocations().
   std::unique_ptr<TableLocationsCache> table_locations_cache_;
 
+  // Lock protecting cluster_id_.
+  mutable simple_spinlock cluster_id_lock_;
+  std::string cluster_id_;
+
   DISALLOW_COPY_AND_ASSIGN(CatalogManager);
 };
 
diff --git a/src/kudu/master/hms_notification_log_listener.cc 
b/src/kudu/master/hms_notification_log_listener.cc
index 517a489..b2845a4 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -352,6 +352,18 @@ Status 
HmsNotificationLogListenerTask::HandleAlterTableEvent(const hive::Notific
     return Status::OK();
   }
 
+  // If there is not a cluster ID, for maximum compatibility we should assume 
this is an older
+  // Kudu table without a cluster ID set. This is safe because we still 
validate the table ID
+  // which is universally unique.
+  const string* cluster_id =
+      FindOrNull(before_table.parameters, hms::HmsClient::kKuduClusterIdKey);
+  if (cluster_id && *cluster_id != catalog_manager_->GetClusterId()) {
+    // Not for this cluster; skip it.
+    VLOG(2) << Substitute("Ignoring alter event for table $0 of cluster $1",
+        before_table.tableName, *cluster_id);
+    return Status::OK();
+  }
+
   hive::Table after_table;
   RETURN_NOT_OK(DeserializeTable(event, message, "tableObjAfterJson", 
&after_table));
 
@@ -422,6 +434,18 @@ Status 
HmsNotificationLogListenerTask::HandleDropTableEvent(const hive::Notifica
     return Status::OK();
   }
 
+  // If there is not a cluster ID, for maximum compatibility we should assume 
this is an older
+  // Kudu table without a cluster ID set. This is safe because we still 
validate the table ID
+  // which is universally unique.
+  const string* cluster_id =
+      FindOrNull(table.parameters, hms::HmsClient::kKuduClusterIdKey);
+  if (cluster_id && *cluster_id != catalog_manager_->GetClusterId()) {
+    // Not for this cluster; skip it.
+    VLOG(2) << Substitute("Ignoring alter event for table $0 of cluster $1",
+                          table.tableName, *cluster_id);
+    return Status::OK();
+  }
+
   const string* table_id = FindOrNull(table.parameters, 
hms::HmsClient::kKuduTableIdKey);
   if (!table_id) {
     return Status::IllegalState("missing Kudu table ID");
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 764550c..66ed824 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -359,7 +359,7 @@ Status Master::ListMasters(vector<ServerEntryPB>* masters) 
const {
     
local_entry.mutable_instance_id()->CopyFrom(catalog_manager_->NodeInstance());
     RETURN_NOT_OK(GetMasterRegistration(local_entry.mutable_registration()));
     local_entry.set_role(RaftPeerPB::LEADER);
-    local_entry.set_cluster_id(cluster_id_);
+    local_entry.set_cluster_id(catalog_manager_->GetClusterId());
     local_entry.set_member_type(RaftPeerPB::VOTER);
     masters->emplace_back(std::move(local_entry));
     return Status::OK();
diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h
index e922e9d..6522126 100644
--- a/src/kudu/master/master.h
+++ b/src/kudu/master/master.h
@@ -19,7 +19,6 @@
 #include <atomic>
 #include <cstdint>
 #include <memory>
-#include <string>
 #include <vector>
 
 #include "kudu/common/wire_protocol.pb.h"
@@ -36,6 +35,7 @@ class HostPort;
 class MaintenanceManager;
 class MonoDelta;
 class ThreadPool;
+
 namespace rpc {
 class RpcContext;
 }  // namespace rpc
@@ -78,8 +78,6 @@ class Master : public kserver::KuduServer {
   Status WaitUntilCatalogManagerIsLeaderAndReadyForTests(const MonoDelta& 
timeout)
       WARN_UNUSED_RESULT;
 
-  const std::string& cluster_id() const { return cluster_id_; }
-
   MasterCertAuthority* cert_authority() { return cert_authority_.get(); }
 
   security::TokenSigner* token_signer() { return token_signer_.get(); }
@@ -141,9 +139,6 @@ class Master : public kserver::KuduServer {
   // safe in a particular case.
   void ShutdownImpl();
 
-  // Set the cluster ID on this master for fast lookup.
-  void set_cluster_id(const std::string& cluster_id) { cluster_id_ = 
cluster_id; }
-
   enum MasterState {
     kStopped,
     kInitialized,
@@ -152,8 +147,6 @@ class Master : public kserver::KuduServer {
 
   MasterState state_;
 
-  std::string cluster_id_ = "";
-
   std::unique_ptr<MasterCertAuthority> cert_authority_;
   std::unique_ptr<security::TokenSigner> token_signer_;
   std::unique_ptr<CatalogManager> catalog_manager_;
diff --git a/src/kudu/master/master_service.cc 
b/src/kudu/master/master_service.cc
index e1d8987..f1d30c9 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -658,7 +658,7 @@ void MasterServiceImpl::GetMasterRegistration(const 
GetMasterRegistrationRequest
   const auto& role_and_member = 
server_->catalog_manager()->GetRoleAndMemberType();
   resp->set_role(role_and_member.first);
   resp->set_member_type(role_and_member.second);
-  resp->set_cluster_id(server_->cluster_id());
+  resp->set_cluster_id(server_->catalog_manager()->GetClusterId());
   rpc->RespondSuccess();
 }
 
@@ -754,7 +754,7 @@ void MasterServiceImpl::ConnectToMaster(const 
ConnectToMasterRequestPB* /*req*/,
                            : consensus::RaftPeerPB::FOLLOWER);
 
   // Add the cluster ID.
-  resp->set_cluster_id(server_->cluster_id());
+  resp->set_cluster_id(server_->catalog_manager()->GetClusterId());
 
   rpc->RespondSuccess();
 }
diff --git a/src/kudu/master/sys_catalog-test.cc 
b/src/kudu/master/sys_catalog-test.cc
index bbcc027..feea412 100644
--- a/src/kudu/master/sys_catalog-test.cc
+++ b/src/kudu/master/sys_catalog-test.cc
@@ -439,8 +439,8 @@ TEST_F(SysCatalogTest, LoadClusterID) {
   ASSERT_TRUE(cluster_id_entry.has_cluster_id());
   ASSERT_TRUE(!cluster_id_entry.cluster_id().empty());
 
-  ASSERT_EQ(cluster_id_entry.cluster_id(), master_->cluster_id());
-  const string init_id = master_->cluster_id();
+  string init_id = master_->catalog_manager()->GetClusterId();
+  ASSERT_EQ(cluster_id_entry.cluster_id(), init_id);
 
   // Check that if a cluster ID is already present,
   // it cannot be overwritten using SysCatalogTable::AddClusterIdEntry().
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 4ac0210..ab24435 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -3858,37 +3858,6 @@ Status CreateLegacyHmsTable(HmsClient* client,
   return client->CreateTable(table);
 }
 
-Status CreateHmsTable(HmsClient* client,
-                      const string& database_name,
-                      const string& table_name,
-                      const string& table_type,
-                      const string& master_addresses,
-                      const string& table_id) {
-  hive::Table table;
-  table.dbName = database_name;
-  table.tableName = table_name;
-  table.tableType = table_type;
-
-  // TODO(HIVE-19253): Used along with table type to indicate an external 
table.
-  if (table_type == HmsClient::kExternalTable) {
-    table.parameters[HmsClient::kExternalTableKey] = "TRUE";
-  }
-
-  // Only set the table id on managed tables.
-  if (table_type == HmsClient::kManagedTable) {
-    table.parameters[HmsClient::kKuduTableIdKey] = table_id;
-  }
-
-  table.parameters[HmsClient::kKuduTableNameKey] =
-      Substitute("$0.$1", database_name, table_name);
-  table.parameters[HmsClient::kKuduMasterAddrsKey] = master_addresses;
-  table.parameters[HmsClient::kStorageHandlerKey] = 
HmsClient::kKuduStorageHandler;
-
-  hive::EnvironmentContext env_ctx;
-  env_ctx.__set_properties({ make_pair(HmsClient::kKuduMasterEventKey, "true") 
});
-  return client->CreateTable(table, env_ctx);
-}
-
 Status CreateKuduTable(const shared_ptr<KuduClient>& kudu_client,
                        const string& table_name,
                        const optional<string>& owner = {}) {
@@ -3933,9 +3902,12 @@ void ValidateHmsEntries(HmsClient* hms_client,
     ASSERT_TRUE(boost::iequals(kudu_table->name(),
                                
hms_table.parameters[hms::HmsClient::kKuduTableNameKey]));
     ASSERT_EQ(kudu_table->id(), 
hms_table.parameters[HmsClient::kKuduTableIdKey]);
+    ASSERT_EQ(kudu_table->client()->cluster_id(),
+              hms_table.parameters[HmsClient::kKuduClusterIdKey]);
   } else {
     ASSERT_TRUE(ContainsKey(hms_table.parameters, 
HmsClient::kKuduTableNameKey));
     ASSERT_FALSE(ContainsKey(hms_table.parameters, 
HmsClient::kKuduTableIdKey));
+    ASSERT_FALSE(ContainsKey(hms_table.parameters, 
HmsClient::kKuduClusterIdKey));
   }
 }
 
@@ -4043,7 +4015,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
   ASSERT_OK(CreateKuduTable(kudu_client, "default.control", kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.control", &control));
   ASSERT_OK(hms_catalog.CreateTable(
-        control->id(), control->name(), kUsername,
+        control->id(), control->name(), kudu_client->cluster_id(), kUsername,
         KuduSchema::ToSchema(control->schema())));
 
   // Control case: the check tool should not flag this external synchronized 
table.
@@ -4051,11 +4023,12 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
   ASSERT_OK(CreateKuduTable(kudu_client, "default.control_external", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.control_external", 
&control_external));
   ASSERT_OK(hms_catalog.CreateTable(
-      control_external->id(), control_external->name(), kUsername,
-      KuduSchema::ToSchema(control_external->schema()), 
HmsClient::kExternalTable));
+      control_external->id(), control_external->name(), 
kudu_client->cluster_id(),
+      kUsername, KuduSchema::ToSchema(control_external->schema()), 
HmsClient::kExternalTable));
   hive::Table hms_control_external;
   ASSERT_OK(hms_client.GetTable("default", "control_external", 
&hms_control_external));
   hms_control_external.parameters[HmsClient::kKuduTableIdKey] = 
control_external->id();
+  hms_control_external.parameters[HmsClient::kKuduClusterIdKey] = 
kudu_client->cluster_id();
   hms_control_external.parameters[HmsClient::kExternalPurgeKey] = "true";
   ASSERT_OK(hms_client.AlterTable("default", "control_external",
       hms_control_external, master_ctx));
@@ -4065,24 +4038,32 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
   ASSERT_OK(CreateKuduTable(kudu_client, "default.UPPERCASE", kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.UPPERCASE", &test_uppercase));
   ASSERT_OK(hms_catalog.CreateTable(
-        test_uppercase->id(), test_uppercase->name(), kUsername,
-        KuduSchema::ToSchema(test_uppercase->schema())));
+        test_uppercase->id(), test_uppercase->name(), 
kudu_client->cluster_id(),
+        kUsername, KuduSchema::ToSchema(test_uppercase->schema())));
 
   // Test case: inconsistent schema.
   shared_ptr<KuduTable> inconsistent_schema;
   ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_schema", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.inconsistent_schema", 
&inconsistent_schema));
   ASSERT_OK(hms_catalog.CreateTable(
-        inconsistent_schema->id(), inconsistent_schema->name(), kUsername,
-        SchemaBuilder().Build()));
+        inconsistent_schema->id(), inconsistent_schema->name(), 
kudu_client->cluster_id(),
+        kUsername, SchemaBuilder().Build()));
 
   // Test case: inconsistent name.
   shared_ptr<KuduTable> inconsistent_name;
   ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_name", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.inconsistent_name", 
&inconsistent_name));
   ASSERT_OK(hms_catalog.CreateTable(
-        inconsistent_name->id(), "default.inconsistent_name_hms", kUsername,
-        KuduSchema::ToSchema(inconsistent_name->schema())));
+      inconsistent_name->id(), "default.inconsistent_name_hms", 
kudu_client->cluster_id(),
+      kUsername, KuduSchema::ToSchema(inconsistent_name->schema())));
+
+  // Test case: inconsistent cluster id.
+  shared_ptr<KuduTable> inconsistent_cluster;
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_cluster", 
kUsername));
+  ASSERT_OK(kudu_client->OpenTable("default.inconsistent_cluster", 
&inconsistent_cluster));
+  ASSERT_OK(hms_catalog.CreateTable(
+      inconsistent_cluster->id(), "default.inconsistent_cluster", 
"inconsistent_cluster",
+      kUsername, KuduSchema::ToSchema(inconsistent_cluster->schema())));
 
   // Test case: inconsistent master addresses.
   shared_ptr<KuduTable> inconsistent_master_addrs;
@@ -4093,7 +4074,8 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
       cluster_->master_rpc_addrs()[0].ToString()));
   ASSERT_OK(invalid_hms_catalog.Start());
   ASSERT_OK(invalid_hms_catalog.CreateTable(
-        inconsistent_master_addrs->id(), inconsistent_master_addrs->name(), 
kUsername,
+        inconsistent_master_addrs->id(), inconsistent_master_addrs->name(),
+        kudu_client->cluster_id(), kUsername,
         KuduSchema::ToSchema(inconsistent_master_addrs->schema())));
 
   // Test case: bad table id.
@@ -4101,19 +4083,19 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
   ASSERT_OK(CreateKuduTable(kudu_client, "default.bad_id", kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.bad_id", &bad_id));
   ASSERT_OK(hms_catalog.CreateTable(
-      "not_a_table_id", "default.bad_id", kUsername,
-      KuduSchema::ToSchema(bad_id->schema())));
+      "not_a_table_id", "default.bad_id", kudu_client->cluster_id(),
+      kUsername, KuduSchema::ToSchema(bad_id->schema())));
 
   // Test case: orphan table in the HMS.
   ASSERT_OK(hms_catalog.CreateTable(
-        "orphan-hms-table-id", "default.orphan_hms_table", kUsername,
-        SchemaBuilder().Build()));
+        "orphan-hms-table-id", "default.orphan_hms_table", 
"orphan-hms-cluster-id",
+        kUsername, SchemaBuilder().Build()));
 
   // Test case: orphan table in the HMS with different, but functionally
   // the same master addresses.
   ASSERT_OK(hms_catalog.CreateTable(
-      "orphan-hms-masters-id", "default.orphan_hms_table_masters", kUsername,
-      SchemaBuilder().Build()));
+      "orphan-hms-masters-id", "default.orphan_hms_table_masters", 
"orphan-hms-cluster-id",
+      kUsername, SchemaBuilder().Build()));
   // Reverse the address order and duplicate the addresses.
   vector<string> modified_addrs;
   for (const auto& hp : cluster_->master_rpc_addrs()) {
@@ -4127,7 +4109,8 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
 
   // Test case: orphan external synchronized table in the HMS.
   ASSERT_OK(hms_catalog.CreateTable(
-      "orphan-hms-table-id-external", "default.orphan_hms_table_external", 
kUsername,
+      "orphan-hms-table-id-external", "default.orphan_hms_table_external",
+      "orphan-hms-cluster-id-external", kUsername,
       SchemaBuilder().Build(), HmsClient::kExternalTable));
   hive::Table hms_orphan_external;
   ASSERT_OK(hms_client.GetTable("default", "orphan_hms_table_external", 
&hms_orphan_external));
@@ -4195,24 +4178,24 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
   ASSERT_OK(CreateKuduTable(kudu_client, "default.no_owner_in_hms", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.no_owner_in_hms", 
&no_owner_in_hms));
   ASSERT_OK(hms_catalog.CreateTable(
-      no_owner_in_hms->id(), no_owner_in_hms->name(), boost::none,
-      KuduSchema::ToSchema(no_owner_in_hms->schema())));
+      no_owner_in_hms->id(), no_owner_in_hms->name(), 
kudu_client->cluster_id(),
+      boost::none, KuduSchema::ToSchema(no_owner_in_hms->schema())));
 
   // Test case: no owner in Kudu
   shared_ptr<KuduTable> no_owner_in_kudu;
   ASSERT_OK(CreateKuduTable(kudu_client, "default.no_owner_in_kudu", 
static_cast<string>("")));
   ASSERT_OK(kudu_client->OpenTable("default.no_owner_in_kudu", 
&no_owner_in_kudu));
   ASSERT_OK(hms_catalog.CreateTable(
-      no_owner_in_kudu->id(), no_owner_in_kudu->name(), kUsername,
-      KuduSchema::ToSchema(no_owner_in_kudu->schema())));
+      no_owner_in_kudu->id(), no_owner_in_kudu->name(), 
kudu_client->cluster_id(),
+      kUsername, KuduSchema::ToSchema(no_owner_in_kudu->schema())));
 
   // Test case: different owner in Kudu and HMS
   shared_ptr<KuduTable> different_owner;
   ASSERT_OK(CreateKuduTable(kudu_client, "default.different_owner", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.different_owner", 
&different_owner));
   ASSERT_OK(hms_catalog.CreateTable(
-      different_owner->id(), different_owner->name(), kOtherUsername,
-      KuduSchema::ToSchema(different_owner->schema())));
+      different_owner->id(), different_owner->name(), 
kudu_client->cluster_id(),
+      kOtherUsername, KuduSchema::ToSchema(different_owner->schema())));
 
   unordered_set<string> consistent_tables = {
     "default.control",
@@ -4223,6 +4206,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
     "default.UPPERCASE",
     "default.inconsistent_schema",
     "default.inconsistent_name",
+    "default.inconsistent_cluster",
     "default.inconsistent_master_addrs",
     "default.bad_id",
     "default.different_owner",
@@ -4325,6 +4309,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
     "default.UPPERCASE",
     "default.inconsistent_schema",
     "default.inconsistent_name",
+    "default.inconsistent_cluster",
     "default.inconsistent_master_addrs",
     "default.bad_id",
     "default.different_owner",
@@ -4341,6 +4326,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
     "uppercase",
     "inconsistent_schema",
     "inconsistent_name_hms",
+    "inconsistent_cluster",
     "inconsistent_master_addrs",
     "bad_id",
     "kudu_orphan",
@@ -4363,6 +4349,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
     "default.control",
     "default.control_external",
     "default.different_owner",
+    "default.inconsistent_cluster",
     "default.inconsistent_master_addrs",
     "default.inconsistent_name_hms",
     "default.inconsistent_schema",
@@ -4427,10 +4414,12 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndManualFixHmsMetadata) {
   ASSERT_OK(CreateKuduTable(kudu_client, "default.duplicate_hms_tables"));
   ASSERT_OK(kudu_client->OpenTable("default.duplicate_hms_tables", 
&duplicate_hms_tables));
   ASSERT_OK(hms_catalog.CreateTable(
-        duplicate_hms_tables->id(), "default.duplicate_hms_tables", kUsername,
+        duplicate_hms_tables->id(), "default.duplicate_hms_tables",
+        kudu_client->cluster_id(), kUsername,
         KuduSchema::ToSchema(duplicate_hms_tables->schema())));
   ASSERT_OK(hms_catalog.CreateTable(
-        duplicate_hms_tables->id(), "default.duplicate_hms_tables_2", 
kUsername,
+        duplicate_hms_tables->id(), "default.duplicate_hms_tables_2",
+        kudu_client->cluster_id(), kUsername,
         KuduSchema::ToSchema(duplicate_hms_tables->schema())));
 
   // Test case: Kudu tables Hive-incompatible names.
@@ -4710,10 +4699,12 @@ TEST_F(ToolTest, TestHmsList) {
 
   string kUsername = "alice";
   ASSERT_OK(hms_catalog.CreateTable(
-      "1", "default.table1", kUsername, 
KuduSchema::ToSchema(simple_table->schema()),
-          hms::HmsClient::kManagedTable));
+      "1", "default.table1", "cluster-id", kUsername,
+      KuduSchema::ToSchema(simple_table->schema()),
+      hms::HmsClient::kManagedTable));
   ASSERT_OK(hms_catalog.CreateTable(
-      "2", "default.table2", boost::none, 
KuduSchema::ToSchema(simple_table->schema()),
+      "2", "default.table2", "cluster-id", boost::none,
+      KuduSchema::ToSchema(simple_table->schema()),
       hms::HmsClient::kExternalTable));
 
   // Test the output when HMS integration is disabled.
diff --git a/src/kudu/tools/tool_action_hms.cc 
b/src/kudu/tools/tool_action_hms.cc
index 883ceec..8f53c0c 100644
--- a/src/kudu/tools/tool_action_hms.cc
+++ b/src/kudu/tools/tool_action_hms.cc
@@ -188,7 +188,6 @@ bool IsSynced(const set<string>& master_addresses,
   DCHECK(!master_addresses.empty());
   auto schema = KuduSchema::ToSchema(kudu_table.schema());
   hive::Table hms_table_copy(hms_table);
-  // TODO(ghenke): Get the owner from Kudu?
   const string* hms_masters_field = FindOrNull(hms_table.parameters,
                                                HmsClient::kKuduMasterAddrsKey);
   if (!hms_masters_field ||
@@ -196,8 +195,8 @@ bool IsSynced(const set<string>& master_addresses,
     return false;
   }
   Status s = HmsCatalog::PopulateTable(kudu_table.id(), kudu_table.name(), 
kudu_table.owner(),
-                                       schema, *hms_masters_field, 
hms_table.tableType,
-                                       &hms_table_copy);
+                                       schema, 
kudu_table.client()->cluster_id(),
+                                       *hms_masters_field, 
hms_table.tableType, &hms_table_copy);
   return s.ok() && hms_table_copy == hms_table;
 }
 
@@ -227,6 +226,7 @@ Status PrintTables(const string& master_addrs,
   DataTable table({
       "Kudu table",
       "Kudu table ID",
+      "Kudu cluster ID",
       "Kudu owner",
       "Kudu master addresses",
       "HMS database",
@@ -235,21 +235,25 @@ Status PrintTables(const string& master_addrs,
       "HMS owner",
       Substitute("HMS $0", HmsClient::kKuduTableNameKey),
       Substitute("HMS $0", HmsClient::kKuduTableIdKey),
+      Substitute("HMS $0", HmsClient::kKuduClusterIdKey),
       Substitute("HMS $0", HmsClient::kKuduMasterAddrsKey),
       Substitute("HMS $0", HmsClient::kStorageHandlerKey),
   });
   for (auto& pair : tables) {
     vector<string> row;
     if (pair.first) {
+      // Note: If adding or removing fields, be sure to adjust `row.resize` 
below.
       const KuduTable& kudu_table = *pair.first;
       row.emplace_back(kudu_table.name());
       row.emplace_back(kudu_table.id());
+      row.emplace_back(kudu_table.client()->cluster_id());
       row.emplace_back(kudu_table.owner());
       row.emplace_back(master_addrs);
     } else {
-      row.resize(4);
+      row.resize(5);
     }
     if (pair.second) {
+      // Note: If adding or removing fields, be sure to adjust `row.resize` 
below.
       hive::Table& hms_table = *pair.second;
       row.emplace_back(hms_table.dbName);
       row.emplace_back(hms_table.tableName);
@@ -257,10 +261,11 @@ Status PrintTables(const string& master_addrs,
       row.emplace_back(hms_table.owner);
       row.emplace_back(hms_table.parameters[HmsClient::kKuduTableNameKey]);
       row.emplace_back(hms_table.parameters[HmsClient::kKuduTableIdKey]);
+      row.emplace_back(hms_table.parameters[HmsClient::kKuduClusterIdKey]);
       row.emplace_back(hms_table.parameters[HmsClient::kKuduMasterAddrsKey]);
       row.emplace_back(hms_table.parameters[HmsClient::kStorageHandlerKey]);
     } else {
-      row.resize(12);
+      row.resize(14);
     }
     table.AddRow(std::move(row));
   }
@@ -296,6 +301,10 @@ Status PrintHMSTables(vector<hive::Table> tables, ostream& 
out) {
       for (auto& hms_table : tables) {
         values.emplace_back(hms_table.parameters[HmsClient::kKuduTableIdKey]);
       }
+    } else if (boost::iequals(column, HmsClient::kKuduClusterIdKey)) {
+      for (auto& hms_table : tables) {
+        
values.emplace_back(hms_table.parameters[HmsClient::kKuduClusterIdKey]);
+      }
     } else if (boost::iequals(column, HmsClient::kKuduMasterAddrsKey)) {
       for (auto& hms_table : tables) {
         
values.emplace_back(hms_table.parameters[HmsClient::kKuduMasterAddrsKey]);
@@ -652,6 +661,7 @@ Status FixHmsMetadata(const RunnerContext& context) {
   if (FLAGS_create_missing_hms_tables) {
     for (const auto& kudu_table : report.missing_hms_tables) {
       const string& table_id = kudu_table->id();
+      const string& cluster_id = kudu_table->client()->cluster_id();
       const string& table_name = kudu_table->name();
       auto schema = KuduSchema::ToSchema(kudu_table->schema());
       string normalized_table_name(table_name.data(), table_name.size());
@@ -660,7 +670,8 @@ Status FixHmsMetadata(const RunnerContext& context) {
       if (FLAGS_dryrun) {
         LOG(INFO) << "[dryrun] Creating HMS table for Kudu table " << 
TableIdent(*kudu_table);
       } else {
-        Status s = hms_catalog->CreateTable(table_id, table_name, 
kudu_table->owner(), schema);
+        Status s = hms_catalog->CreateTable(table_id, table_name, cluster_id,
+            kudu_table->owner(), schema);
         if (s.IsAlreadyPresent()) {
           LOG(ERROR) << "Failed to create HMS table for Kudu table "
                      << TableIdent(*kudu_table)
@@ -714,8 +725,8 @@ Status FixHmsMetadata(const RunnerContext& context) {
                   << hms_table_name;
       } else {
         RETURN_NOT_OK_PREPEND(hms_catalog->UpgradeLegacyImpalaTable(
-                  kudu_table.id(), hms_table.dbName, hms_table.tableName,
-                  KuduSchema::ToSchema(kudu_table.schema())),
+                  kudu_table.id(), kudu_table.client()->cluster_id(), 
hms_table.dbName,
+                  hms_table.tableName, 
KuduSchema::ToSchema(kudu_table.schema())),
             Substitute("failed to upgrade legacy Impala HMS metadata for table 
$0",
               hms_table_name));
       }
@@ -803,7 +814,8 @@ Status FixHmsMetadata(const RunnerContext& context) {
         RETURN_NOT_OK_PREPEND(
             // Disable table ID checking to support fixing tables with 
unsynchronized IDs.
             hms_catalog->AlterTable(kudu_table.id(), hms_table_name, 
hms_table_name,
-                owner, schema, /* check_id */ false),
+                                    kudu_table.client()->cluster_id(), owner, 
schema,
+                                    /* check_id */ false),
             Substitute("failed to refresh HMS table metadata for Kudu table 
$0",
               TableIdent(kudu_table)));
       }
@@ -939,9 +951,10 @@ unique_ptr<Mode> BuildHmsMode() {
                                                   
HmsClient::kKuduTableNameKey),
                                 Substitute("Comma-separated list of HMS entry 
fields to "
                                            "include in output.\nPossible 
values: database, "
-                                           "table, type, owner, $0, $1, $2, 
$3",
+                                           "table, type, owner, $0, $1, $2, 
$3, $4",
                                            HmsClient::kKuduTableNameKey,
                                            HmsClient::kKuduTableIdKey,
+                                           HmsClient::kKuduClusterIdKey,
                                            HmsClient::kKuduMasterAddrsKey,
                                            HmsClient::kStorageHandlerKey))
           .AddOptionalParameter("format")

Reply via email to