This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e94aa7  KUDU-3090 Add ownership privileges
0e94aa7 is described below

commit 0e94aa78465c6e7e83dca1da9015afb5ac95cb09
Author: Attila Bukor <[email protected]>
AuthorDate: Fri Jul 10 16:05:46 2020 +0200

    KUDU-3090 Add ownership privileges
    
    Apache Ranger supports granting privileges to resource owners by using a
    special "{OWNER}" username. This patch integrates Kudu's ownership model
    with Ranger.
    
    Change-Id: Id9c36b7d84863403d7d538cafc709d2aebd0b109
    Reviewed-on: http://gerrit.cloudera.org:8080/16072
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <[email protected]>
    Reviewed-by: Andrew Wong <[email protected]>
---
 .../ranger/authorization/RangerKuduAuthorizer.java |  25 +++-
 src/kudu/integration-tests/master_authz-itest.cc   | 134 ++++++++++++++++-----
 src/kudu/integration-tests/ts_authz-itest.cc       |  28 +++--
 src/kudu/master/authz_provider.h                   |  16 ++-
 src/kudu/master/catalog_manager.cc                 |  88 ++++++++------
 src/kudu/master/default_authz_provider.h           |  17 ++-
 src/kudu/master/ranger_authz_provider.cc           |  44 ++++---
 src/kudu/master/ranger_authz_provider.h            |  17 ++-
 src/kudu/ranger/ranger.proto                       |   3 +
 src/kudu/ranger/ranger_client-test.cc              |  73 +++++------
 src/kudu/ranger/ranger_client.cc                   |  28 +++--
 src/kudu/ranger/ranger_client.h                    |  13 +-
 12 files changed, 323 insertions(+), 163 deletions(-)

diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java
 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java
index bd8c8f8..14477ff 100644
--- 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java
@@ -182,7 +182,9 @@ public class RangerKuduAuthorizer {
       String db = request.hasDatabase() ? request.getDatabase() : null;
       String table = request.hasTable() ? request.getTable() : null;
       String column = request.hasColumn() ? request.getColumn() : null;
-      boolean admin = request.hasRequiresDelegateAdmin() && 
request.getRequiresDelegateAdmin();
+      boolean requiresAdmin = request.hasRequiresDelegateAdmin() &&
+          request.getRequiresDelegateAdmin();
+      boolean isOwner = request.hasIsOwner() && request.getIsOwner();
       RangerAccessRequest rangerAccessRequest = createRequest(action, user, 
groups, db, table,
                                                               column);
       RangerAccessResult rangerAccessResult = 
plugin.isAccessAllowed(rangerAccessRequest);
@@ -190,7 +192,16 @@ public class RangerKuduAuthorizer {
         LOG.debug(String.format("RangerAccessRequest [%s] receives result 
[%s]",
             rangerAccessResult.getAccessRequest().toString(), 
rangerAccessResult.toString()));
       }
-      if (rangerAccessResult.getIsAllowed() && admin) {
+      if (!rangerAccessResult.getIsAllowed() && isOwner) {
+        rangerAccessRequest = createRequest(action, 
RangerPolicyEngine.RESOURCE_OWNER, groups,
+                                            db, table, column);
+        rangerAccessResult = plugin.isAccessAllowed(rangerAccessRequest);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("RangerAccessRequest [%s] receives result 
[%s]",
+              rangerAccessResult.getAccessRequest().toString(), 
rangerAccessResult.toString()));
+        }
+      }
+      if (rangerAccessResult.getIsAllowed() && requiresAdmin) {
         rangerAccessRequest = createRequest(RangerPolicyEngine.ADMIN_ACCESS, 
user, groups, db,
                                             table, column);
         rangerAccessResult = plugin.isAccessAllowed(rangerAccessRequest);
@@ -198,6 +209,16 @@ public class RangerKuduAuthorizer {
           LOG.debug(String.format("RangerAccessRequest [%s] receives result 
[%s]",
               rangerAccessResult.getAccessRequest().toString(), 
rangerAccessResult.toString()));
         }
+        if (!rangerAccessResult.getIsAllowed() && isOwner) {
+          rangerAccessRequest = createRequest(RangerPolicyEngine.ADMIN_ACCESS,
+                                              
RangerPolicyEngine.RESOURCE_OWNER,
+                                              groups, db, table, column);
+          rangerAccessResult = plugin.isAccessAllowed(rangerAccessRequest);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("RangerAccessRequest [%s] receives result 
[%s]",
+                rangerAccessResult.getAccessRequest().toString(), 
rangerAccessResult.toString()));
+          }
+        }
       }
 
       Ranger.RangerResponsePB rangerResponsePB = 
Ranger.RangerResponsePB.newBuilder()
diff --git a/src/kudu/integration-tests/master_authz-itest.cc 
b/src/kudu/integration-tests/master_authz-itest.cc
index 3e8f645..9cbbd53 100644
--- a/src/kudu/integration-tests/master_authz-itest.cc
+++ b/src/kudu/integration-tests/master_authz-itest.cc
@@ -94,6 +94,7 @@ using strings::Substitute;
 namespace {
 const char* const kAdminUser = "test-admin";
 const char* const kTestUser = "test-user";
+const char* const kSecondUser = "alice";
 const char* const kImpalaUser = "impala";
 const char* const kDatabaseName = "db";
 const char* const kTableName = "table";
@@ -130,12 +131,15 @@ string HarnessEnumToString(HarnessEnum h) {
 struct PrivilegeParams {
   // NOLINTNEXTLINE(google-explicit-constructor)
   PrivilegeParams(string db_name,
-                  string table_name = "")
+                  string table_name = "",
+                  string user_name = kTestUser)
       : db_name(std::move(db_name)),
-        table_name(std::move(table_name)) {
+        table_name(std::move(table_name)),
+        user_name(std::move(user_name)) {
   }
   const string db_name;
   const string table_name;
+  const string user_name;
 };
 
 class MasterAuthzITestHarness {
@@ -277,6 +281,7 @@ class MasterAuthzITestHarness {
         .schema(&schema)
         .num_replicas(1)
         .set_range_partition_columns({ "key" })
+        .set_owner(kTestUser)
         .Create();
   }
 
@@ -312,8 +317,7 @@ class MasterAuthzITestHarness {
     // Add 'impala' as trusted user who may access the cluster without being
     // authorized.
     opts.extra_master_flags.emplace_back("--trusted_user_acl=impala");
-    opts.extra_master_flags.emplace_back("--user_acl=test-user,impala");
-    opts.extra_master_flags.emplace_back("--ranger_log_level=debug");
+    opts.extra_master_flags.emplace_back("--user_acl=test-user,impala,alice");
     SetUpExternalMiniServiceOpts(&opts);
     return opts;
   }
@@ -375,7 +379,7 @@ class RangerITestHarness : public MasterAuthzITestHarness {
     policy.databases.emplace_back(p.db_name);
     // IsCreateTableDone() requires METADATA on the table level.
     policy.tables.emplace_back("*");
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::CREATE}, 
false));
+    policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::CREATE}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     return RefreshAuthzPolicies(cluster);
   }
@@ -385,7 +389,7 @@ class RangerITestHarness : public MasterAuthzITestHarness {
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
     policy.tables.emplace_back(p.table_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::DROP}, 
false));
+    policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::DROP}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     return RefreshAuthzPolicies(cluster);
   }
@@ -402,7 +406,7 @@ class RangerITestHarness : public MasterAuthzITestHarness {
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
     policy.tables.emplace_back(p.table_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALTER}, 
false));
+    policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::ALTER}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     return RefreshAuthzPolicies(cluster);
   }
@@ -413,13 +417,13 @@ class RangerITestHarness : public MasterAuthzITestHarness 
{
     policy_new_table.databases.emplace_back(p.db_name);
     // IsCreateTableDone() requires METADATA on the table level.
     policy_new_table.tables.emplace_back("*");
-    policy_new_table.items.emplace_back(PolicyItem({kTestUser}, 
{ActionPB::CREATE}, false));
+    policy_new_table.items.emplace_back(PolicyItem({p.user_name}, 
{ActionPB::CREATE}, false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy_new_table)));
 
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
     policy.tables.emplace_back(p.table_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, false));
+    policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::ALL}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     return RefreshAuthzPolicies(cluster);
   }
@@ -429,7 +433,7 @@ class RangerITestHarness : public MasterAuthzITestHarness {
         override {
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::METADATA}, 
false));
+    policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::METADATA}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     return RefreshAuthzPolicies(cluster);
   }
@@ -439,7 +443,7 @@ class RangerITestHarness : public MasterAuthzITestHarness {
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
     policy.tables.emplace_back(p.table_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::METADATA}, 
false));
+    policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::METADATA}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     return RefreshAuthzPolicies(cluster);
   }
@@ -449,7 +453,7 @@ class RangerITestHarness : public MasterAuthzITestHarness {
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
     policy.tables.emplace_back(p.table_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, false));
+    policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::ALL}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     return RefreshAuthzPolicies(cluster);
   }
@@ -458,12 +462,12 @@ class RangerITestHarness : public MasterAuthzITestHarness 
{
                                    const unique_ptr<ExternalMiniCluster>& 
cluster) override {
     AuthorizationPolicy db_policy;
     db_policy.databases.emplace_back(p.db_name);
-    db_policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, 
false));
+    db_policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::ALL}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(db_policy)));
     AuthorizationPolicy tbl_policy;
     tbl_policy.databases.emplace_back(p.db_name);
     tbl_policy.tables.emplace_back("*");
-    tbl_policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, 
false));
+    tbl_policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::ALL}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(tbl_policy)));
     return RefreshAuthzPolicies(cluster);
   }
@@ -473,7 +477,7 @@ class RangerITestHarness : public MasterAuthzITestHarness {
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
     policy.tables.emplace_back(p.table_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, true));
+    policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::ALL}, 
true));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     return RefreshAuthzPolicies(cluster);
   }
@@ -483,12 +487,12 @@ class RangerITestHarness : public MasterAuthzITestHarness 
{
       const unique_ptr<ExternalMiniCluster>& cluster) override {
     AuthorizationPolicy db_policy;
     db_policy.databases.emplace_back(p.db_name);
-    db_policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, 
true));
+    db_policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::ALL}, 
true));
     RETURN_NOT_OK(ranger_->AddPolicy(move(db_policy)));
     AuthorizationPolicy tbl_policy;
     tbl_policy.databases.emplace_back(p.db_name);
     tbl_policy.tables.emplace_back("*");
-    tbl_policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, 
true));
+    tbl_policy.items.emplace_back(PolicyItem({p.user_name}, {ActionPB::ALL}, 
true));
     RETURN_NOT_OK(ranger_->AddPolicy(move(tbl_policy)));
     return RefreshAuthzPolicies(cluster);
   }
@@ -557,6 +561,7 @@ class MasterAuthzITestBase : public 
ExternalMiniClusterITestBase {
     // Create principals 'impala' and 'kudu'. Configure to use the latter.
     ASSERT_OK(cluster_->kdc()->CreateUserPrincipal(kImpalaUser));
     ASSERT_OK(cluster_->kdc()->CreateUserPrincipal("kudu"));
+    ASSERT_OK(cluster_->kdc()->CreateUserPrincipal(kSecondUser));
     ASSERT_OK(cluster_->kdc()->Kinit("kudu"));
 
     ASSERT_OK(harness_->SetUpExternalServiceClients(cluster_));
@@ -818,7 +823,8 @@ TEST_P(MasterAuthzITest, TestAuthzListTables) {
   ASSERT_EQ(vector<string>({ table_name }), tables);
 
   tables.clear();
-  ASSERT_OK(this->GrantGetMetadataTablePrivilege({ kDatabaseName, kSecondTable 
}));
+  ASSERT_OK(this->GrantGetMetadataTablePrivilege(
+      {kDatabaseName, kSecondTable, "{OWNER}"}));
   ASSERT_OK(this->client_->ListTables(&tables));
   unordered_set<string> tables_set(tables.begin(), tables.end());
   ASSERT_EQ(unordered_set<string>({ table_name, sec_table_name }), tables_set);
@@ -849,13 +855,68 @@ TEST_P(MasterAuthzITest, 
TestAuthzListTablesConcurrentRename) {
   ASSERT_EQ(sec_table_name, tables[0]);
 }
 
+TEST_P(MasterAuthzITest, TestAuthzGiveAwayOwnership) {
+  this->GrantAllWithGrantTablePrivilege({ kDatabaseName, kTableName, 
"{OWNER}"});
+
+  // We need to grant metadata permissions to the user, otherwise the ownership
+  // change would time out due to lack of privileges when checking the alter
+  // table progress.
+  this->GrantGetMetadataTablePrivilege({ kDatabaseName, kTableName, kTestUser 
});
+  const string table_name = Substitute("$0.$1", kDatabaseName, kTableName);
+
+  // Change table owner.
+  {
+    unique_ptr<KuduTableAlterer> alterer(
+        this->client_->NewTableAlterer(table_name));
+    ASSERT_OK(alterer->SetOwner(kSecondUser)->Alter());
+  }
+
+  // Attempt to drop a column after as a non-owner.
+  {
+    unique_ptr<KuduTableAlterer> alterer(
+        this->client_->NewTableAlterer(table_name));
+    Status s = alterer->DropColumn("int8_val")->Alter();
+    ASSERT_TRUE(s.IsNotAuthorized());
+  }
+
+  // Login as the new owner, create a new client, and alter the table.
+  {
+    ASSERT_OK(this->cluster_->kdc()->Kinit(kSecondUser));
+    ASSERT_OK(this->cluster_->CreateClient(nullptr, &this->client_));
+    unique_ptr<KuduTableAlterer> alterer(
+        this->client_->NewTableAlterer(table_name));
+    ASSERT_OK(alterer->DropColumn("int8_val")->Alter());
+  }
+}
+
+class MasterAuthzOwnerITest : public MasterAuthzITestBase,
+                              public 
::testing::WithParamInterface<std::tuple<HarnessEnum,
+                                                                              
std::string>> {
+ public:
+  void SetUp() override {
+    NO_FATALS(MasterAuthzITestBase::SetUp());
+    NO_FATALS(SetUpCluster(std::get<0>(GetParam())));
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(
+    AuthzProvidersWithOwner, MasterAuthzOwnerITest,
+    ::testing::Combine(::testing::Values(kRanger),
+                       ::testing::Values(kTestUser, "{OWNER}")),
+    [](const testing::TestParamInfo<MasterAuthzOwnerITest::ParamType>& info) {
+      string user = std::get<1>(info.param) == "{OWNER}" ? "owner" : 
"nonowner";
+      return Substitute("$0_$1", HarnessEnumToString(std::get<0>(info.param)),
+                        user);
+    });
+
 // Test that when the client passes a table identifier with the table name
 // and table ID refer to different tables, the client needs permission on
 // both tables for returning TABLE_NOT_FOUND error to avoid leaking table
 // existence.
-TEST_P(MasterAuthzITest, TestMismatchedTable) {
+TEST_P(MasterAuthzOwnerITest, TestMismatchedTable) {
   const auto table_name_a = Substitute("$0.$1", kDatabaseName, kTableName);
   const auto table_name_b = Substitute("$0.$1", kDatabaseName, kSecondTable);
+  const string& kUsername = std::get<1>(GetParam());
 
   // Log in as 'test-admin' to get the tablet ID.
   ASSERT_OK(this->cluster_->kdc()->Kinit(kAdminUser));
@@ -872,12 +933,12 @@ TEST_P(MasterAuthzITest, TestMismatchedTable) {
   ASSERT_TRUE(s.IsNotAuthorized());
   ASSERT_STR_MATCHES(s.ToString(), "[Uu]nauthorized action");
 
-  ASSERT_OK(this->GrantGetMetadataTablePrivilege({ kDatabaseName, kTableName 
}));
+  ASSERT_OK(this->GrantGetMetadataTablePrivilege({ kDatabaseName, kTableName, 
kUsername }));
   s = this->GetTableLocationsWithTableId(table_name_b, table_id_a);
   ASSERT_TRUE(s.IsNotAuthorized());
   ASSERT_STR_MATCHES(s.ToString(), "[Uu]nauthorized action");
 
-  ASSERT_OK(this->GrantGetMetadataTablePrivilege({ kDatabaseName, kSecondTable 
}));
+  ASSERT_OK(this->GrantGetMetadataTablePrivilege({ kDatabaseName, 
kSecondTable, kUsername }));
   s = this->GetTableLocationsWithTableId(table_name_b, table_id_a);
   ASSERT_TRUE(s.IsNotFound());
   ASSERT_STR_CONTAINS(s.ToString(), "the table ID refers to a different 
table");
@@ -920,7 +981,7 @@ ostream& operator <<(ostream& out, const AuthzDescriptor& 
d) {
 
 class TestAuthzTable :
     public MasterAuthzITestBase,
-    public ::testing::WithParamInterface<std::tuple<HarnessEnum, 
AuthzDescriptor>> {
+    public ::testing::WithParamInterface<std::tuple<HarnessEnum, 
AuthzDescriptor, std::string>> {
  public:
   void SetUp() override {
     NO_FATALS(MasterAuthzITestBase::SetUp());
@@ -930,11 +991,18 @@ class TestAuthzTable :
 
 TEST_P(TestAuthzTable, TestAuthorizeTable) {
   const AuthzDescriptor& desc = std::get<1>(GetParam());
+  const string& owner = std::get<2>(GetParam());
+  if (desc.funcs.description == "CreateTable" && owner == "{OWNER}") {
+    // This case doesn't make sense semantically as we don't have database
+    // owners which would be required to create a table.
+    return;
+  }
   const auto table_name = Substitute("$0.$1", desc.database, desc.table_name);
   const auto new_table_name = Substitute("$0.$1",
                                          desc.database, desc.new_table_name);
   const OperationParams action_params = { table_name, new_table_name };
-  const PrivilegeParams privilege_params = { desc.database, desc.table_name };
+  const PrivilegeParams privilege_params = { desc.database, desc.table_name,
+                                             std::get<2>(GetParam()) };
 
   // User 'test-user' attempts to operate on the table without proper 
privileges.
   Status s = desc.funcs.do_action(this, action_params);
@@ -1046,15 +1114,17 @@ static const AuthzDescriptor kAuthzCombinations[] = {
       ""
     },
 };
-INSTANTIATE_TEST_CASE_P(AuthzCombinations,
-                        TestAuthzTable,
-                        ::testing::Combine(
-                            ::testing::Values(kRanger),
-                            ::testing::ValuesIn(kAuthzCombinations)),
-                        [] (const 
testing::TestParamInfo<TestAuthzTable::ParamType>& info) {
-                          return Substitute("$0_$1", 
HarnessEnumToString(std::get<0>(info.param)),
-                                            
std::get<1>(info.param).funcs.description);
-                        });
+INSTANTIATE_TEST_CASE_P(
+    AuthzCombinations, TestAuthzTable,
+    ::testing::Combine(::testing::Values(kRanger),
+                       ::testing::ValuesIn(kAuthzCombinations),
+                       ::testing::Values(kTestUser, "{OWNER}")),
+    [](const testing::TestParamInfo<TestAuthzTable::ParamType>& info) {
+      string user = std::get<2>(info.param) == "{OWNER}" ? "owner" : 
"nonowner";
+      return Substitute("$0_$1_$2",
+                        HarnessEnumToString(std::get<0>(info.param)),
+                        std::get<1>(info.param).funcs.description, user);
+    });
 
 class AuthzErrorHandlingTest :
     public MasterAuthzITestBase,
diff --git a/src/kudu/integration-tests/ts_authz-itest.cc 
b/src/kudu/integration-tests/ts_authz-itest.cc
index d3efa34..df0843e 100644
--- a/src/kudu/integration-tests/ts_authz-itest.cc
+++ b/src/kudu/integration-tests/ts_authz-itest.cc
@@ -301,7 +301,9 @@ class TSAuthzITestHarness {
   virtual Status RefreshAuthzPolicies(const unique_ptr<ExternalMiniCluster>& 
cluster) = 0;
 
   // Creates a table named 'table_ident' with 'kNumColsPerTable' columns.
-  Status CreateTable(const string& table_ident, const shared_ptr<KuduClient>& 
client) {
+  Status CreateTable(const string& table_ident,
+                     const shared_ptr<KuduClient>& client,
+                     const string& owner) {
     KuduSchema schema;
     KuduSchemaBuilder b;
     auto iter = cols_.begin();
@@ -313,8 +315,9 @@ class TSAuthzITestHarness {
     unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
     return table_creator->table_name(table_ident)
         .schema(&schema)
-        .set_range_partition_columns({ "col0" })
+        .set_range_partition_columns({"col0"})
         .num_replicas(1)
+        .set_owner(owner)
         .Create();
   }
 
@@ -459,8 +462,8 @@ class TSAuthzITest : public ExternalMiniClusterITestBase,
     ASSERT_OK(cluster_->CreateClient(/*builder*/nullptr, &client_));
   }
 
-  Status CreateTable(const string& table_ident) {
-    return harness_->CreateTable(table_ident, client_);
+  Status CreateTable(const string& table_ident, const string& owner) {
+    return harness_->CreateTable(table_ident, client_, owner);
   }
 
   void TearDown() override {
@@ -481,7 +484,7 @@ TEST_P(TSAuthzITest, TestReadsAndWrites) {
   vector<string> tables;
   for (int i = 0; i < kNumTables; i++) {
     string table_name = Substitute("$0$1", kTablePrefix, i);
-    ASSERT_OK(CreateTable(Substitute("$0.$1", kDb, table_name)));
+    ASSERT_OK(CreateTable(Substitute("$0.$1", kDb, table_name), 
harness_->users_[0]));
     tables.emplace_back(std::move(table_name));
   }
 
@@ -510,18 +513,21 @@ TEST_P(TSAuthzITest, TestReadsAndWrites) {
       clients.emplace_back(std::move(client));
     }
     EmplaceOrDie(&user_to_clients, user, std::move(clients));
+    const string& user_to_grant = i ? user : "{OWNER}";
 
-    // Generate privileges for each user for every table, and grant the
-    // appropriate privileges.
+    // Generate privileges for each user for every table, and grant
+    // the appropriate privileges.
     TableNameToPrivileges table_to_privileges;
     for (const string& table_name : tables) {
       RWPrivileges granted_privileges = GeneratePrivileges(cols, &prng);
       for (const auto& wp : granted_privileges.table_write_privileges) {
-        ASSERT_OK(harness_->GrantTablePrivilege(kDb, table_name, user, 
WritePrivilegeToString(wp),
+        ASSERT_OK(harness_->GrantTablePrivilege(kDb, table_name, user_to_grant,
+                                                WritePrivilegeToString(wp),
                                                 /*admin=*/false, cluster_));
       }
       for (const auto& col : granted_privileges.column_scan_privileges) {
-        ASSERT_OK(harness_->GrantColumnPrivilege(kDb, table_name, col, user, 
"SELECT",
+        ASSERT_OK(harness_->GrantColumnPrivilege(kDb, table_name, col,
+                                                 user_to_grant, "SELECT",
                                                  /*admin=*/false, cluster_));
       }
       RWPrivileges not_granted_privileges = ComplementaryPrivileges(cols, 
granted_privileges);
@@ -587,12 +593,12 @@ TEST_P(TSAuthzITest, TestAlters) {
 
   static const string kTableName = "table";
   const string table_ident = Substitute("$0.$1", kDb, kTableName);
-  ASSERT_OK(CreateTable(table_ident));
-
   const string user = "user0";
   ASSERT_OK(cluster_->kdc()->CreateUserPrincipal(user));
   ASSERT_OK(cluster_->kdc()->Kinit(user));
 
+  ASSERT_OK(CreateTable(table_ident, user));
+
   shared_ptr<KuduClient> user_client;
   ASSERT_OK(cluster_->CreateClient(nullptr, &user_client));
 
diff --git a/src/kudu/master/authz_provider.h b/src/kudu/master/authz_provider.h
index c8042ac..1c98b15 100644
--- a/src/kudu/master/authz_provider.h
+++ b/src/kudu/master/authz_provider.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <string>
+#include <unordered_map>
 #include <unordered_set>
 
 #include "kudu/gutil/port.h"
@@ -61,7 +62,8 @@ class AuthzProvider {
   // If the operation is not authorized, returns Status::NotAuthorized().
   // Otherwise, may return other Status error codes depend on actual errors.
   virtual Status AuthorizeDropTable(const std::string& table_name,
-                                    const std::string& user) 
WARN_UNUSED_RESULT = 0;
+                                    const std::string& user,
+                                    bool is_owner) WARN_UNUSED_RESULT = 0;
 
   // Checks if the table alteration is authorized for the given user.
   //
@@ -69,7 +71,8 @@ class AuthzProvider {
   // Otherwise, may return other Status error codes depend on actual errors.
   virtual Status AuthorizeAlterTable(const std::string& old_table,
                                      const std::string& new_table,
-                                     const std::string& user) 
WARN_UNUSED_RESULT = 0;
+                                     const std::string& user,
+                                     bool is_owner) WARN_UNUSED_RESULT = 0;
 
   // Checks if retrieving metadata about the table is authorized for the
   // given user. For example, when checking for table presence or locations.
@@ -77,7 +80,8 @@ class AuthzProvider {
   // If the operation is not authorized, returns Status::NotAuthorized().
   // Otherwise, may return other Status error codes depend on actual errors.
   virtual Status AuthorizeGetTableMetadata(const std::string& table_name,
-                                           const std::string& user) 
WARN_UNUSED_RESULT = 0;
+                                           const std::string& user,
+                                           bool is_owner) WARN_UNUSED_RESULT = 
0;
 
   // Filters the given table names, removing any the user is not authorized to
   // see.
@@ -87,7 +91,7 @@ class AuthzProvider {
   // useful, e.g. to indicate that the caller needs to verify the table names
   // have not changed during authorization.
   virtual Status AuthorizeListTables(const std::string& user,
-                                     std::unordered_set<std::string>* 
table_names,
+                                     std::unordered_map<std::string, bool>* 
is_owner_by_table_name,
                                      bool* checked_table_names) 
WARN_UNUSED_RESULT = 0;
 
   // Checks if statistics of the table is authorized for the
@@ -96,7 +100,8 @@ class AuthzProvider {
   // If the operation is not authorized, returns Status::NotAuthorized().
   // Otherwise, may return other Status error codes depend on actual errors.
   virtual Status AuthorizeGetTableStatistics(const std::string& table_name,
-                                             const std::string& user) 
WARN_UNUSED_RESULT = 0;
+                                             const std::string& user,
+                                             bool is_owner) WARN_UNUSED_RESULT 
= 0;
 
   // Populates the privilege fields of 'pb' with the table-specific privileges
   // for the given user, using 'schema_pb' for metadata (e.g. column IDs). This
@@ -104,6 +109,7 @@ class AuthzProvider {
   // as such, it is expected that the table ID field is already set.
   virtual Status FillTablePrivilegePB(const std::string& table_name,
                                       const std::string& user,
+                                      bool is_owner,
                                       const SchemaPB& schema_pb,
                                       security::TablePrivilegePB* pb) 
WARN_UNUSED_RESULT = 0;
 
diff --git a/src/kudu/master/catalog_manager.cc 
b/src/kudu/master/catalog_manager.cc
index f3ee81d..706346b 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1856,8 +1856,9 @@ Status CatalogManager::IsCreateTableDone(const 
IsCreateTableDoneRequestPB* req,
   //    the user is authorized to operate on the table.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  auto authz_func = [&] (const string& username, const string& table_name) {
-    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, 
username),
+  auto authz_func = [&] (const string& username, const string& table_name, 
const string& owner) {
+    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, 
username,
+                                                                 username == 
owner),
                       resp, MasterErrorPB::NOT_AUTHORIZED);
   };
   RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, 
authz_func, user,
@@ -1941,9 +1942,9 @@ Status CatalogManager::FindLockAndAuthorizeTable(
   // *----------------*--------------*---------------------*-----------------*
   // | NO             | NO           | N/A                 | InvalidArgument |
   // *----------------*--------------*---------------------*-----------------*
-  auto authorize = [&] (const string& name) {
+  auto authorize = [&] (const string& name, const string& owner) {
     if (user) {
-      return authz_func(*user, name);
+      return authz_func(*user, name, owner);
     }
     return Status::OK();
   };
@@ -1956,7 +1957,7 @@ Status CatalogManager::FindLockAndAuthorizeTable(
 
   scoped_refptr<TableInfo> table;
   // Set to true if the client-provided table name and ID refer to different 
tables.
-  bool mismatched_table = false;
+  scoped_refptr<TableInfo> table_with_mismatched_name;
   {
     shared_lock<LockType> l(lock_);
     if (table_identifier.has_table_id()) {
@@ -1964,10 +1965,11 @@ Status CatalogManager::FindLockAndAuthorizeTable(
 
       // If the request contains both a table ID and table name, ensure that
       // both match the same table.
+      auto table_by_name = FindPtrOrNull(normalized_table_names_map_,
+                                         
NormalizeTableName(table_identifier.table_name()));
       if (table_identifier.has_table_name() &&
-          table.get() != FindPtrOrNull(normalized_table_names_map_,
-                                       
NormalizeTableName(table_identifier.table_name())).get()) {
-        mismatched_table = true;
+          table.get() != table_by_name.get()) {
+        table_with_mismatched_name.swap(table_by_name);
       }
     } else if (table_identifier.has_table_name()) {
       table = FindPtrOrNull(normalized_table_names_map_,
@@ -1985,7 +1987,9 @@ Status CatalogManager::FindLockAndAuthorizeTable(
   // NOT_AUTHORIZED error, to avoid leaking table existence.
   if (!table) {
     if (table_identifier.has_table_name()) {
-      
RETURN_NOT_OK(authorize(NormalizeTableName(table_identifier.table_name())));
+      // if the table doesn't exist, we don't have ownership information to 
pass
+      // to authorize().
+      
RETURN_NOT_OK(authorize(NormalizeTableName(table_identifier.table_name()), ""));
     }
     return tnf_error();
   }
@@ -1994,7 +1998,7 @@ Status CatalogManager::FindLockAndAuthorizeTable(
   // found is authorized.
   TableMetadataLock lock(table.get(), lock_mode);
   string table_name = NormalizeTableName(lock.data().name());
-  RETURN_NOT_OK(authorize(table_name));
+  RETURN_NOT_OK(authorize(table_name, lock.data().owner()));
 
   // If the table name and table ID refer to different tables, for example,
   //   1. the ID maps to table A.
@@ -2002,8 +2006,10 @@ Status CatalogManager::FindLockAndAuthorizeTable(
   //
   // Authorize user against both tables, then return TABLE_NOT_FOUND error to
   // avoid leaking table existence.
-  if (mismatched_table) {
-    
RETURN_NOT_OK(authorize(NormalizeTableName(table_identifier.table_name())));
+  if (table_with_mismatched_name) {
+    TableMetadataLock lock_table_by_name(table_with_mismatched_name.get(), 
lock_mode);
+    
RETURN_NOT_OK(authorize(NormalizeTableName(lock_table_by_name.data().name()),
+                            lock_table_by_name.data().owner()));
     return SetupError(
         Status::NotFound(
             Substitute("the table ID refers to a different table '$0' than 
'$1'",
@@ -2050,8 +2056,9 @@ Status CatalogManager::DeleteTableRpc(const 
DeleteTableRequestPB& req,
     // to operate on the table.
     scoped_refptr<TableInfo> table;
     TableMetadataLock l;
-    auto authz_func = [&](const string& username, const string& table_name) {
-      return SetupError(authz_provider_->AuthorizeDropTable(table_name, 
username),
+    auto authz_func = [&](const string& username, const string& table_name, 
const string& owner) {
+      return SetupError(authz_provider_->AuthorizeDropTable(table_name, 
username,
+                                                            username == owner),
                         resp, MasterErrorPB::NOT_AUTHORIZED);
     };
     RETURN_NOT_OK(FindLockAndAuthorizeTable(req, resp, LockMode::READ, 
authz_func, user,
@@ -2112,8 +2119,8 @@ Status CatalogManager::DeleteTable(const 
DeleteTableRequestPB& req,
   //    to operate on the table. Last, mark it as removed.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  auto authz_func = [&] (const string& username, const string& table_name) {
-    return SetupError(authz_provider_->AuthorizeDropTable(table_name, 
username),
+  auto authz_func = [&] (const string& username, const string& table_name, 
const string& owner) {
+    return SetupError(authz_provider_->AuthorizeDropTable(table_name, 
username, username == owner),
                       resp, MasterErrorPB::NOT_AUTHORIZED);
   };
   RETURN_NOT_OK(FindLockAndAuthorizeTable(req, resp, LockMode::WRITE, 
authz_func, user,
@@ -2480,10 +2487,11 @@ Status CatalogManager::AlterTableRpc(const 
AlterTableRequestPB& req,
     TableMetadataLock l;
     string normalized_new_table_name = 
NormalizeTableName(req.new_table_name());
     auto authz_func = [&](const string& username,
-                          const string& table_name) {
+                          const string& table_name,
+                          const string& owner) {
       return SetupError(authz_provider_->AuthorizeAlterTable(table_name,
                                                              
normalized_new_table_name,
-                                                             username),
+                                                             username, 
username == owner),
                         resp, MasterErrorPB::NOT_AUTHORIZED);
     };
     RETURN_NOT_OK(FindLockAndAuthorizeTable(req, resp, LockMode::READ, 
authz_func, user,
@@ -2600,11 +2608,14 @@ Status CatalogManager::AlterTable(const 
AlterTableRequestPB& req,
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
   auto authz_func = [&] (const string& username,
-                         const string& table_name) {
+                         const string& table_name,
+                         const string& owner) {
     const string new_table = req.has_new_table_name() ?
         NormalizeTableName(req.new_table_name()) : table_name;
-    return SetupError(authz_provider_->AuthorizeAlterTable(table_name, 
new_table, username),
+    return SetupError(authz_provider_->AuthorizeAlterTable(table_name, 
new_table, username,
+                                                           username == owner),
                       resp, MasterErrorPB::NOT_AUTHORIZED);
+
   };
   RETURN_NOT_OK(FindLockAndAuthorizeTable(req, resp, LockMode::WRITE, 
authz_func, user,
                                           &table, &l));
@@ -2903,8 +2914,9 @@ Status CatalogManager::IsAlterTableDone(const 
IsAlterTableDoneRequestPB* req,
   //    the user is authorized to operate on the table.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  auto authz_func = [&] (const string& username, const string& table_name) {
-    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, 
username),
+  auto authz_func = [&] (const string& username, const string& table_name, 
const string& owner) {
+    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, 
username,
+                                                                 username == 
owner),
                       resp, MasterErrorPB::NOT_AUTHORIZED);
   };
   RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, 
authz_func, user,
@@ -2930,8 +2942,9 @@ Status CatalogManager::GetTableSchema(const 
GetTableSchemaRequestPB* req,
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
 
-  auto authz_func = [&] (const string& username, const string& table_name) {
-    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, 
username),
+  auto authz_func = [&] (const string& username, const string& table_name, 
const string& owner) {
+    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, 
username,
+                                                                 username == 
owner),
                       resp, MasterErrorPB::NOT_AUTHORIZED);
   };
   RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, 
authz_func, user,
@@ -2948,8 +2961,9 @@ Status CatalogManager::GetTableSchema(const 
GetTableSchemaRequestPB* req,
     TablePrivilegePB table_privilege;
     table_privilege.set_table_id(table->id());
     RETURN_NOT_OK(
-        SetupError(authz_provider_->FillTablePrivilegePB(l.data().name(), 
*user, schema_pb,
-                                                         &table_privilege),
+        SetupError(authz_provider_->FillTablePrivilegePB(l.data().name(), 
*user,
+                                                         *user == 
l.data().owner(),
+                                                         schema_pb, 
&table_privilege),
                    resp, MasterErrorPB::UNKNOWN_ERROR));
     security::SignedTokenPB authz_token;
     RETURN_NOT_OK(token_signer->GenerateAuthzToken(
@@ -2981,12 +2995,13 @@ Status CatalogManager::ListTables(const 
ListTablesRequestPB* req,
     }
   }
   unordered_map<string, scoped_refptr<TableInfo>> table_info_by_name;
-  unordered_set<string> table_names;
+  unordered_map<string, bool> table_owner_map;
   for (const auto& table_info : tables_info) {
     TableMetadataLock ltm(table_info.get(), LockMode::READ);
     if (!ltm.data().is_running()) continue; // implies !is_deleted() too
 
     const string& table_name = ltm.data().name();
+    const string& owner = ltm.data().owner();
     if (req->has_name_filter()) {
       size_t found = table_name.find(req->name_filter());
       if (found == string::npos) {
@@ -2994,20 +3009,21 @@ Status CatalogManager::ListTables(const 
ListTablesRequestPB* req,
       }
     }
     InsertOrUpdate(&table_info_by_name, table_name, table_info);
-    EmplaceIfNotPresent(&table_names, table_name);
+    EmplaceIfNotPresent(&table_owner_map, table_name, owner == *user);
   }
 
   
MAYBE_INJECT_FIXED_LATENCY(FLAGS_catalog_manager_inject_latency_list_authz_ms);
   bool checked_table_names = false;
   if (user) {
     RETURN_NOT_OK(authz_provider_->AuthorizeListTables(
-        *user, &table_names, &checked_table_names));
+        *user, &table_owner_map, &checked_table_names));
   }
 
   // If we checked privileges, do another pass over the tables to filter out
   // any that may have been altered while authorizing.
   if (checked_table_names) {
-    for (const auto& table_name : table_names) {
+    for (const auto& table_owner_pair : table_owner_map) {
+      const auto& table_name = table_owner_pair.first;
       const auto& table_info = FindOrDie(table_info_by_name, table_name);
       TableMetadataLock ltm(table_info.get(), LockMode::READ);
       if (!ltm.data().is_running()) continue;
@@ -3041,8 +3057,9 @@ Status CatalogManager::GetTableStatistics(const 
GetTableStatisticsRequestPB* req
 
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  auto authz_func = [&] (const string& username, const string& table_name) {
-      return 
SetupError(authz_provider_->AuthorizeGetTableStatistics(table_name, username),
+  auto authz_func = [&] (const string& username, const string& table_name, 
const string& owner) {
+      return 
SetupError(authz_provider_->AuthorizeGetTableStatistics(table_name, username,
+                                                                     username 
== owner),
                         resp, MasterErrorPB::NOT_AUTHORIZED);
   };
   RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, 
authz_func, user,
@@ -5065,7 +5082,7 @@ Status CatalogManager::GetTabletLocations(const string& 
tablet_id,
     // the table that the tablet belongs to.
     TableMetadataLock table_lock(tablet_info->table().get(), LockMode::READ);
     RETURN_NOT_OK(authz_provider_->AuthorizeGetTableMetadata(
-        NormalizeTableName(table_lock.data().name()), *user));
+        NormalizeTableName(table_lock.data().name()), *user, *user == 
table_lock.data().owner()));
   }
 
   return BuildLocationsForTablet(tablet_info, filter, locs_pb, ts_infos_dict);
@@ -5178,8 +5195,9 @@ Status CatalogManager::GetTableLocations(const 
GetTableLocationsRequestPB* req,
   // the user is authorized to operate on the table.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  auto authz_func = [&] (const string& username, const string& table_name) {
-    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, 
username),
+  auto authz_func = [&] (const string& username, const string& table_name, 
const string& owner) {
+    return SetupError(authz_provider_->AuthorizeGetTableMetadata(table_name, 
username,
+                                                                 username == 
owner),
                       resp, MasterErrorPB::NOT_AUTHORIZED);
   };
   RETURN_NOT_OK(FindLockAndAuthorizeTable(*req, resp, LockMode::READ, 
authz_func, user,
diff --git a/src/kudu/master/default_authz_provider.h 
b/src/kudu/master/default_authz_provider.h
index 3deb6d2..84da23e 100644
--- a/src/kudu/master/default_authz_provider.h
+++ b/src/kudu/master/default_authz_provider.h
@@ -18,7 +18,7 @@
 #pragma once
 
 #include <string>
-#include <unordered_set>
+#include <unordered_map>
 
 #include <glog/logging.h>
 
@@ -47,35 +47,40 @@ class DefaultAuthzProvider : public AuthzProvider {
   }
 
   Status AuthorizeDropTable(const std::string& /*table_name*/,
-                            const std::string& /*user*/) override 
WARN_UNUSED_RESULT {
+                            const std::string& /*user*/,
+                            bool /*is_owner*/) override WARN_UNUSED_RESULT {
     return Status::OK();
   }
 
   Status AuthorizeAlterTable(const std::string& /*old_table*/,
                              const std::string& /*new_table*/,
-                             const std::string& /*user*/) override 
WARN_UNUSED_RESULT {
+                             const std::string& /*user*/,
+                             bool /*is_owner*/) override WARN_UNUSED_RESULT {
     return Status::OK();
   }
 
   Status AuthorizeGetTableMetadata(const std::string& /*table_name*/,
-                                   const std::string& /*user*/) override 
WARN_UNUSED_RESULT {
+                                   const std::string& /*user*/,
+                                   bool /*is_owner*/) override 
WARN_UNUSED_RESULT {
     return Status::OK();
   }
 
   Status AuthorizeListTables(const std::string& /*user*/,
-                             std::unordered_set<std::string>* /*table_names*/,
+                             std::unordered_map<std::string, bool>* 
/*is_owner_by_table_name*/,
                              bool* checked_table_names) override 
WARN_UNUSED_RESULT {
     *checked_table_names = false;
     return Status::OK();
   }
 
   Status AuthorizeGetTableStatistics(const std::string& /*table_name*/,
-                                     const std::string& /*user*/) override 
WARN_UNUSED_RESULT {
+                                     const std::string& /*user*/,
+                                     bool /*is_owner*/) override 
WARN_UNUSED_RESULT {
     return Status::OK();
   }
 
   Status FillTablePrivilegePB(const std::string& /*table_name*/,
                               const std::string& /*user*/,
+                              bool /*is_owner*/,
                               const SchemaPB& /*schema_pb*/,
                               security::TablePrivilegePB* pb) override 
WARN_UNUSED_RESULT {
     DCHECK(pb);
diff --git a/src/kudu/master/ranger_authz_provider.cc 
b/src/kudu/master/ranger_authz_provider.cc
index 42289cb..25cdd06 100644
--- a/src/kudu/master/ranger_authz_provider.cc
+++ b/src/kudu/master/ranger_authz_provider.cc
@@ -18,6 +18,8 @@
 #include "kudu/master/ranger_authz_provider.h"
 
 #include <ostream>
+#include <unordered_map>
+#include <unordered_set>
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -39,6 +41,7 @@ using kudu::ranger::ActionPB;
 using kudu::ranger::ActionHash;
 using kudu::ranger::RangerClient;
 using std::string;
+using std::unordered_map;
 using std::unordered_set;
 using strings::Substitute;
 
@@ -100,8 +103,8 @@ Status RangerAuthzProvider::AuthorizeCreateTable(const 
string& table_name,
   RETURN_NOT_OK(client_.AuthorizeAction(user, requires_delegate_admin
                                                 ? ActionPB::ALL
                                                 : ActionPB::CREATE,
-                                        db, tbl, requires_delegate_admin, 
&authorized,
-                                        RangerClient::Scope::DATABASE));
+                                        db, tbl, /*is_owner=*/false, 
requires_delegate_admin,
+                                        &authorized, 
RangerClient::Scope::DATABASE));
 
   if (PREDICT_FALSE(!authorized)) {
     LOG(WARNING) << Substitute("User $0 is not authorized to CREATE $1", user, 
table_name);
@@ -112,7 +115,8 @@ Status RangerAuthzProvider::AuthorizeCreateTable(const 
string& table_name,
 }
 
 Status RangerAuthzProvider::AuthorizeDropTable(const string& table_name,
-                                               const string& user) {
+                                               const string& user,
+                                               bool is_owner) {
   if (IsTrustedUser(user)) {
     return Status::OK();
   }
@@ -122,7 +126,7 @@ Status RangerAuthzProvider::AuthorizeDropTable(const 
string& table_name,
 
   RETURN_NOT_OK(ParseTableIdentifier(table_name, &db, &tbl));
   bool authorized;
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::DROP, db, tbl,
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::DROP, db, tbl, 
is_owner,
                                         /*requires_delegate_admin=*/false, 
&authorized));
 
   if (PREDICT_FALSE(!authorized)) {
@@ -135,7 +139,8 @@ Status RangerAuthzProvider::AuthorizeDropTable(const 
string& table_name,
 
 Status RangerAuthzProvider::AuthorizeAlterTable(const string& old_table,
                                                 const string& new_table,
-                                                const string& user) {
+                                                const string& user,
+                                                bool is_owner) {
   if (IsTrustedUser(user)) {
     return Status::OK();
   }
@@ -147,7 +152,7 @@ Status RangerAuthzProvider::AuthorizeAlterTable(const 
string& old_table,
   // Table alteration (without table rename) requires ALTER ON TABLE.
   bool authorized;
   if (old_table == new_table) {
-    RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALTER, old_db, 
old_tbl,
+    RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALTER, old_db, 
old_tbl, is_owner,
                                           /*requires_delegate_admin=*/false, 
&authorized));
 
     if (PREDICT_FALSE(!authorized)) {
@@ -160,7 +165,7 @@ Status RangerAuthzProvider::AuthorizeAlterTable(const 
string& old_table,
 
   // To prevent privilege escalation we require ALL on the old TABLE
   // and CREATE on the new DATABASE for table rename.
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALL, old_db, old_tbl,
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALL, old_db, old_tbl, 
is_owner,
                                         /*requires_delegate_admin=*/false, 
&authorized));
   if (PREDICT_FALSE(!authorized)) {
     LOG(WARNING) << Substitute("User $0 is not authorized to perform ALL on 
$1", user, old_table);
@@ -171,7 +176,7 @@ Status RangerAuthzProvider::AuthorizeAlterTable(const 
string& old_table,
   string new_tbl;
 
   RETURN_NOT_OK(ParseTableIdentifier(new_table, &new_db, &new_tbl));
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::CREATE, new_db, 
new_tbl,
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::CREATE, new_db, 
new_tbl, is_owner,
                                         /*requires_delegate_admin=*/false, 
&authorized,
                                         RangerClient::Scope::DATABASE));
 
@@ -184,7 +189,8 @@ Status RangerAuthzProvider::AuthorizeAlterTable(const 
string& old_table,
 }
 
 Status RangerAuthzProvider::AuthorizeGetTableMetadata(const string& table_name,
-                                                      const string& user) {
+                                                      const string& user,
+                                                      bool is_owner) {
   if (IsTrustedUser(user)) {
     return Status::OK();
   }
@@ -195,7 +201,7 @@ Status RangerAuthzProvider::AuthorizeGetTableMetadata(const 
string& table_name,
   RETURN_NOT_OK(ParseTableIdentifier(table_name, &db, &tbl));
   bool authorized;
   // Get table metadata requires 'METADATA ON TABLE' privilege.
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::METADATA, db, tbl,
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::METADATA, db, tbl, 
is_owner,
                                         /*requires_delegate_admin=*/false, 
&authorized));
 
   if (PREDICT_FALSE(!authorized)) {
@@ -208,7 +214,7 @@ Status RangerAuthzProvider::AuthorizeGetTableMetadata(const 
string& table_name,
 }
 
 Status RangerAuthzProvider::AuthorizeListTables(const string& user,
-                                                unordered_set<string>* 
table_names,
+                                                unordered_map<string, bool>* 
is_owner_by_table_name,
                                                 bool* checked_table_names) {
   if (IsTrustedUser(user)) {
     *checked_table_names = false;
@@ -218,16 +224,17 @@ Status RangerAuthzProvider::AuthorizeListTables(const 
string& user,
   *checked_table_names = true;
 
   // Return immediately if there is no tables to authorize against.
-  if (table_names->empty()) {
+  if (is_owner_by_table_name->empty()) {
     return Status::OK();
   }
 
   // List tables requires 'METADATA ON TABLE' privilege on all tables being 
listed.
-  return client_.AuthorizeActionMultipleTables(user, ActionPB::METADATA, 
table_names);
+  return client_.AuthorizeActionMultipleTables(user, ActionPB::METADATA, 
is_owner_by_table_name);
 }
 
 Status RangerAuthzProvider::AuthorizeGetTableStatistics(const string& 
table_name,
-                                                        const string& user) {
+                                                        const string& user,
+                                                        bool is_owner) {
   if (IsTrustedUser(user)) {
     return Status::OK();
   }
@@ -238,7 +245,7 @@ Status 
RangerAuthzProvider::AuthorizeGetTableStatistics(const string& table_name
   bool authorized;
   // Statistics contain data (e.g. number of rows) that requires the 'SELECT 
ON TABLE'
   // privilege.
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::SELECT, db, tbl,
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::SELECT, db, tbl, 
is_owner,
                                         /*requires_delegate_admin=*/false, 
&authorized));
 
   if (PREDICT_FALSE(!authorized)) {
@@ -251,6 +258,7 @@ Status 
RangerAuthzProvider::AuthorizeGetTableStatistics(const string& table_name
 
 Status RangerAuthzProvider::FillTablePrivilegePB(const string& table_name,
                                                  const string& user,
+                                                 bool is_owner,
                                                  const SchemaPB& schema_pb,
                                                  TablePrivilegePB* pb) {
   DCHECK(pb);
@@ -264,7 +272,7 @@ Status RangerAuthzProvider::FillTablePrivilegePB(const 
string& table_name,
   if (IsTrustedUser(user)) {
     authorized = true;
   } else {
-    RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALL, db, tbl,
+    RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALL, db, tbl, 
is_owner,
                                           /*requires_delegate_admin=*/false, 
&authorized));
   }
   if (authorized) {
@@ -284,7 +292,7 @@ Status RangerAuthzProvider::FillTablePrivilegePB(const 
string& table_name,
 
   // Check if the user has any table-level privileges. If yes, we set them. If
   // select is included, we can also return.
-  RETURN_NOT_OK(client_.AuthorizeActions(user, db, tbl, &actions));
+  RETURN_NOT_OK(client_.AuthorizeActions(user, db, tbl, is_owner, &actions));
   for (const ActionPB& action : actions) {
     switch (action) {
       case ActionPB::DELETE:
@@ -321,7 +329,7 @@ Status RangerAuthzProvider::FillTablePrivilegePB(const 
string& table_name,
 
   // TODO(abukor): revisit if it's worth merging this into the previous request
   RETURN_NOT_OK(client_.AuthorizeActionMultipleColumns(user, ActionPB::SELECT, 
db, tbl,
-                                                       &column_names));
+                                                       is_owner, 
&column_names));
 
   for (const auto& col : schema_pb.columns()) {
     if (ContainsKey(column_names, col.name())) {
diff --git a/src/kudu/master/ranger_authz_provider.h 
b/src/kudu/master/ranger_authz_provider.h
index bfb8bdb..8966f7b 100644
--- a/src/kudu/master/ranger_authz_provider.h
+++ b/src/kudu/master/ranger_authz_provider.h
@@ -18,7 +18,7 @@
 #pragma once
 
 #include <string>
-#include <unordered_set>
+#include <unordered_map>
 
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
@@ -55,24 +55,29 @@ class RangerAuthzProvider : public AuthzProvider {
                               const std::string& owner) override 
WARN_UNUSED_RESULT;
 
   Status AuthorizeDropTable(const std::string& table_name,
-                            const std::string& user) override 
WARN_UNUSED_RESULT;
+                            const std::string& user,
+                            bool is_owner) override WARN_UNUSED_RESULT;
 
   Status AuthorizeAlterTable(const std::string& old_table,
                              const std::string& new_table,
-                             const std::string& user) override 
WARN_UNUSED_RESULT;
+                             const std::string& user,
+                             bool is_owner) override WARN_UNUSED_RESULT;
 
   Status AuthorizeGetTableMetadata(const std::string& table_name,
-                                   const std::string& user) override 
WARN_UNUSED_RESULT;
+                                   const std::string& user,
+                                   bool is_owner) override WARN_UNUSED_RESULT;
 
   Status AuthorizeListTables(const std::string& user,
-                             std::unordered_set<std::string>* table_names,
+                             std::unordered_map<std::string, bool>* 
is_owner_by_table_name,
                              bool* checked_table_names) override 
WARN_UNUSED_RESULT;
 
   Status AuthorizeGetTableStatistics(const std::string& table_name,
-                                     const std::string& user) override 
WARN_UNUSED_RESULT;
+                                     const std::string& user,
+                                     bool is_owner) override 
WARN_UNUSED_RESULT;
 
   Status FillTablePrivilegePB(const std::string& table_name,
                               const std::string& user,
+                              bool is_owner,
                               const SchemaPB& schema_pb,
                               security::TablePrivilegePB* pb) override 
WARN_UNUSED_RESULT;
 
diff --git a/src/kudu/ranger/ranger.proto b/src/kudu/ranger/ranger.proto
index 2bfcb13..9c1285c 100644
--- a/src/kudu/ranger/ranger.proto
+++ b/src/kudu/ranger/ranger.proto
@@ -70,6 +70,9 @@ message RangerRequestPB {
 
   // Whether delegate admin privilege is required to perform the action.
   optional bool requires_delegate_admin = 5;
+
+  // Whether the owner is performing the action.
+  optional bool is_owner = 6;
 }
 
 // Describes a single Ranger authorization response for a single user.
diff --git a/src/kudu/ranger/ranger_client-test.cc 
b/src/kudu/ranger/ranger_client-test.cc
index fb3f0b0..b3f1c9a 100644
--- a/src/kudu/ranger/ranger_client-test.cc
+++ b/src/kudu/ranger/ranger_client-test.cc
@@ -20,6 +20,7 @@
 #include <cstddef>
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <unordered_set>
 #include <utility>
 #include <vector>
@@ -63,6 +64,7 @@ using kudu::subprocess::SubprocessResponsePB;
 using kudu::subprocess::SubprocessServer;
 using std::move;
 using std::string;
+using std::unordered_map;
 using std::unordered_set;
 using std::vector;
 using strings::SkipEmpty;
@@ -172,7 +174,7 @@ class RangerClientTest : public KuduTest {
 
 TEST_F(RangerClientTest, TestAuthorizeCreateTableUnauthorized) {
   bool authorized;
-  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "bar", "baz",
+  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "bar", "baz", 
/*is_owner=*/false,
                                     /*requires_delegate_admin=*/false, 
&authorized));
   ASSERT_FALSE(authorized);
 }
@@ -180,41 +182,41 @@ TEST_F(RangerClientTest, 
TestAuthorizeCreateTableUnauthorized) {
 TEST_F(RangerClientTest, TestAuthorizeCreateTableAuthorized) {
   Allow("jdoe", ActionPB::CREATE, "foo", "bar");
   bool authorized;
-  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "foo", "bar",
+  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "foo", "bar", 
/*is_owner=*/false,
                                     /*requires_delegate_admin=*/false, 
&authorized));
   ASSERT_TRUE(authorized);
 }
 
 TEST_F(RangerClientTest, TestAuthorizeListNoTables) {
-  unordered_set<string> tables;
+  unordered_map<string, bool> tables;
   ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, 
&tables));
   ASSERT_EQ(0, tables.size());
 }
 
 TEST_F(RangerClientTest, TestAuthorizeListNoTablesAuthorized) {
-  unordered_set<string> tables;
-  tables.emplace("foo.bar");
-  tables.emplace("foo.baz");
+  unordered_map<string, bool> tables;
+  tables.emplace("foo.bar", false);
+  tables.emplace("foo.baz", false);
   ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, 
&tables));
   ASSERT_EQ(0, tables.size());
 }
 
 TEST_F(RangerClientTest, TestAuthorizeMetadataSubsetOfTablesAuthorized) {
   Allow("jdoe", ActionPB::METADATA, "default", "foobar");
-  unordered_set<string> tables;
-  tables.emplace("default.foobar");
-  tables.emplace("barbaz");
+  unordered_map<string, bool> tables;
+  tables.emplace("default.foobar", false);
+  tables.emplace("barbaz", false);
   ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, 
&tables));
   ASSERT_EQ(1, tables.size());
-  ASSERT_EQ("default.foobar", *tables.begin());
+  ASSERT_EQ("default.foobar", tables.begin()->first);
 }
 
 TEST_F(RangerClientTest, TestAuthorizeMetadataAllAuthorized) {
   Allow("jdoe", ActionPB::METADATA, "default", "foobar");
   Allow("jdoe", ActionPB::METADATA, "default", "barbaz");
-  unordered_set<string> tables;
-  tables.emplace("default.foobar");
-  tables.emplace("barbaz");
+  unordered_map<string, bool> tables;
+  tables.emplace("default.foobar", false);
+  tables.emplace("barbaz", false);
   ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, 
&tables));
   ASSERT_EQ(2, tables.size());
   ASSERT_TRUE(ContainsKey(tables, "default.foobar"));
@@ -222,19 +224,19 @@ TEST_F(RangerClientTest, 
TestAuthorizeMetadataAllAuthorized) {
 }
 
 TEST_F(RangerClientTest, TestAuthorizeMetadataAllNonRanger) {
-  unordered_set<string> tables;
-  tables.emplace("foo.");
-  tables.emplace(".bar");
+  unordered_map<string, bool> tables;
+  tables.emplace("foo.", false);
+  tables.emplace(".bar", false);
   ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, 
&tables));
   ASSERT_EQ(0, tables.size());
 }
 
 TEST_F(RangerClientTest, TestAuthorizeMetadataNoneAuthorizedContainsNonRanger) 
{
-  unordered_set<string> tables;
-  tables.emplace("foo.");
-  tables.emplace(".bar");
-  tables.emplace("foo.bar");
-  tables.emplace("foo.baz");
+  unordered_map<string, bool> tables;
+  tables.emplace("foo.", false);
+  tables.emplace(".bar", false);
+  tables.emplace("foo.bar", false);
+  tables.emplace("foo.baz", false);
   ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, 
&tables));
   ASSERT_EQ(0, tables.size());
 }
@@ -242,10 +244,10 @@ TEST_F(RangerClientTest, 
TestAuthorizeMetadataNoneAuthorizedContainsNonRanger) {
 TEST_F(RangerClientTest, TestAuthorizeMetadataAllAuthorizedContainsNonRanger) {
   Allow("jdoe", ActionPB::METADATA, "default", "foobar");
   Allow("jdoe", ActionPB::METADATA, "default", "barbaz");
-  unordered_set<string> tables;
-  tables.emplace("default.foobar");
-  tables.emplace("barbaz");
-  tables.emplace("foo.");
+  unordered_map<string, bool> tables;
+  tables.emplace("default.foobar", false);
+  tables.emplace("barbaz", false);
+  tables.emplace("foo.", false);
   ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, 
&tables));
   ASSERT_EQ(2, tables.size());
   ASSERT_TRUE(ContainsKey(tables, "default.foobar"));
@@ -262,7 +264,8 @@ TEST_F(RangerClientTest, TestAuthorizeScanSubsetAuthorized) 
{
   columns.emplace("col3");
   columns.emplace("col4");
   ASSERT_OK(client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
-                                                   "default", "foobar", 
&columns));
+                                                   "default", "foobar", 
/*is_owner=*/false,
+                                                   &columns));
   ASSERT_EQ(2, columns.size());
   ASSERT_TRUE(ContainsKey(columns, "col1"));
   ASSERT_TRUE(ContainsKey(columns, "col3"));
@@ -281,7 +284,8 @@ TEST_F(RangerClientTest, 
TestAuthorizeScanAllColumnsAuthorized) {
   columns.emplace("col3");
   columns.emplace("col4");
   ASSERT_OK(client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
-                                                   "default", "foobar", 
&columns));
+                                                   "default", "foobar", 
/*is_owner=*/false,
+                                                   &columns));
   ASSERT_EQ(4, columns.size());
   ASSERT_TRUE(ContainsKey(columns, "col1"));
   ASSERT_TRUE(ContainsKey(columns, "col2"));
@@ -295,7 +299,8 @@ TEST_F(RangerClientTest, 
TestAuthorizeScanNoColumnsAuthorized) {
     columns.emplace(Substitute("col$0", i));
   }
   ASSERT_OK(client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
-                                                   "default", "foobar", 
&columns));
+                                                   "default", "foobar", 
/*is_owner=*/false,
+                                                   &columns));
   ASSERT_EQ(0, columns.size());
 }
 
@@ -304,7 +309,7 @@ TEST_F(RangerClientTest, 
TestAuthorizeActionsNoneAuthorized) {
   actions.emplace(ActionPB::DROP);
   actions.emplace(ActionPB::SELECT);
   actions.emplace(ActionPB::INSERT);
-  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", &actions));
+  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", 
/*is_owner=*/false, &actions));
   ASSERT_EQ(0, actions.size());
 }
 
@@ -314,7 +319,7 @@ TEST_F(RangerClientTest, 
TestAuthorizeActionsSomeAuthorized) {
   actions.emplace(ActionPB::DROP);
   actions.emplace(ActionPB::SELECT);
   actions.emplace(ActionPB::INSERT);
-  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", &actions));
+  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", 
/*is_owner=*/false, &actions));
   ASSERT_EQ(1, actions.size());
   ASSERT_TRUE(ContainsKey(actions, ActionPB::SELECT));
 }
@@ -327,7 +332,7 @@ TEST_F(RangerClientTest, TestAuthorizeActionsAllAuthorized) 
{
   actions.emplace(ActionPB::DROP);
   actions.emplace(ActionPB::SELECT);
   actions.emplace(ActionPB::INSERT);
-  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", &actions));
+  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", 
/*is_owner=*/false, &actions));
   ASSERT_EQ(3, actions.size());
 }
 
@@ -408,7 +413,7 @@ TEST_F(RangerClientTestBase, TestLogging) {
   // Make a request. It doesn't matter whether it succeeds or not -- debug logs
   // should include info about each request.
   bool authorized;
-  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table",
+  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", 
/*is_owner=*/false,
                                      /*requires_delegate_admin=*/false, 
&authorized));
   ASSERT_FALSE(authorized);
   {
@@ -428,7 +433,7 @@ TEST_F(RangerClientTestBase, TestLogging) {
   FLAGS_ranger_overwrite_log_config = false;
   client_.reset(new RangerClient(env_, metric_entity_));
   ASSERT_OK(client_->Start());
-  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table",
+  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", 
/*is_owner=*/false,
                                      /*requires_delegate_admin=*/false, 
&authorized));
   ASSERT_FALSE(authorized);
   {
@@ -445,7 +450,7 @@ TEST_F(RangerClientTestBase, TestLogging) {
   FLAGS_ranger_overwrite_log_config = true;
   client_.reset(new RangerClient(env_, metric_entity_));
   ASSERT_OK(client_->Start());
-  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table",
+  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", 
/*is_owner=*/false,
                                      /*requires_delegate_admin=*/false, 
&authorized));
   ASSERT_FALSE(authorized);
   {
diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index 375f806..3d9ffa6 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -170,8 +170,10 @@ using kudu::security::GetKrb5ConfigFile;
 using kudu::subprocess::SubprocessMetrics;
 using kudu::subprocess::SubprocessServer;
 using std::move;
+using std::pair;
 using std::string;
 using std::unique_ptr;
+using std::unordered_map;
 using std::unordered_set;
 using std::vector;
 using strings::Substitute;
@@ -394,7 +396,7 @@ Status RangerClient::Start() {
 
 // TODO(abukor): refactor to avoid code duplication
 Status RangerClient::AuthorizeAction(const string& user_name, const ActionPB& 
action,
-                                     const string& database, const string& 
table,
+                                     const string& database, const string& 
table, bool is_owner,
                                      bool requires_delegate_admin, bool* 
authorized,
                                      Scope scope) {
   DCHECK(subprocess_);
@@ -407,6 +409,7 @@ Status RangerClient::AuthorizeAction(const string& 
user_name, const ActionPB& ac
   req->set_action(action);
   req->set_database(database);
   req->set_requires_delegate_admin(requires_delegate_admin);
+  req->set_is_owner(is_owner);
   // Only pass the table name if this is table level request.
   if (scope == Scope::TABLE) {
     req->set_table(table);
@@ -421,6 +424,7 @@ Status RangerClient::AuthorizeAction(const string& 
user_name, const ActionPB& ac
 
 Status RangerClient::AuthorizeActionMultipleColumns(const string& user_name, 
const ActionPB& action,
                                                     const string& database, 
const string& table,
+                                                    bool is_owner,
                                                     unordered_set<string>* 
column_names) {
   DCHECK(subprocess_);
   DCHECK(!column_names->empty());
@@ -435,6 +439,7 @@ Status RangerClient::AuthorizeActionMultipleColumns(const 
string& user_name, con
     req->set_database(database);
     req->set_table(table);
     req->set_column(col);
+    req->set_is_owner(is_owner);
   }
 
   RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list));
@@ -454,20 +459,20 @@ Status RangerClient::AuthorizeActionMultipleColumns(const 
string& user_name, con
 }
 
 Status RangerClient::AuthorizeActionMultipleTables(const string& user_name, 
const ActionPB& action,
-                                                   unordered_set<string>* 
tables) {
+                                                   unordered_map<string, 
bool>* tables) {
   DCHECK(subprocess_);
 
   RangerRequestListPB req_list;
   RangerResponseListPB resp_list;
   req_list.set_user(user_name);
 
-  vector<string> orig_table_names;
+  vector<pair<string, bool>> orig_table_names;
 
   for (const auto& table : *tables) {
     string db;
     Slice tbl;
 
-    auto s = ParseRangerTableIdentifier(table, &db, &tbl);
+    auto s = ParseRangerTableIdentifier(table.first, &db, &tbl);
     if (PREDICT_TRUE(s.ok())) {
       orig_table_names.emplace_back(table);
 
@@ -475,8 +480,9 @@ Status RangerClient::AuthorizeActionMultipleTables(const 
string& user_name, cons
       req->set_action(action);
       req->set_database(db);
       req->set_table(tbl.ToString());
+      req->set_is_owner(table.second);
     } else {
-      LOG(WARNING) << Substitute(kDenyNonRangerTableTemplate, table);
+      LOG(WARNING) << Substitute(kDenyNonRangerTableTemplate, table.first);
     }
   }
 
@@ -484,7 +490,7 @@ Status RangerClient::AuthorizeActionMultipleTables(const 
string& user_name, cons
 
   DCHECK_EQ(orig_table_names.size(), resp_list.responses_size());
 
-  unordered_set<string> allowed_tables;
+  unordered_map<string, bool> allowed_tables;
   for (auto i = 0; i < orig_table_names.size(); ++i) {
     if (resp_list.responses(i).allowed()) {
       EmplaceOrDie(&allowed_tables, move(orig_table_names[i]));
@@ -497,8 +503,9 @@ Status RangerClient::AuthorizeActionMultipleTables(const 
string& user_name, cons
 }
 
 Status RangerClient::AuthorizeActions(const string& user_name, const string& 
database,
-                                      const string& table,
-                                      unordered_set<ActionPB, ActionHash>* 
actions) {
+                                      const string& table, bool is_owner,
+                                      unordered_set<ActionPB, ActionHash>* 
actions,
+                                      Scope scope) {
   DCHECK(subprocess_);
   DCHECK(!actions->empty());
 
@@ -510,7 +517,10 @@ Status RangerClient::AuthorizeActions(const string& 
user_name, const string& dat
     auto req = req_list.add_requests();
     req->set_action(action);
     req->set_database(database);
-    req->set_table(table);
+    if (scope == Scope::TABLE) {
+      req->set_table(table);
+      req->set_is_owner(is_owner);
+    }
   }
 
   RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list));
diff --git a/src/kudu/ranger/ranger_client.h b/src/kudu/ranger/ranger_client.h
index 097793c..baf74ed 100644
--- a/src/kudu/ranger/ranger_client.h
+++ b/src/kudu/ranger/ranger_client.h
@@ -19,6 +19,7 @@
 
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <unordered_set>
 #include <utility>
 
@@ -84,28 +85,30 @@ class RangerClient {
   // Authorizes an action on the table. Sets 'authorized' to true if it's
   // authorized, false otherwise.
   Status AuthorizeAction(const std::string& user_name, const ActionPB& action,
-                         const std::string& database, const std::string& table,
+                         const std::string& database, const std::string& 
table, bool is_owner,
                          bool requires_delegate_admin, bool* authorized,
                          Scope scope = Scope::TABLE) WARN_UNUSED_RESULT;
 
   // Authorizes action on multiple tables. It sets 'table_names' to the
   // tables the user is authorized to access.
   Status AuthorizeActionMultipleTables(const std::string& user_name, const 
ActionPB& action,
-                                       std::unordered_set<std::string>* tables)
+                                       std::unordered_map<std::string, bool>* 
tables)
     WARN_UNUSED_RESULT;
 
   // Authorizes action on multiple columns. It sets 'column_names' to the
   // columns the user is authorized to access.
   Status AuthorizeActionMultipleColumns(const std::string& user_name, const 
ActionPB& action,
                                         const std::string& database, const 
std::string& table,
+                                        bool is_owner,
                                         std::unordered_set<std::string>* 
column_names)
-      WARN_UNUSED_RESULT;
+    WARN_UNUSED_RESULT;
 
   // Authorizes multiple table-level actions on a single table. It sets
   // 'actions' to the actions the user is authorized to perform.
   Status AuthorizeActions(const std::string& user_name, const std::string& 
database,
-                          const std::string& table,
-                          std::unordered_set<ActionPB, ActionHash>* actions) 
WARN_UNUSED_RESULT;
+                          const std::string& table, bool is_owner,
+                          std::unordered_set<ActionPB, ActionHash>* actions,
+                          Scope scope = Scope::TABLE) WARN_UNUSED_RESULT;
 
   // Refreshes policies in the Ranger subprocess. This does not invalidate the
   // existing cache and doesn't fail if Ranger service is unavailable, it 
simply

Reply via email to