This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f0446b73630d75f6bf9c11b3fcce8953c557b578
Author: Attila Bukor <[email protected]>
AuthorDate: Tue Jun 30 16:36:54 2020 +0200

    KUDU-3090: Native owner metadata in Kudu
    
    Apache Sentry and Apache Ranger both support permissions granted to
    table owners, but as Sentry integrates with Apache Hive Metastore (HMS)
    and stores its metadata in it, Kudu didn't need to store table ownership
    to support granting permissions to owners.
    
    Apache Ranger, on the other hand, doesn't depend on HMS and needs Kudu
    to tell it whether the owner is attempting to authorize an action, so,
    to enable users to grant privileges to owners, we need to support
    ownership natively.
    
    This patch adds the basic plumbing for table ownership, synchronizing
    ownership metadata with HMS both using the notification log listener and
    via tooling, and setting the owner on CREATE TABLE and ALTER TABLE
    requests in the C++ client.
    
    The maximum owner length is 128 characters by default, which aligns
    with the HMS/Apache Impala maximum owner lengths; it is configurable
    with the max_owner_length flag.
    
    Support for this in the Java and Python clients, authorization of
    these requests, and support for ownership in authorization will come
    in follow-up patches.
    
    Credit goes to Grant Henke <[email protected]> for the initial
    version of this patch.
    
    Design doc: https://s.apache.org/kudu-ownership-design
    
    Change-Id: I67f5bfdf56d409960365fd5803913a2d3800831d
    Reviewed-on: http://gerrit.cloudera.org:8080/15841
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
    Reviewed-by: Grant Henke <[email protected]>
---
 src/kudu/client/client-internal.cc               |   8 +-
 src/kudu/client/client-internal.h                |   2 +
 src/kudu/client/client-test.cc                   |  31 +++++++
 src/kudu/client/client.cc                        |  21 ++++-
 src/kudu/client/client.h                         |  20 +++++
 src/kudu/client/client.proto                     |   2 +
 src/kudu/client/scan_token-internal.cc           |   5 +-
 src/kudu/client/table-internal.cc                |   2 +
 src/kudu/client/table-internal.h                 |   9 +-
 src/kudu/client/table_alterer-internal.cc        |   8 +-
 src/kudu/client/table_alterer-internal.h         |   1 +
 src/kudu/client/table_creator-internal.h         |   2 +
 src/kudu/hms/hms_catalog-test.cc                 |  11 +--
 src/kudu/hms/hms_catalog.cc                      |   5 +-
 src/kudu/hms/hms_catalog.h                       |   3 +-
 src/kudu/integration-tests/hms_itest-base.cc     |  10 +++
 src/kudu/integration-tests/hms_itest-base.h      |   5 ++
 src/kudu/integration-tests/master_hms-itest.cc   |  29 +++++++
 src/kudu/integration-tests/registration-test.cc  |   1 +
 src/kudu/master/catalog_manager.cc               | 101 +++++++++++++++++------
 src/kudu/master/catalog_manager.h                |  12 ++-
 src/kudu/master/hms_notification_log_listener.cc |  23 +++++-
 src/kudu/master/master-test.cc                   |  51 ++++++++----
 src/kudu/master/master.proto                     |  12 ++-
 src/kudu/tools/kudu-tool-test.cc                 |  81 +++++++++++++-----
 src/kudu/tools/tool_action_hms.cc                |  46 +++++++++--
 26 files changed, 410 insertions(+), 91 deletions(-)

diff --git a/src/kudu/client/client-internal.cc 
b/src/kudu/client/client-internal.cc
index a0f9550..524d50d 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -499,6 +499,7 @@ Status KuduClient::Data::OpenTable(KuduClient* client,
   string table_id;
   string table_name;
   int num_replicas;
+  string owner;
   PartitionSchema partition_schema;
   map<string, string> extra_configs;
   MonoTime deadline = MonoTime::Now() + default_admin_operation_timeout_;
@@ -510,6 +511,7 @@ Status KuduClient::Data::OpenTable(KuduClient* client,
                                &table_id,
                                &table_name,
                                &num_replicas,
+                               &owner,
                                &extra_configs));
 
   // When the table name is specified, use the caller-provided table name.
@@ -523,7 +525,7 @@ Status KuduClient::Data::OpenTable(KuduClient* client,
   //                   map to reuse KuduTable instances.
   table->reset(new KuduTable(client->shared_from_this(),
                              effective_table_name, table_id, num_replicas,
-                             schema, partition_schema, extra_configs));
+                             owner, schema, partition_schema, extra_configs));
 
   // When opening a table, clear the existing cached non-covered range entries.
   // This avoids surprises where a new table instance won't be able to see the
@@ -541,6 +543,7 @@ Status KuduClient::Data::GetTableSchema(KuduClient* client,
                                         std::string* table_id,
                                         std::string* table_name,
                                         int* num_replicas,
+                                        std::string* owner,
                                         map<string, string>* extra_configs) {
   GetTableSchemaRequestPB req;
   GetTableSchemaResponsePB resp;
@@ -581,6 +584,9 @@ Status KuduClient::Data::GetTableSchema(KuduClient* client,
   if (num_replicas) {
     *num_replicas = resp.num_replicas();
   }
+  if (owner) {
+    *owner = resp.owner();
+  }
   if (extra_configs) {
     map<string, string> result(resp.extra_configs().begin(), 
resp.extra_configs().end());
     *extra_configs = std::move(result);
diff --git a/src/kudu/client/client-internal.h 
b/src/kudu/client/client-internal.h
index bf5b3ce..4f2a85e 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -144,6 +144,7 @@ class KuduClient::Data {
   //   'table_id'         The table unique id.
   //   'table_name'       The table unique name.
   //   'num_replicas'     The table replication factor.
+  //   'owner'            The owner of the table.
   //   'extra_configs'    The table's extra configuration properties.
   Status GetTableSchema(KuduClient* client,
                         const MonoTime& deadline,
@@ -153,6 +154,7 @@ class KuduClient::Data {
                         std::string* table_id,
                         std::string* table_name,
                         int* num_replicas,
+                        std::string* owner,
                         std::map<std::string, std::string>* extra_configs);
 
   Status InitLocalHostNames();
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 0637bf9..9d1dd86 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -5372,6 +5372,37 @@ TEST_F(ClientTest, 
TestCreateAndAlterTableWithInvalidComment) {
   }
 }
 
+TEST_F(ClientTest, TestAlterTableChangeOwner) {
+  const string kTableName = "table_owner";
+  const string kOriginalOwner = "alice";
+  const string kNewOwner = "bob";
+
+  // Create table
+  KuduSchema schema;
+  KuduSchemaBuilder schema_builder;
+  
schema_builder.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+  schema_builder.AddColumn("val")->Type(KuduColumnSchema::INT32);
+  ASSERT_OK(schema_builder.Build(&schema));
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTableName)
+      .schema(&schema).num_replicas(1).set_range_partition_columns({ "key" })
+      .set_owner(kOriginalOwner).Create());
+  {
+    shared_ptr<KuduTable> table;
+    client_->OpenTable(kTableName, &table);
+    ASSERT_EQ(kOriginalOwner, table->owner());
+  }
+
+  unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer(kTableName));
+  table_alterer->SetOwner(kNewOwner);
+  ASSERT_OK(table_alterer->Alter());
+  {
+    shared_ptr<KuduTable> table;
+    client_->OpenTable(kTableName, &table);
+    ASSERT_EQ(kNewOwner, table->owner());
+  }
+}
+
 enum IntEncoding {
   kPlain,
   kBitShuffle,
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index e8ee412..2612022 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -450,6 +450,7 @@ Status KuduClient::GetTableSchema(const string& table_name,
                                nullptr, // table id
                                nullptr, // table name
                                nullptr, // number of replicas
+                               nullptr, // owner
                                nullptr); // extra configs
 }
 
@@ -737,6 +738,11 @@ KuduTableCreator& 
KuduTableCreator::add_range_partition_split(KuduPartialRow* sp
   return *this;
 }
 
+KuduTableCreator& KuduTableCreator::set_owner(const string& owner) {
+  data_->owner_ = owner;
+  return *this;
+}
+
 KuduTableCreator& KuduTableCreator::split_rows(const vector<const 
KuduPartialRow*>& rows) {
   for (const KuduPartialRow* row : rows) {
     
data_->range_partition_splits_.emplace_back(const_cast<KuduPartialRow*>(row));
@@ -810,6 +816,9 @@ Status KuduTableCreator::Create() {
     req.mutable_extra_configs()->insert(data_->extra_configs_->begin(),
                                         data_->extra_configs_->end());
   }
+  if (data_->owner_ != boost::none) {
+    req.set_owner(data_->owner_.get());
+  }
   RETURN_NOT_OK_PREPEND(SchemaToPB(*data_->schema_->schema_, 
req.mutable_schema(),
                                    SCHEMA_PB_WITHOUT_WRITE_DEFAULT),
                         "Invalid schema");
@@ -901,10 +910,11 @@ KuduTable::KuduTable(const shared_ptr<KuduClient>& client,
                      const string& name,
                      const string& id,
                      int num_replicas,
+                     const string& owner,
                      const KuduSchema& schema,
                      const PartitionSchema& partition_schema,
                      const map<string, string>& extra_configs)
-  : data_(new KuduTable::Data(client, name, id, num_replicas,
+  : data_(new KuduTable::Data(client, name, id, num_replicas, owner,
                               schema, partition_schema, extra_configs)) {
 }
 
@@ -928,6 +938,10 @@ int KuduTable::num_replicas() const {
   return data_->num_replicas_;
 }
 
+const string& KuduTable::owner() const {
+  return data_->owner_;
+}
+
 KuduInsert* KuduTable::NewInsert() {
   return new KuduInsert(shared_from_this());
 }
@@ -1239,6 +1253,11 @@ KuduTableAlterer* KuduTableAlterer::RenameTo(const 
string& new_name) {
   return this;
 }
 
+KuduTableAlterer* KuduTableAlterer::SetOwner(const string& new_owner) {
+  data_->set_owner_to_ = new_owner;
+  return this;
+}
+
 KuduColumnSpec* KuduTableAlterer::AddColumn(const string& name) {
   Data::Step s = { AlterTableRequestPB::ADD_COLUMN,
                    new KuduColumnSpec(name), nullptr, nullptr };
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 6b866bc..29b7e6f 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -874,6 +874,15 @@ class KUDU_EXPORT KuduTableCreator {
   /// @return Reference to the modified table creator.
   KuduTableCreator& add_range_partition_split(KuduPartialRow* split_row);
 
+  /// Set the table owner.
+  ///
+  /// If unspecified, the owner will be the user creating the table.
+  ///
+  /// @param [in] owner
+  ///   The username of the table owner.
+  /// @return Reference to the modified table creator.
+  KuduTableCreator& set_owner(const std::string& owner);
+
   /// @deprecated Use @c add_range_partition_split() instead.
   ///
   /// @param [in] split_rows
@@ -1039,6 +1048,9 @@ class KUDU_EXPORT KuduTable : public 
sp::enable_shared_from_this<KuduTable> {
   /// @return Replication factor of the table.
   int num_replicas() const;
 
+  /// @return Owner of the table (empty string if unset).
+  const std::string& owner() const;
+
   /// @return New @c INSERT operation for this table. It is the caller's
   ///   responsibility to free the result, unless it is passed to
   ///   KuduSession::Apply().
@@ -1255,6 +1267,7 @@ class KUDU_EXPORT KuduTable : public 
sp::enable_shared_from_this<KuduTable> {
             const std::string& name,
             const std::string& id,
             int num_replicas,
+            const std::string& owner,
             const KuduSchema& schema,
             const PartitionSchema& partition_schema,
             const std::map<std::string, std::string>& extra_configs);
@@ -1287,6 +1300,13 @@ class KUDU_EXPORT KuduTableAlterer {
   /// @return Raw pointer to this alterer object.
   KuduTableAlterer* RenameTo(const std::string& new_name);
 
+  /// Set the owner of the table.
+  ///
+  /// @param [in] new_owner
+  ///   The new owner for the table.
+  /// @return Raw pointer to this alterer object.
+  KuduTableAlterer* SetOwner(const std::string& new_owner);
+
   /// Add a new column to the table.
   ///
   /// When adding a column, you must specify the default value of the new
diff --git a/src/kudu/client/client.proto b/src/kudu/client/client.proto
index bad0fe3..be4c106 100644
--- a/src/kudu/client/client.proto
+++ b/src/kudu/client/client.proto
@@ -39,6 +39,8 @@ message TableMetadataPB {
 
   // The table's extra configuration properties.
   map<string, string> extra_configs = 6;
+
+  optional string owner = 7;
 }
 
 // Metdata about a single server.
diff --git a/src/kudu/client/scan_token-internal.cc 
b/src/kudu/client/scan_token-internal.cc
index c8d80c1..132e772 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -135,8 +135,8 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* 
client,
     map<string, string> extra_configs(metadata.extra_configs().begin(),
         metadata.extra_configs().end());
     table.reset(new KuduTable(client->shared_from_this(), 
metadata.table_name(),
-        metadata.table_id(), metadata.num_replicas(), kudu_schema, 
partition_schema,
-        extra_configs));
+        metadata.table_id(), metadata.num_replicas(), metadata.owner(), 
kudu_schema,
+        partition_schema, extra_configs));
   } else {
     TableIdentifierPB table_identifier;
     if (message.has_table_id()) {
@@ -336,6 +336,7 @@ Status 
KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
     table_pb.set_table_id(table->id());
     table_pb.set_table_name(table->name());
     table_pb.set_num_replicas(table->num_replicas());
+    table_pb.set_owner(table->owner());
     SchemaPB schema_pb;
     RETURN_NOT_OK(SchemaToPB(KuduSchema::ToSchema(table->schema()), 
&schema_pb));
     *table_pb.mutable_schema() = std::move(schema_pb);
diff --git a/src/kudu/client/table-internal.cc 
b/src/kudu/client/table-internal.cc
index 7114ef8..f3e8ad2 100644
--- a/src/kudu/client/table-internal.cc
+++ b/src/kudu/client/table-internal.cc
@@ -33,6 +33,7 @@ KuduTable::Data::Data(shared_ptr<KuduClient> client,
                       string name,
                       string id,
                       int num_replicas,
+                      string owner,
                       const KuduSchema& schema,
                       PartitionSchema partition_schema,
                       map<string, string> extra_configs)
@@ -40,6 +41,7 @@ KuduTable::Data::Data(shared_ptr<KuduClient> client,
       name_(std::move(name)),
       id_(std::move(id)),
       num_replicas_(num_replicas),
+      owner_(std::move(owner)),
       schema_(schema),
       partition_schema_(std::move(partition_schema)),
       extra_configs_(std::move(extra_configs)) {
diff --git a/src/kudu/client/table-internal.h b/src/kudu/client/table-internal.h
index 8e332d8..5c894d6 100644
--- a/src/kudu/client/table-internal.h
+++ b/src/kudu/client/table-internal.h
@@ -43,6 +43,7 @@ class KuduTable::Data {
        std::string name,
        std::string id,
        int num_replicas,
+       std::string owner,
        const KuduSchema& schema,
        PartitionSchema partition_schema,
        std::map<std::string, std::string> extra_configs);
@@ -69,10 +70,12 @@ class KuduTable::Data {
   const std::string name_;
   const std::string id_;
   const int num_replicas_;
+  const std::string owner_;
 
-  // TODO: figure out how we deal with a schema change from the client 
perspective.
-  // Do we make them call a RefreshSchema() method? Or maybe reopen the table 
and get
-  // a new KuduTable instance (which would simplify the object lifecycle a 
little?)
+  // TODO(unknown): figure out how we deal with a schema change from the client
+  // perspective. Do we make them call a RefreshSchema() method? Or maybe
+  // reopen the table and get a new KuduTable instance (which would simplify 
the
+  // object lifecycle a little?)
   const KuduSchema schema_;
   const PartitionSchema partition_schema_;
 
diff --git a/src/kudu/client/table_alterer-internal.cc 
b/src/kudu/client/table_alterer-internal.cc
index 846150a..4f4db75 100644
--- a/src/kudu/client/table_alterer-internal.cc
+++ b/src/kudu/client/table_alterer-internal.cc
@@ -57,7 +57,10 @@ Status 
KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) {
     return status_;
   }
 
-  if (!rename_to_ && !new_extra_configs_ && steps_.empty()) {
+  if (!rename_to_.is_initialized() &&
+      !new_extra_configs_ &&
+      !set_owner_to_.is_initialized() &&
+      steps_.empty()) {
     return Status::InvalidArgument("No alter steps provided");
   }
 
@@ -71,6 +74,9 @@ Status KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* 
req) {
     req->mutable_new_extra_configs()->insert(new_extra_configs_->begin(),
                                              new_extra_configs_->end());
   }
+  if (set_owner_to_.is_initialized()) {
+    req->set_new_table_owner(set_owner_to_.get());
+  }
 
   if (schema_ != nullptr) {
     RETURN_NOT_OK(SchemaToPB(*schema_, req->mutable_schema(),
diff --git a/src/kudu/client/table_alterer-internal.h 
b/src/kudu/client/table_alterer-internal.h
index 2cfa1de..4945a75 100644
--- a/src/kudu/client/table_alterer-internal.h
+++ b/src/kudu/client/table_alterer-internal.h
@@ -74,6 +74,7 @@ class KuduTableAlterer::Data {
   bool wait_;
 
   boost::optional<std::string> rename_to_;
+  boost::optional<std::string> set_owner_to_;
 
   boost::optional<std::map<std::string, std::string>> new_extra_configs_;
 
diff --git a/src/kudu/client/table_creator-internal.h 
b/src/kudu/client/table_creator-internal.h
index 5caee98..a8c6cb9 100644
--- a/src/kudu/client/table_creator-internal.h
+++ b/src/kudu/client/table_creator-internal.h
@@ -60,6 +60,8 @@ class KuduTableCreator::Data {
 
   PartitionSchemaPB partition_schema_;
 
+  boost::optional<std::string> owner_;
+
   boost::optional<int> num_replicas_;
 
   boost::optional<std::string> dimension_label_;
diff --git a/src/kudu/hms/hms_catalog-test.cc b/src/kudu/hms/hms_catalog-test.cc
index f7a5f6e..ec9ae62 100644
--- a/src/kudu/hms/hms_catalog-test.cc
+++ b/src/kudu/hms/hms_catalog-test.cc
@@ -311,7 +311,8 @@ TEST_P(HmsCatalogTestParameterized, TestTableLifecycle) {
   SchemaBuilder b(schema);
   b.AddColumn("new_column", DataType::INT32);
   Schema altered_schema = b.Build();
-  ASSERT_OK(hms_catalog_->AlterTable(kTableId, kTableName, kAlteredTableName, 
altered_schema));
+  ASSERT_OK(hms_catalog_->AlterTable(kTableId, kTableName, kAlteredTableName,
+                                     kOwner, altered_schema));
   NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, kHmsTableName));
   NO_FATALS(CheckTable(kHmsDatabase, kHmsAlteredTableName, kTableId, kOwner, 
altered_schema));
 
@@ -353,14 +354,14 @@ TEST_F(HmsCatalogTest, TestExternalTable) {
   NO_FATALS(CheckExternalTable());
 
   // Try and rename the Kudu table to the external table name.
-  s = hms_catalog_->AlterTable(kTableId, "default.a", "default.ext", schema);
+  s = hms_catalog_->AlterTable(kTableId, "default.a", "default.ext", 
boost::none, schema);
   EXPECT_TRUE(s.IsIllegalState()) << s.ToString();
   NO_FATALS(CheckExternalTable());
   NO_FATALS(CheckTable("default", "a", kTableId, boost::none, schema));
 
   // Try and rename the external table. This shouldn't succeed because the 
Table
   // ID doesn't match.
-  s = hms_catalog_->AlterTable(kTableId, "default.ext", "default.b", schema);
+  s = hms_catalog_->AlterTable(kTableId, "default.ext", "default.b", 
boost::none, schema);
   EXPECT_TRUE(s.IsNotFound()) << s.ToString();
   NO_FATALS(CheckExternalTable());
   NO_FATALS(CheckTable("default", "a", kTableId, boost::none, schema));
@@ -436,7 +437,7 @@ TEST_F(HmsCatalogTest, TestReconnect) {
   Status s = hms_catalog_->CreateTable(kTableId, "default.b", kOwner, schema);
   EXPECT_TRUE(s.IsNetworkError()) << s.ToString();
 
-  s = hms_catalog_->AlterTable(kTableId, "default.a", "default.c", schema);
+  s = hms_catalog_->AlterTable(kTableId, "default.a", "default.c", kOwner, 
schema);
   EXPECT_TRUE(s.IsNetworkError()) << s.ToString();
 
   // Start the HMS back up and ensure that the same operations succeed.
@@ -449,7 +450,7 @@ TEST_F(HmsCatalogTest, TestReconnect) {
   NO_FATALS(CheckTable(kHmsDatabase, "a", kTableId, kOwner, schema));
   NO_FATALS(CheckTable(kHmsDatabase, "d", kTableId, kOwner, schema));
 
-  EXPECT_OK(hms_catalog_->AlterTable(kTableId, "default.a", "default.c", 
schema));
+  EXPECT_OK(hms_catalog_->AlterTable(kTableId, "default.a", "default.c", 
kOwner, schema));
   NO_FATALS(CheckTable(kHmsDatabase, "c", kTableId, kOwner, schema));
   NO_FATALS(CheckTableDoesNotExist(kHmsDatabase, "a"));
 }
diff --git a/src/kudu/hms/hms_catalog.cc b/src/kudu/hms/hms_catalog.cc
index 69805a7..96a2c12 100644
--- a/src/kudu/hms/hms_catalog.cc
+++ b/src/kudu/hms/hms_catalog.cc
@@ -145,7 +145,7 @@ void HmsCatalog::Stop() {
 
 Status HmsCatalog::CreateTable(const string& id,
                                const string& name,
-                               optional<const string&> owner,
+                               const optional<const string&>& owner,
                                const Schema& schema,
                                const string& table_type) {
   hive::Table table;
@@ -254,6 +254,7 @@ Status HmsCatalog::GetKuduTables(vector<hive::Table>* 
kudu_tables) {
 Status HmsCatalog::AlterTable(const string& id,
                               const string& name,
                               const string& new_name,
+                              optional<const string&> owner,
                               const Schema& schema,
                               const bool& check_id) {
   Slice hms_database;
@@ -288,7 +289,7 @@ Status HmsCatalog::AlterTable(const string& id,
       }
 
       // Overwrite fields in the table that have changed, including the new 
name.
-      RETURN_NOT_OK(PopulateTable(id, new_name, table.owner, schema, 
master_addresses_,
+      RETURN_NOT_OK(PopulateTable(id, new_name, owner, schema, 
master_addresses_,
           table.tableType, &table));
       return client->AlterTable(hms_database.ToString(), hms_table.ToString(),
                                 table, EnvironmentContext(check_id));
diff --git a/src/kudu/hms/hms_catalog.h b/src/kudu/hms/hms_catalog.h
index 95c44c1..e87ea9d 100644
--- a/src/kudu/hms/hms_catalog.h
+++ b/src/kudu/hms/hms_catalog.h
@@ -72,7 +72,7 @@ class HmsCatalog {
   // Fails the HMS is unreachable, or a table with the same name is already 
present.
   Status CreateTable(const std::string& id,
                      const std::string& name,
-                     boost::optional<const std::string&> owner,
+                     const boost::optional<const std::string&>& owner,
                      const Schema& schema,
                      const std::string& table_type = 
hms::HmsClient::kManagedTable)
                      WARN_UNUSED_RESULT;
@@ -102,6 +102,7 @@ class HmsCatalog {
   Status AlterTable(const std::string& id,
                     const std::string& name,
                     const std::string& new_name,
+                    boost::optional<const std::string&> owner,
                     const Schema& schema,
                     const bool& check_id = true) WARN_UNUSED_RESULT;
 
diff --git a/src/kudu/integration-tests/hms_itest-base.cc 
b/src/kudu/integration-tests/hms_itest-base.cc
index 8957fc0..f4be1db 100644
--- a/src/kudu/integration-tests/hms_itest-base.cc
+++ b/src/kudu/integration-tests/hms_itest-base.cc
@@ -151,6 +151,15 @@ Status HmsITestHarness::RenameHmsTable(const string& 
database_name,
   return hms_client_->AlterTable(database_name, old_table_name, table);
 }
 
+Status HmsITestHarness::ChangeHmsOwner(const string& database_name,
+                                       const string& table_name,
+                                       const string& new_table_owner) {
+  hive::Table table;
+  RETURN_NOT_OK(hms_client_->GetTable(database_name, table_name, &table));
+  table.owner = new_table_owner;
+  return hms_client_->AlterTable(database_name, table_name, table);
+}
+
 Status HmsITestHarness::AlterHmsTableDropColumns(const string& database_name,
                                                  const string& table_name) {
     hive::Table table;
@@ -203,6 +212,7 @@ void HmsITestHarness::CheckTable(const string& 
database_name,
     ASSERT_OK(GetLoggedInUser(&username));
   }
   ASSERT_EQ(hms_table.owner, username);
+  ASSERT_EQ(table->owner(), username);
 
   const auto& schema = table->schema();
   ASSERT_EQ(schema.num_columns(), hms_table.sd.cols.size());
diff --git a/src/kudu/integration-tests/hms_itest-base.h 
b/src/kudu/integration-tests/hms_itest-base.h
index 91d8dbe..784f6c8 100644
--- a/src/kudu/integration-tests/hms_itest-base.h
+++ b/src/kudu/integration-tests/hms_itest-base.h
@@ -76,6 +76,11 @@ class HmsITestHarness {
   Status AlterHmsTableExternalPurge(const std::string& database_name,
                                     const std::string& table_name) 
WARN_UNUSED_RESULT;
 
+  // Changes the table owner in the HMS catalog.
+  Status ChangeHmsOwner(const std::string& database_name,
+                        const std::string& table_name,
+                        const std::string& new_table_owner) WARN_UNUSED_RESULT;
+
   // Checks that the Kudu table schema and the HMS table entry in their
   // respective catalogs are synchronized for a particular table. It also
   // verifies that the table owner is the given user (if not provided,
diff --git a/src/kudu/integration-tests/master_hms-itest.cc 
b/src/kudu/integration-tests/master_hms-itest.cc
index 5bca58a..8768c7b 100644
--- a/src/kudu/integration-tests/master_hms-itest.cc
+++ b/src/kudu/integration-tests/master_hms-itest.cc
@@ -47,7 +47,9 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using boost::make_optional;
 using boost::none;
+using boost::optional;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableAlterer;
 using kudu::client::sp::shared_ptr;
@@ -119,6 +121,12 @@ class MasterHmsTest : public ExternalMiniClusterITestBase {
     return harness_.RenameHmsTable(database_name, old_table_name, 
new_table_name);
   }
 
+  Status ChangeHmsOwner(const std::string& database_name,
+                        const std::string& table_name,
+                        const std::string& new_table_owner) {
+    return harness_.ChangeHmsOwner(database_name, table_name, new_table_owner);
+  }
+
   Status AlterHmsTableDropColumns(const std::string& database_name,
                                   const std::string& table_name) {
     return harness_.AlterHmsTableDropColumns(database_name, table_name);
@@ -292,6 +300,27 @@ TEST_F(MasterHmsTest, TestRenameTable) {
   NO_FATALS(CheckTableDoesNotExist("db1", "t2"));
 }
 
+TEST_F(MasterHmsTest, TestAlterTableOwner) {
+  // Create the Kudu table.
+  ASSERT_OK(CreateKuduTable("default", "userTable"));
+  NO_FATALS(CheckTable("default", "userTable", /*user=*/ none));
+
+  // Change the owner through the HMS, and ensure the owner is handled in Kudu.
+  const char* const user_a = "user_a";
+  ASSERT_OK(ChangeHmsOwner("default", "userTable", user_a));
+  ASSERT_EVENTUALLY([&] {
+    NO_FATALS(CheckTable("default", "userTable", make_optional<const 
string&>(user_a)));
+  });
+
+  // Change the owner through Kudu, and ensure the owner is reflected in HMS.
+  const char* const user_b = "user_b";
+  unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer("default.userTable"));
+  ASSERT_OK(table_alterer->SetOwner(user_b)->Alter());
+  ASSERT_EVENTUALLY([&] {
+    NO_FATALS(CheckTable("default", "userTable", make_optional<const 
string&>(user_b)));
+  });
+}
+
 TEST_F(MasterHmsTest, TestAlterTable) {
   // Create the Kudu table.
   ASSERT_OK(CreateKuduTable("default", "a"));
diff --git a/src/kudu/integration-tests/registration-test.cc 
b/src/kudu/integration-tests/registration-test.cc
index ad47366..bf29446 100644
--- a/src/kudu/integration-tests/registration-test.cc
+++ b/src/kudu/integration-tests/registration-test.cc
@@ -98,6 +98,7 @@ void CreateTableForTesting(MiniMaster* mini_master,
 
     req.set_name(table_name);
     req.set_num_replicas(1);
+    req.set_owner("jdoe");
     ASSERT_OK(SchemaToPB(schema, req.mutable_schema()));
     CatalogManager* catalog = mini_master->master()->catalog_manager();
     CatalogManager::ScopedLeaderSharedLock l(catalog);
diff --git a/src/kudu/master/catalog_manager.cc 
b/src/kudu/master/catalog_manager.cc
index 02f2af4..fd684ef 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -60,6 +60,7 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <boost/optional/optional_io.hpp> // IWYU pragma: keep
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/common.h>
@@ -183,7 +184,14 @@ DEFINE_int32(max_identifier_length, 256,
              "Maximum length of the name of a column or table.");
 
 DEFINE_int32(max_column_comment_length, 256,
-             "Maximum length of the comment of a column");
+             "Maximum length of the comment of a column.");
+
+DEFINE_int32(max_owner_length, 128,
+             "Maximum length of the name of a table owner.");
+
+DEFINE_bool(allow_empty_owner, false,
+            "Allow empty owner. Only for testing.");
+TAG_FLAG(allow_empty_owner, hidden);
 
 // Tag as unsafe because we end up writing schemas in every WAL entry, etc,
 // and having very long column names would enter untested territory and affect
@@ -1431,12 +1439,24 @@ Status ValidateCommentIdentifier(const string& id) {
   return ValidateLengthAndUTF8(id, FLAGS_max_column_comment_length);
 }
 
+Status ValidateOwner(const string& name) {
+  if (name.empty() && !FLAGS_allow_empty_owner) {
+    return Status::InvalidArgument("empty string is not a valid owner");
+  }
+
+  return ValidateLengthAndUTF8(name, FLAGS_max_owner_length);
+}
+
 // Validate the client-provided schema and name.
 Status ValidateClientSchema(const optional<string>& name,
+                            const optional<string>& owner,
                             const Schema& schema) {
   if (name) {
     RETURN_NOT_OK_PREPEND(ValidateIdentifier(name.get()), "invalid table 
name");
   }
+  if (owner) {
+    RETURN_NOT_OK_PREPEND(ValidateOwner(*owner), "invalid owner name");
+  }
   for (int i = 0; i < schema.num_columns(); i++) {
     RETURN_NOT_OK_PREPEND(ValidateIdentifier(schema.column(i).name()),
                           "invalid column name");
@@ -1493,6 +1513,13 @@ Status CatalogManager::CreateTable(const 
CreateTableRequestPB* orig_req,
   LOG(INFO) << Substitute("Servicing CreateTable request from $0:\n$1",
                           RequestorString(rpc), SecureDebugString(req));
 
+  optional<const string&> user = rpc ?
+      make_optional<const string&>(rpc->remote_user().username()) : none;
+  // Default the owner if it isn't set.
+  if (user && !req.has_owner()) {
+    req.set_owner(*user);
+  }
+
   // Do some fix-up of any defaults specified on columns.
   // Clients are only expected to pass the default value in the 'read_default'
   // field, but we need to write the schema to disk including the default
@@ -1506,10 +1533,9 @@ Status CatalogManager::CreateTable(const 
CreateTableRequestPB* orig_req,
   // a. Validate the user request.
   const string& normalized_table_name = NormalizeTableName(req.name());
   if (rpc) {
-    const string& user = rpc->remote_user().username();
-    const string& owner = req.has_owner() ? req.owner() : user;
+    DCHECK_NE(boost::none, user);
     RETURN_NOT_OK(SetupError(
-        authz_provider_->AuthorizeCreateTable(normalized_table_name, user, 
owner),
+        authz_provider_->AuthorizeCreateTable(normalized_table_name, 
user.get(), req.owner()),
         resp, MasterErrorPB::NOT_AUTHORIZED));
   }
 
@@ -1522,7 +1548,7 @@ Status CatalogManager::CreateTable(const 
CreateTableRequestPB* orig_req,
   Schema client_schema;
   RETURN_NOT_OK(SchemaFromPB(req.schema(), &client_schema));
 
-  RETURN_NOT_OK(SetupError(ValidateClientSchema(normalized_table_name, 
client_schema),
+  RETURN_NOT_OK(SetupError(ValidateClientSchema(normalized_table_name, 
req.owner(), client_schema),
                            resp, MasterErrorPB::INVALID_SCHEMA));
   if (client_schema.has_column_ids()) {
     return SetupError(Status::InvalidArgument("user requests should not have 
Column IDs"),
@@ -1735,8 +1761,7 @@ Status CatalogManager::CreateTable(const 
CreateTableRequestPB* orig_req,
   // since this step validates that the table name is available in the HMS 
catalog.
   if (hms_catalog_) {
     CHECK(rpc);
-    const string& owner = req.has_owner() ? req.owner() : 
rpc->remote_user().username();
-    Status s = hms_catalog_->CreateTable(table->id(), normalized_table_name, 
owner, schema);
+    Status s = hms_catalog_->CreateTable(table->id(), normalized_table_name, 
req.owner(), schema);
     if (!s.ok()) {
       s = s.CloneAndPrepend(Substitute("an error occurred while creating table 
$0 in the HMS",
                                        normalized_table_name));
@@ -1855,6 +1880,9 @@ scoped_refptr<TableInfo> CatalogManager::CreateTableInfo(
   metadata->set_create_timestamp(time(nullptr));
   (*metadata->mutable_extra_config()) = std::move(extra_config_pb);
   table->RegisterMetrics(master_->metric_registry(), metadata->name());
+  if (req.has_owner()) {
+    metadata->set_owner(req.owner());
+  }
   return table;
 }
 
@@ -2462,7 +2490,7 @@ Status CatalogManager::AlterTableRpc(const 
AlterTableRequestPB& req,
     // Rename the table in the HMS.
     RETURN_NOT_OK(SetupError(hms_catalog_->AlterTable(
             table->id(), l.data().name(), normalized_new_table_name,
-            schema),
+            l.data().owner(), schema),
         resp, MasterErrorPB::HIVE_METASTORE_ERROR));
 
     // Unlock the table, and wait for the notification log listener to handle
@@ -2493,15 +2521,21 @@ Status CatalogManager::AlterTableRpc(const 
AlterTableRequestPB& req,
   return AlterTable(req, resp, /*hms_notification_log_event_id=*/ none, user);
 }
 
-Status CatalogManager::RenameTableHms(const string& table_id,
-                                      const string& table_name,
-                                      const string& new_table_name,
-                                      int64_t notification_log_event_id) {
+Status CatalogManager::AlterTableHms(const string& table_id,
+                                     const string& table_name,
+                                     const optional<string>& new_table_name,
+                                     const optional<string>& new_table_owner,
+                                     int64_t notification_log_event_id) {
   AlterTableRequestPB req;
   AlterTableResponsePB resp;
   req.mutable_table()->set_table_id(table_id);
   req.mutable_table()->set_table_name(table_name);
-  req.set_new_table_name(new_table_name);
+  if (new_table_name) {
+    req.set_new_table_name(new_table_name.get());
+  }
+  if (new_table_owner) {
+    req.set_new_table_owner(new_table_owner.get());
+  }
 
   // Use empty user to skip the authorization validation since the operation
   // originates from catalog manager. Moreover, this avoids duplicate effort,
@@ -2583,9 +2617,9 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
   DCHECK_EQ(new_schema.find_column_by_id(next_col_id),
             static_cast<int>(Schema::kColumnNotFound));
 
-  // Just validate the schema, not the name (validated below).
+  // Just validate the schema, not the name and owner (validated below).
   RETURN_NOT_OK(SetupError(
-        ValidateClientSchema(none, new_schema),
+        ValidateClientSchema(none, none, new_schema),
         resp, MasterErrorPB::INVALID_SCHEMA));
 
   // 4. Validate and try to acquire the new table name.
@@ -2638,7 +2672,17 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
     }
   });
 
-  // 5. Alter table partitioning.
+  // 5. Alter the table owner.
+  // TODO(abukor): Only the owner (or superuser) can alter the owner?
+  if (req.has_new_table_owner()) {
+    // Prepend the context so the error reads like the CreateTable one.
+    RETURN_NOT_OK(SetupError(
+          ValidateOwner(req.new_table_owner()).CloneAndPrepend("invalid owner name"),
+          resp, MasterErrorPB::INVALID_SCHEMA));
+    l.mutable_data()->pb.set_owner(req.new_table_owner());
+  }
+
+  // 6. Alter table partitioning.
   vector<scoped_refptr<TabletInfo>> tablets_to_add;
   vector<scoped_refptr<TabletInfo>> tablets_to_drop;
   if (!alter_partitioning_steps.empty()) {
@@ -2652,7 +2696,7 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
           resp, MasterErrorPB::UNKNOWN_ERROR));
   }
 
-  // 6. Alter table's extra configuration properties.
+  // 7. Alter table's extra configuration properties.
   if (!req.new_extra_configs().empty()) {
     TRACE("Apply alter extra-config");
     Map<string, string> new_extra_configs;
@@ -2668,9 +2712,11 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
 
   // Set to true if columns are altered, added or dropped.
   bool has_schema_changes = !alter_schema_steps.empty();
-  // Set to true if there are schema changes, or the table is renamed.
-  bool has_metadata_changes =
-      has_schema_changes || req.has_new_table_name() || 
!req.new_extra_configs().empty();
+  // Set to true if there are schema changes, the table is renamed, the owner 
changed,
+  // or extra configuration has changed.
+  bool has_metadata_changes = has_schema_changes ||
+      req.has_new_table_name() || req.has_new_table_owner() ||
+      !req.new_extra_configs().empty();
   // Set to true if there are partitioning changes.
   bool has_partitioning_changes = !alter_partitioning_steps.empty();
   // Set to true if metadata changes need to be applied to existing tablets.
@@ -2682,7 +2728,7 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
     return Status::OK();
   }
 
-  // 7. Serialize the schema and increment the version number.
+  // 8. Serialize the schema and increment the version number.
   if (has_metadata_changes_for_existing_tablets && 
!l.data().pb.has_fully_applied_schema()) {
     
l.mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l.data().pb.schema());
   }
@@ -2707,7 +2753,7 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
   TabletMetadataGroupLock tablets_to_add_lock(LockMode::WRITE);
   TabletMetadataGroupLock tablets_to_drop_lock(LockMode::RELEASED);
 
-  // 8. Update sys-catalog with the new table schema and tablets to add/drop.
+  // 9. Update sys-catalog with the new table schema and tablets to add/drop.
   TRACE("Updating metadata on disk");
   {
     SysCatalogTable::Actions actions;
@@ -2737,7 +2783,7 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
     }
   }
 
-  // 9. Commit the in-memory state.
+  // 10. Commit the in-memory state.
   TRACE("Committing alterations to in-memory state");
   {
     // Commit new tablet in-memory state. This doesn't require taking the 
global
@@ -2793,17 +2839,18 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
   // the tablet again.
   tablets_to_drop_lock.Commit();
 
-  // If there are schema changes, then update the entry in the Hive Metastore.
+  // If there are schema changes or the owner changed, then update the entry 
in the Hive Metastore.
   // This is done on a best-effort basis, since Kudu is the source of truth for
   // table schema information, and the table has already been altered in the
   // Kudu catalog via the successful sys-table write above.
-  if (hms_catalog_ && has_schema_changes) {
+  if (hms_catalog_ && (has_schema_changes || req.has_new_table_owner())) {
     // Sanity check: if there are schema changes then this is necessarily not a
     // table rename, since we split out the rename portion into its own
     // 'transaction' which is serialized through the HMS.
     DCHECK(!req.has_new_table_name());
     WARN_NOT_OK(hms_catalog_->AlterTable(
-          table->id(), normalized_table_name, normalized_table_name, 
new_schema),
+          table->id(), normalized_table_name, normalized_table_name,
+          l.mutable_data()->owner(), new_schema),
         Substitute("failed to alter HiveMetastore schema for table $0, "
                    "HMS schema information will be stale", table->ToString()));
   }
@@ -2891,6 +2938,8 @@ Status CatalogManager::GetTableSchema(const 
GetTableSchemaRequestPB* req,
   resp->set_table_id(table->id());
   resp->mutable_partition_schema()->CopyFrom(l.data().pb.partition_schema());
   resp->set_table_name(l.data().pb.name());
+  resp->set_owner(l.data().pb.owner());
+
   RETURN_NOT_OK(ExtraConfigPBToPBMap(l.data().pb.extra_config(), 
resp->mutable_extra_configs()));
 
   return Status::OK();
diff --git a/src/kudu/master/catalog_manager.h 
b/src/kudu/master/catalog_manager.h
index b71ea1a..6eca87f 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -244,6 +244,11 @@ struct PersistentTableInfo {
     return pb.name();
   }
 
+  // Return the table's owner, as recorded in the 'owner' field of 'pb'.
+  const std::string& owner() const {
+    return pb.owner();
+  }
+
   // Helper to set the state of the tablet with a custom message.
   void set_state(SysTablesEntryPB::State state, const std::string& msg);
 
@@ -618,11 +623,12 @@ class CatalogManager : public 
tserver::TabletReplicaLookupIf {
                        AlterTableResponsePB* resp,
                        rpc::RpcContext* rpc);
 
-  // Rename the specified table in response to an 'ALTER TABLE RENAME' HMS
+  // Alter the specified table in response to an 'ALTER TABLE' HMS
   // notification log listener event.
-  Status RenameTableHms(const std::string& table_id,
+  Status AlterTableHms(const std::string& table_id,
                         const std::string& table_name,
-                        const std::string& new_table_name,
+                        const boost::optional<std::string>& new_table_name,
+                        const boost::optional<std::string>& new_table_owner,
                         int64_t notification_log_event_id) WARN_UNUSED_RESULT;
 
   // Get the information about an in-progress alter operation. If 'user' is
diff --git a/src/kudu/master/hms_notification_log_listener.cc 
b/src/kudu/master/hms_notification_log_listener.cc
index 8acaade..cbd14bf 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -26,6 +26,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <rapidjson/document.h>
@@ -64,6 +65,7 @@ 
TAG_FLAG(hive_metastore_notification_log_poll_inject_latency_ms, hidden);
 TAG_FLAG(hive_metastore_notification_log_poll_inject_latency_ms, unsafe);
 TAG_FLAG(hive_metastore_notification_log_poll_inject_latency_ms, runtime);
 
+using boost::optional;
 using rapidjson::Document;
 using rapidjson::Value;
 using std::string;
@@ -374,14 +376,27 @@ Status 
HmsNotificationLogListenerTask::HandleAlterTableEvent(const hive::Notific
   string before_table_name = Substitute("$0.$1", before_table.dbName, 
before_table.tableName);
   string after_table_name = Substitute("$0.$1", event.dbName, event.tableName);
 
-  if (before_table_name == after_table_name) {
-    VLOG(2) << "Ignoring non-rename alter table event on table "
+  optional<string> new_table_name;
+  if (before_table_name != after_table_name) {
+    new_table_name = after_table_name;
+  }
+
+  optional<string> new_table_owner;
+  if (before_table.owner != after_table.owner) {
+    new_table_owner = after_table.owner;
+  }
+
+  if (!new_table_name && !new_table_owner) {
+    VLOG(2) << "Ignoring alter table event on table "
             << *table_id << " " << before_table_name;
     return Status::OK();
   }
 
-  RETURN_NOT_OK(catalog_manager_->RenameTableHms(*table_id, before_table_name,
-                                                 after_table_name, 
event.eventId));
+  RETURN_NOT_OK(catalog_manager_->AlterTableHms(*table_id,
+                                                before_table_name,
+                                                new_table_name,
+                                                new_table_owner,
+                                                event.eventId));
   *durable_event_id = event.eventId;
   return Status::OK();
 }
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 875975b..cc590a7 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -93,6 +93,8 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/version_info.h"
 
+using boost::none;
+using boost::optional;
 using kudu::consensus::ReplicaManagementInfoPB;
 using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
@@ -165,7 +167,8 @@ class MasterTest : public KuduTest {
   Status CreateTable(const string& table_name,
                      const Schema& schema,
                      const vector<KuduPartialRow>& split_rows,
-                     const vector<pair<KuduPartialRow, KuduPartialRow>>& 
bounds);
+                     const vector<pair<KuduPartialRow, KuduPartialRow>>& 
bounds,
+                     const optional<string>& owner);
 
   shared_ptr<Messenger> client_messenger_;
   unique_ptr<MiniMaster> mini_master_;
@@ -529,13 +532,14 @@ Status MasterTest::CreateTable(const string& table_name,
   KuduPartialRow split2(&schema);
   RETURN_NOT_OK(split2.SetInt32("key", 20));
 
-  return CreateTable(table_name, schema, { split1, split2 }, {});
+  return CreateTable(table_name, schema, { split1, split2 }, {}, boost::none);
 }
 
 Status MasterTest::CreateTable(const string& table_name,
                                const Schema& schema,
                                const vector<KuduPartialRow>& split_rows,
-                               const vector<pair<KuduPartialRow, 
KuduPartialRow>>& bounds) {
+                               const vector<pair<KuduPartialRow, 
KuduPartialRow>>& bounds,
+                               const optional<string>& owner) {
 
   CreateTableRequestPB req;
   CreateTableResponsePB resp;
@@ -552,6 +556,9 @@ Status MasterTest::CreateTable(const string& table_name,
     encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, bound.second);
   }
 
+  if (owner) {
+    req.set_owner(*owner);
+  }
   if (!bounds.empty()) {
     controller.RequireServerFeature(MasterFeatures::RANGE_PARTITION_BOUNDS);
   }
@@ -669,7 +676,7 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     ASSERT_OK(split1.SetInt32("key", 1));
     KuduPartialRow split2(&kTableSchema);
     ASSERT_OK(split2.SetInt32("key", 2));
-    Status s = CreateTable(kTableName, kTableSchema, { split1, split1, split2 
}, {});
+    Status s = CreateTable(kTableName, kTableSchema, { split1, split1, split2 
}, {}, none);
     ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "duplicate split row");
   }
@@ -679,7 +686,7 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     KuduPartialRow split1(&kTableSchema);
     ASSERT_OK(split1.SetInt32("key", 1));
     KuduPartialRow split2(&kTableSchema);
-    Status s = CreateTable(kTableName, kTableSchema, { split1, split2 }, {});
+    Status s = CreateTable(kTableName, kTableSchema, { split1, split2 }, {}, 
none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(),
                         "Invalid argument: split rows must contain a value for 
at "
@@ -691,7 +698,7 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     KuduPartialRow split(&kTableSchema);
     ASSERT_OK(split.SetInt32("key", 1));
     ASSERT_OK(split.SetInt32("val", 1));
-    Status s = CreateTable(kTableName, kTableSchema, { split }, {});
+    Status s = CreateTable(kTableName, kTableSchema, { split }, {}, none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(),
                         "Invalid argument: split rows may only contain values "
@@ -708,7 +715,8 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     ASSERT_OK(b_lower.SetInt32("key", 50));
     ASSERT_OK(b_upper.SetInt32("key", 150));
     Status s = CreateTable(kTableName, kTableSchema, { }, { { a_lower, a_upper 
},
-                                                            { b_lower, b_upper 
} });
+                                                            { b_lower, b_upper 
} },
+                                                            none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument: overlapping range 
partition");
   }
@@ -722,7 +730,8 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     ASSERT_OK(split.SetInt32("key", 200));
 
     Status s = CreateTable(kTableName, kTableSchema, { split },
-                           { { bound_lower, bound_upper } });
+                           { { bound_lower, bound_upper } },
+                           none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument: split out of bounds");
   }
@@ -736,7 +745,8 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     ASSERT_OK(split.SetInt32("key", -120));
 
     Status s = CreateTable(kTableName, kTableSchema, { split },
-                           { { bound_lower, bound_upper } });
+                           { { bound_lower, bound_upper } },
+                           none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument: split out of bounds");
   }
@@ -746,7 +756,7 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     ASSERT_OK(bound_lower.SetInt32("key", 150));
     ASSERT_OK(bound_upper.SetInt32("key", 0));
 
-    Status s = CreateTable(kTableName, kTableSchema, { }, { { bound_lower, 
bound_upper } });
+    Status s = CreateTable(kTableName, kTableSchema, { }, { { bound_lower, 
bound_upper } }, none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(),
                         "Invalid argument: range partition lower bound must be 
"
@@ -758,7 +768,7 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     ASSERT_OK(bound_lower.SetInt32("key", 0));
     ASSERT_OK(bound_upper.SetInt32("key", 0));
 
-    Status s = CreateTable(kTableName, kTableSchema, { }, { { bound_lower, 
bound_upper } });
+    Status s = CreateTable(kTableName, kTableSchema, { }, { { bound_lower, 
bound_upper } }, none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(),
                         "Invalid argument: range partition lower bound must be 
"
@@ -773,7 +783,8 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     KuduPartialRow split(&kTableSchema);
     ASSERT_OK(split.SetInt32("key", 0));
 
-    Status s = CreateTable(kTableName, kTableSchema, { split }, { { 
bound_lower, bound_upper } });
+    Status s = CreateTable(kTableName, kTableSchema, { split }, { { 
bound_lower, bound_upper } },
+                           none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument: split matches lower 
or upper bound");
   }
@@ -786,7 +797,8 @@ TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
     KuduPartialRow split(&kTableSchema);
     ASSERT_OK(split.SetInt32("key", 10));
 
-    Status s = CreateTable(kTableName, kTableSchema, { split }, { { 
bound_lower, bound_upper } });
+    Status s = CreateTable(kTableName, kTableSchema, { split }, { { 
bound_lower, bound_upper } },
+                           none);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument: split matches lower 
or upper bound");
   }
@@ -798,13 +810,24 @@ TEST_F(MasterTest, TestCreateTableInvalidKeyType) {
   const DataType types[] = { BOOL, FLOAT, DOUBLE };
   for (DataType type : types) {
     const Schema kTableSchema({ ColumnSchema("key", type) }, 1);
-    Status s = CreateTable(kTableName, kTableSchema, vector<KuduPartialRow>(), 
{});
+    Status s = CreateTable(kTableName, kTableSchema, vector<KuduPartialRow>(), 
{}, none);
     ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(),
         "key column may not have type of BOOL, FLOAT, or DOUBLE");
   }
 }
 
+// Creating a table with a 185-character owner name, which is expected to
+// exceed the configured maximum owner length, must fail with an
+// InvalidArgument status that mentions the invalid owner name.
+TEST_F(MasterTest, TestCreateTableOwnerNameTooLong) {
+  const char* kTableName = "testb";
+  const string kOwnerName =
+      "abcdefghijklmnopqrstuvwxyz01234567899abcdefghijklmnopqrstuvw"
+      "xyz01234567899abcdefghijklmnopqrstuvwxyz01234567899abcdefghijklmnopqrstuvwxyz01234567899";
+  const Schema kTableSchema({ ColumnSchema("key", INT32), ColumnSchema("val", INT32) }, 1);
+
+  // No split rows and no range bounds: only the owner should be at fault.
+  Status status = CreateTable(kTableName, kTableSchema, { }, { }, kOwnerName);
+  ASSERT_TRUE(status.IsInvalidArgument()) << status.ToString();
+  ASSERT_STR_CONTAINS(status.ToString(), "invalid owner name");
+}
+
 // Regression test for KUDU-253/KUDU-592: crash if the schema passed to 
CreateTable
 // is invalid.
 TEST_F(MasterTest, TestCreateTableInvalidSchema) {
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index a4a2b32..906941f 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -195,6 +195,9 @@ message SysTablesEntryPB {
   optional int64 alter_timestamp = 11;
   // The table's extra configuration properties.
   optional TableExtraConfigPB extra_config = 12;
+
+  // The user that owns the table.
+  optional string owner = 13;
 }
 
 // The on-disk entry in the sys.catalog table ("metadata" column) to represent
@@ -487,8 +490,8 @@ message CreateTableRequestPB {
   optional PartitionSchemaPB partition_schema = 7;
   optional int32 num_replicas = 4;
 
-  // If set, uses the provided value as the table owner when creating the table
-  // in external catalogs such as the Hive Metastore.
+  // If set, uses the provided value as the table owner when creating the 
table.
+  // Otherwise, the owner is defaulted to the user that created the table.
   optional string owner = 8;
 
   // The table's extra configuration properties.
@@ -676,6 +679,8 @@ message AlterTableRequestPB {
   optional bool modify_external_catalogs = 5 [default = true];
 
   map<string, string> new_extra_configs = 6;
+
+  optional string new_table_owner = 7;
 }
 
 message AlterTableResponsePB {
@@ -739,6 +744,9 @@ message GetTableSchemaResponsePB {
 
   // The table's extra configuration properties.
   map<string, string> extra_configs = 9;
+
+  // The user that owns the table.
+  optional string owner = 10;
 }
 
 message ConnectToMasterRequestPB {
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 9dcd278..427f1a7 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -3805,7 +3805,8 @@ Status CreateHmsTable(HmsClient* client,
 }
 
 Status CreateKuduTable(const shared_ptr<KuduClient>& kudu_client,
-                       const string& table_name) {
+                       const string& table_name,
+                       const optional<string>& owner = {}) {
   KuduSchemaBuilder schema_builder;
   schema_builder.AddColumn("foo")
       ->Type(client::KuduColumnSchema::INT32)
@@ -3814,11 +3815,14 @@ Status CreateKuduTable(const shared_ptr<KuduClient>& 
kudu_client,
   KuduSchema schema;
   RETURN_NOT_OK(schema_builder.Build(&schema));
   unique_ptr<client::KuduTableCreator> 
table_creator(kudu_client->NewTableCreator());
-  return table_creator->table_name(table_name)
+  table_creator->table_name(table_name)
               .schema(&schema)
               .num_replicas(1)
-              .set_range_partition_columns({ "foo" })
-              .Create();
+              .set_range_partition_columns({ "foo" });
+  if (owner) {
+    table_creator->set_owner(*owner);
+  }
+  return table_creator->Create();
 }
 
 bool IsValidTableName(const string& table_name) {
@@ -3897,9 +3901,11 @@ TEST_P(ToolTestKerberosParameterized, TestHmsDowngrade) {
 // Kerberos is enabled in order to test the tools work in secure clusters.
 TEST_P(ToolTestKerberosParameterized, TestCheckAndAutomaticFixHmsMetadata) {
   string kUsername = "alice";
+  string kOtherUsername = "bob";
   ExternalMiniClusterOptions opts;
   opts.hms_mode = HmsMode::DISABLE_HIVE_METASTORE;
   opts.enable_kerberos = EnableKerberos();
+  opts.extra_master_flags.emplace_back("--allow_empty_owner=true");
   NO_FATALS(StartExternalMiniCluster(std::move(opts)));
 
   string master_addr = cluster_->master()->bound_rpc_addr().ToString();
@@ -3928,7 +3934,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
 
   // Control case: the check tool should not flag this managed table.
   shared_ptr<KuduTable> control;
-  ASSERT_OK(CreateKuduTable(kudu_client, "default.control"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.control", kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.control", &control));
   ASSERT_OK(hms_catalog.CreateTable(
         control->id(), control->name(), kUsername,
@@ -3936,7 +3942,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
 
   // Control case: the check tool should not flag this external synchronized 
table.
   shared_ptr<KuduTable> control_external;
-  ASSERT_OK(CreateKuduTable(kudu_client, "default.control_external"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.control_external", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.control_external", 
&control_external));
   ASSERT_OK(hms_catalog.CreateTable(
       control_external->id(), control_external->name(), kUsername,
@@ -3950,7 +3956,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
 
   // Test case: Upper-case names are handled specially in a few places.
   shared_ptr<KuduTable> test_uppercase;
-  ASSERT_OK(CreateKuduTable(kudu_client, "default.UPPERCASE"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.UPPERCASE", kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.UPPERCASE", &test_uppercase));
   ASSERT_OK(hms_catalog.CreateTable(
         test_uppercase->id(), test_uppercase->name(), kUsername,
@@ -3958,7 +3964,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
 
   // Test case: inconsistent schema.
   shared_ptr<KuduTable> inconsistent_schema;
-  ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_schema"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_schema", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.inconsistent_schema", 
&inconsistent_schema));
   ASSERT_OK(hms_catalog.CreateTable(
         inconsistent_schema->id(), inconsistent_schema->name(), kUsername,
@@ -3966,7 +3972,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
 
   // Test case: inconsistent name.
   shared_ptr<KuduTable> inconsistent_name;
-  ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_name"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_name", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.inconsistent_name", 
&inconsistent_name));
   ASSERT_OK(hms_catalog.CreateTable(
         inconsistent_name->id(), "default.inconsistent_name_hms", kUsername,
@@ -3974,7 +3980,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
 
   // Test case: inconsistent master addresses.
   shared_ptr<KuduTable> inconsistent_master_addrs;
-  ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_master_addrs"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.inconsistent_master_addrs", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.inconsistent_master_addrs",
         &inconsistent_master_addrs));
   HmsCatalog invalid_hms_catalog(Substitute("$0,extra_masters",
@@ -3986,7 +3992,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
 
   // Test case: bad table id.
   shared_ptr<KuduTable> bad_id;
-  ASSERT_OK(CreateKuduTable(kudu_client, "default.bad_id"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.bad_id", kUsername));
   ASSERT_OK(kudu_client->OpenTable("default.bad_id", &bad_id));
   ASSERT_OK(hms_catalog.CreateTable(
       "not_a_table_id", "default.bad_id", kUsername,
@@ -4013,18 +4019,18 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
         master_addr, HmsClient::kManagedTable, kUsername));
 
   // Test case: orphan table in Kudu.
-  ASSERT_OK(CreateKuduTable(kudu_client, "default.kudu_orphan"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.kudu_orphan", 
static_cast<string>("")));
 
   // Test case: legacy managed table.
   shared_ptr<KuduTable> legacy_managed;
-  ASSERT_OK(CreateKuduTable(kudu_client, "impala::default.legacy_managed"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "impala::default.legacy_managed", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("impala::default.legacy_managed", 
&legacy_managed));
   ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", "legacy_managed",
       "impala::default.legacy_managed", master_addr, HmsClient::kManagedTable, 
kUsername));
 
   // Test case: Legacy external purge table.
   shared_ptr<KuduTable> legacy_purge;
-  ASSERT_OK(CreateKuduTable(kudu_client, "impala::default.legacy_purge"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "impala::default.legacy_purge", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("impala::default.legacy_purge", 
&legacy_purge));
   ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", "legacy_purge",
       "impala::default.legacy_purge", master_addr, HmsClient::kExternalTable, 
kUsername));
@@ -4040,14 +4046,15 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
 
   // Test case: legacy managed table with no owner.
   shared_ptr<KuduTable> legacy_no_owner;
-  ASSERT_OK(CreateKuduTable(kudu_client, "impala::default.legacy_no_owner"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "impala::default.legacy_no_owner",
+                            static_cast<string>("")));
   ASSERT_OK(kudu_client->OpenTable("impala::default.legacy_no_owner", 
&legacy_no_owner));
   ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", "legacy_no_owner",
         "impala::default.legacy_no_owner", master_addr, 
HmsClient::kManagedTable, boost::none));
 
   // Test case: legacy managed table with a Hive-incompatible name (no 
database).
   shared_ptr<KuduTable> legacy_hive_incompatible_name;
-  ASSERT_OK(CreateKuduTable(kudu_client, "legacy_hive_incompatible_name"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "legacy_hive_incompatible_name", 
kUsername));
   ASSERT_OK(kudu_client->OpenTable("legacy_hive_incompatible_name",
         &legacy_hive_incompatible_name));
   ASSERT_OK(CreateLegacyHmsTable(&hms_client, "default", 
"legacy_hive_incompatible_name",
@@ -4058,7 +4065,31 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
   hive::Database db;
   db.name = "my_db";
   ASSERT_OK(hms_client.CreateDatabase(db));
-  ASSERT_OK(CreateKuduTable(kudu_client, "my_db.table"));
+  ASSERT_OK(CreateKuduTable(kudu_client, "my_db.table", kUsername));
+
+  // Test case: no owner in HMS
+  shared_ptr<KuduTable> no_owner_in_hms;
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.no_owner_in_hms", 
kUsername));
+  ASSERT_OK(kudu_client->OpenTable("default.no_owner_in_hms", 
&no_owner_in_hms));
+  ASSERT_OK(hms_catalog.CreateTable(
+      no_owner_in_hms->id(), no_owner_in_hms->name(), boost::none,
+      KuduSchema::ToSchema(no_owner_in_hms->schema())));
+
+  // Test case: no owner in Kudu
+  shared_ptr<KuduTable> no_owner_in_kudu;
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.no_owner_in_kudu", 
static_cast<string>("")));
+  ASSERT_OK(kudu_client->OpenTable("default.no_owner_in_kudu", 
&no_owner_in_kudu));
+  ASSERT_OK(hms_catalog.CreateTable(
+      no_owner_in_kudu->id(), no_owner_in_kudu->name(), kUsername,
+      KuduSchema::ToSchema(no_owner_in_kudu->schema())));
+
+  // Test case: different owner in Kudu and HMS
+  shared_ptr<KuduTable> different_owner;
+  ASSERT_OK(CreateKuduTable(kudu_client, "default.different_owner", 
kUsername));
+  ASSERT_OK(kudu_client->OpenTable("default.different_owner", 
&different_owner));
+  ASSERT_OK(hms_catalog.CreateTable(
+      different_owner->id(), different_owner->name(), kOtherUsername,
+      KuduSchema::ToSchema(different_owner->schema())));
 
   unordered_set<string> consistent_tables = {
     "default.control",
@@ -4071,6 +4102,9 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
     "default.inconsistent_name",
     "default.inconsistent_master_addrs",
     "default.bad_id",
+    "default.different_owner",
+    "default.no_owner_in_hms",
+    "default.no_owner_in_kudu",
     "default.orphan_hms_table",
     "default.orphan_hms_table_external",
     "default.orphan_hms_table_legacy_managed",
@@ -4167,6 +4201,9 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
     "default.inconsistent_name",
     "default.inconsistent_master_addrs",
     "default.bad_id",
+    "default.different_owner",
+    "default.no_owner_in_hms",
+    "default.no_owner_in_kudu",
   });
   NO_FATALS(check());
 
@@ -4199,6 +4236,7 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
     "default.bad_id",
     "default.control",
     "default.control_external",
+    "default.different_owner",
     "default.inconsistent_master_addrs",
     "default.inconsistent_name_hms",
     "default.inconsistent_schema",
@@ -4207,15 +4245,20 @@ TEST_P(ToolTestKerberosParameterized, 
TestCheckAndAutomaticFixHmsMetadata) {
     "default.legacy_managed",
     "default.legacy_no_owner",
     "default.legacy_purge",
+    "default.no_owner_in_hms",
+    "default.no_owner_in_kudu",
     "default.uppercase",
     "my_db.table",
   }), kudu_tables);
 
-  // Check that table ownership is preserved in upgraded legacy tables.
-  for (auto p : vector<pair<string, string>>({
+  // Check that table ownership is preserved in upgraded legacy tables and
+  // properly synchronized between Kudu and HMS.
+  for (const auto& p : vector<pair<string, string>>({
         make_pair("legacy_managed", kUsername),
         make_pair("legacy_purge", kUsername),
         make_pair("legacy_no_owner", ""),
+        make_pair("no_owner_in_hms", kUsername),
+        make_pair("different_owner", kUsername),
   })) {
     hive::Table table;
     ASSERT_OK(hms_client.GetTable("default", p.first, &table));
diff --git a/src/kudu/tools/tool_action_hms.cc 
b/src/kudu/tools/tool_action_hms.cc
index 12172ad..b66a00d 100644
--- a/src/kudu/tools/tool_action_hms.cc
+++ b/src/kudu/tools/tool_action_hms.cc
@@ -108,6 +108,16 @@ Status RenameTableInKuduCatalog(KuduClient* kudu_client,
                 ->Alter();
 }
 
+// Changes the owner of Kudu table 'name' to 'owner' in Kudu only, not in the Hive Metastore.
+Status ChangeOwnerInKuduCatalog(KuduClient* kudu_client,
+                                const string& name,
+                                const string& owner) {
+  unique_ptr<KuduTableAlterer> alterer(kudu_client->NewTableAlterer(name));
+  return alterer->SetOwner(owner)
+                ->modify_external_catalogs(false)  // Keep the HMS out of this alter.
+                ->Alter();
+}
+
 Status Init(const RunnerContext& context,
             shared_ptr<KuduClient>* kudu_client,
             unique_ptr<HmsCatalog>* hms_catalog,
@@ -183,8 +193,9 @@ bool IsSynced(const set<string>& master_addresses,
       master_addresses != static_cast<set<string>>(Split(*hms_masters_field, 
","))) {
     return false;
   }
-  Status s = HmsCatalog::PopulateTable(kudu_table.id(), kudu_table.name(), 
hms_table.owner, schema,
-                                       *hms_masters_field, 
hms_table.tableType, &hms_table_copy);
+  Status s = HmsCatalog::PopulateTable(kudu_table.id(), kudu_table.name(), 
kudu_table.owner(),
+                                       schema, *hms_masters_field, 
hms_table.tableType,
+                                       &hms_table_copy);
   return s.ok() && hms_table_copy == hms_table;
 }
 
@@ -214,10 +225,12 @@ Status PrintTables(const string& master_addrs,
   DataTable table({
       "Kudu table",
       "Kudu table ID",
+      "Kudu owner",
       "Kudu master addresses",
       "HMS database",
       "HMS table",
       "HMS table type",
+      "HMS owner",
       Substitute("HMS $0", HmsClient::kKuduTableNameKey),
       Substitute("HMS $0", HmsClient::kKuduTableIdKey),
       Substitute("HMS $0", HmsClient::kKuduMasterAddrsKey),
@@ -229,21 +242,23 @@ Status PrintTables(const string& master_addrs,
       const KuduTable& kudu_table = *pair.first;
       row.emplace_back(kudu_table.name());
       row.emplace_back(kudu_table.id());
+      row.emplace_back(kudu_table.owner());
       row.emplace_back(master_addrs);
     } else {
-      row.resize(3);
+      row.resize(4);
     }
     if (pair.second) {
       hive::Table& hms_table = *pair.second;
       row.emplace_back(hms_table.dbName);
       row.emplace_back(hms_table.tableName);
       row.emplace_back(hms_table.tableType);
+      row.emplace_back(hms_table.owner);
       row.emplace_back(hms_table.parameters[HmsClient::kKuduTableNameKey]);
       row.emplace_back(hms_table.parameters[HmsClient::kKuduTableIdKey]);
       row.emplace_back(hms_table.parameters[HmsClient::kKuduMasterAddrsKey]);
       row.emplace_back(hms_table.parameters[HmsClient::kStorageHandlerKey]);
     } else {
-      row.resize(10);
+      row.resize(12);
     }
     table.AddRow(std::move(row));
   }
@@ -635,7 +650,7 @@ Status FixHmsMetadata(const RunnerContext& context) {
       if (FLAGS_dryrun) {
         LOG(INFO) << "[dryrun] Creating HMS table for Kudu table " << 
TableIdent(*kudu_table);
       } else {
-        Status s = hms_catalog->CreateTable(table_id, table_name, boost::none, 
schema);
+        Status s = hms_catalog->CreateTable(table_id, table_name, 
kudu_table->owner(), schema);
         if (s.IsAlreadyPresent()) {
           LOG(ERROR) << "Failed to create HMS table for Kudu table "
                      << TableIdent(*kudu_table)
@@ -730,6 +745,7 @@ Status FixHmsMetadata(const RunnerContext& context) {
       const KuduTable& kudu_table = *table_pair.first;
       const hive::Table& hms_table = table_pair.second;
       string hms_table_name = Substitute("$0.$1", hms_table.dbName, 
hms_table.tableName);
+      string owner = kudu_table.owner();
 
       if (hms_table_name != kudu_table.name()) {
         // Update the Kudu table name to match the HMS table name.
@@ -752,6 +768,22 @@ Status FixHmsMetadata(const RunnerContext& context) {
         }
       }
 
+      // If the HMS table has an owner and Kudu does not, update the Kudu 
table owner to match
+      // the HMS table owner. Otherwise the metadata step below will ensure 
the Kudu owner
+      // is updated in the HMS.
+      if (hms_table.owner != owner && owner.empty()) {
+        if (FLAGS_dryrun) {
+          LOG(INFO) << "[dryrun] Changing owner of " << TableIdent(kudu_table)
+                    << " to " << hms_table.owner << " in Kudu catalog.";
+        } else {
+          RETURN_NOT_OK_PREPEND(
+              ChangeOwnerInKuduCatalog(kudu_client.get(), kudu_table.name(), 
hms_table.owner),
+              Substitute("failed to change owner of $0 to $1 in Kudu catalog",
+                TableIdent(kudu_table), hms_table.owner));
+          owner = hms_table.owner;
+        }
+      }
+
       // Update the HMS table metadata to match Kudu.
       if (FLAGS_dryrun) {
         LOG(INFO) << "[dryrun] Refreshing HMS table metadata for Kudu table "
@@ -760,8 +792,8 @@ Status FixHmsMetadata(const RunnerContext& context) {
         auto schema = KuduSchema::ToSchema(kudu_table.schema());
         RETURN_NOT_OK_PREPEND(
             // Disable table ID checking to support fixing tables with 
unsynchronized IDs.
-            hms_catalog->AlterTable(kudu_table.id(), hms_table_name, 
hms_table_name, schema,
-                /* check_id */ false),
+            hms_catalog->AlterTable(kudu_table.id(), hms_table_name, 
hms_table_name,
+                owner, schema, /* check_id */ false),
             Substitute("failed to refresh HMS table metadata for Kudu table 
$0",
               TableIdent(kudu_table)));
       }

Reply via email to