This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 880ffe4db635e11e45b22256c7626c80b81ccbc0
Author: Attila Bukor <[email protected]>
AuthorDate: Fri Jun 5 12:01:50 2020 +0200

    KUDU-3090 Add delegate admin privilege
    
    Database systems usually require ALL WITH GRANT OPTION privilege to
    change ownership of a table. Apache Ranger supports a special
    quasi-access type called "delegate admin" which grants the user
    permissions to grant and revoke permissions which is similar to GRANT
    OPTION in other systems.
    
    This patch adds support for this delegate option flag which is required
    in addition to "ALL" to create a table with a different owner. It also
    refactors the Ranger subprocess to allow evaluating ALL and delegate
    admin in a single RangerRequestPB.
    
    Change-Id: If8ba018dac568a1ab74cf2d5657221579636ac1c
    Reviewed-on: http://gerrit.cloudera.org:8080/16071
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <[email protected]>
---
 .../subprocess/ranger/RangerProtocolHandler.java   |  18 +--
 .../ranger/authorization/RangerKuduAuthorizer.java |  76 ++++++------
 .../subprocess/ranger/TestRangerSubprocess.java    |  23 ++--
 .../authorization/TestRangerKuduAuthorizer.java    |  28 ++---
 src/kudu/integration-tests/master_authz-itest.cc   | 132 +++++++++++++++++++--
 src/kudu/integration-tests/ts_authz-itest.cc       |  30 +++--
 src/kudu/master/ranger_authz_provider.cc           |  34 ++++--
 src/kudu/ranger/mini_ranger-test.cc                |   8 +-
 src/kudu/ranger/mini_ranger.cc                     |   5 +-
 src/kudu/ranger/mini_ranger.h                      |   8 +-
 src/kudu/ranger/ranger.proto                       |   3 +
 src/kudu/ranger/ranger_client-test.cc              |  15 ++-
 src/kudu/ranger/ranger_client.cc                   |   4 +-
 src/kudu/ranger/ranger_client.h                    |   3 +-
 14 files changed, 254 insertions(+), 133 deletions(-)

diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerProtocolHandler.java
 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerProtocolHandler.java
index e57041e..961ad52 100644
--- 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerProtocolHandler.java
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/RangerProtocolHandler.java
@@ -36,8 +36,6 @@ import 
org.apache.kudu.subprocess.ranger.authorization.RangerKuduAuthorizer;
 @InterfaceAudience.Private
 class RangerProtocolHandler extends ProtocolHandler<RangerRequestListPB,
                                                     RangerResponseListPB> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(RangerProtocolHandler.class);
-
   // The Ranger Kudu authorizer plugin. This field is not final
   // as it is used in the mock test.
   @InterfaceAudience.LimitedPrivate("Test")
@@ -49,21 +47,7 @@ class RangerProtocolHandler extends 
ProtocolHandler<RangerRequestListPB,
 
   @Override
   protected RangerResponseListPB executeRequest(RangerRequestListPB requests) {
-    RangerResponseListPB.Builder responses = RangerResponseListPB.newBuilder();
-    for (RangerAccessResult result : authz.authorize(requests)) {
-      // The result can be null when Ranger plugin fails to load the policies
-      // from the Ranger admin server.
-      // TODO(Hao): add a test for the above case.
-      boolean isAllowed = (result != null && result.getIsAllowed());
-      RangerResponsePB.Builder response = RangerResponsePB.newBuilder();
-      response.setAllowed(isAllowed);
-      responses.addResponses(response);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("RangerAccessRequest [%s] receives result 
[%s]",
-                                result.getAccessRequest().toString(), 
result.toString()));
-      }
-    }
-    return responses.build();
+    return authz.authorize(requests);
   }
 
   @Override
diff --git 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java
 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java
index 1e895a5..1e6235a 100644
--- 
a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java
+++ 
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ranger/authorization/RangerKuduAuthorizer.java
@@ -18,8 +18,6 @@
 package org.apache.kudu.subprocess.ranger.authorization;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -30,16 +28,17 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
-import org.apache.ranger.plugin.model.RangerServiceDef;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.policyengine.RangerPolicyEngine;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.kudu.ranger.Ranger;
 import org.apache.kudu.ranger.Ranger.RangerRequestListPB;
 import org.apache.kudu.ranger.Ranger.RangerRequestPB;
 import org.apache.kudu.subprocess.KuduSubprocessException;
@@ -102,35 +101,20 @@ public class RangerKuduAuthorizer {
    * @return a list of RangerAccessResult
    */
   @VisibleForTesting
-  public Collection<RangerAccessResult> authorize(RangerRequestListPB 
requests) {
-    Collection<RangerAccessRequest> rangerRequests = createRequests(requests);
-    // Reject requests if user field is empty.
+  public Ranger.RangerResponseListPB authorize(RangerRequestListPB requests) {
     if (!requests.hasUser() || requests.getUser().isEmpty()) {
-      Collection<RangerAccessResult> results = new ArrayList<>();
-      for (RangerAccessRequest request : rangerRequests) {
-        // Create a 'dummy' RangerAccessResult that denies the request (to have
-        // a short cut), instead of sending the request to Ranger.
-        RangerAccessResult result = new RangerAccessResult(
-            /* policyType= */1, APP_ID,
-            new RangerServiceDef(), request);
-        result.setIsAllowed(false);
-        results.add(result);
+      Ranger.RangerResponseListPB.Builder rangerResponseListPB = 
Ranger.RangerResponseListPB
+          .newBuilder();
+      List<RangerRequestPB> requestsList = requests.getRequestsList();
+      for (int i = 0, requestsListSize = requestsList.size(); i < 
requestsListSize; i++) {
+        Ranger.RangerResponsePB response = Ranger.RangerResponsePB.newBuilder()
+            .setAllowed(false)
+            .build();
+        rangerResponseListPB.addResponses(response);
       }
-      return results;
+      return rangerResponseListPB.build();
     }
-    return plugin.isAccessAllowed(rangerRequests);
-  }
-
-  /**
-   * Gets a list of authorization decision from Ranger with the specified list
-   * of ranger access request.
-   *
-   * @param requests a list of RangerAccessRequest
-   * @return a list of RangerAccessResult
-   */
-  @VisibleForTesting
-  Collection<RangerAccessResult> authorize(Collection<RangerAccessRequest> 
requests) {
-    return plugin.isAccessAllowed(requests);
+    return authorizeRequests(requests);
   }
 
   /**
@@ -166,14 +150,15 @@ public class RangerKuduAuthorizer {
   }
 
   /**
-   * Creates a list of <code>RangerAccessRequest</code> for the given
+   * Creates a <code>RangerResponseListPB</code> for the given
    * <code>RangerRequestListPB</code>.
    *
    * @param requests the given RangerRequestListPB
    * @return a list of RangerAccessRequest
    */
-  private static List<RangerAccessRequest> createRequests(RangerRequestListPB 
requests) {
-    List<RangerAccessRequest> rangerRequests = new ArrayList<>();
+  private Ranger.RangerResponseListPB authorizeRequests(RangerRequestListPB 
requests) {
+    Ranger.RangerResponseListPB.Builder rangerResponseList = 
Ranger.RangerResponseListPB
+        .newBuilder();
     Preconditions.checkArgument(requests.hasUser());
     Preconditions.checkArgument(!requests.getUser().isEmpty());
     final String user = requests.getUser();
@@ -184,10 +169,31 @@ public class RangerKuduAuthorizer {
       String db = request.hasDatabase() ? request.getDatabase() : null;
       String table = request.hasTable() ? request.getTable() : null;
       String column = request.hasColumn() ? request.getColumn() : null;
-      rangerRequests.add(createRequest(action, user, groups,
-                                       db, table, column));
+      boolean admin = request.hasRequiresDelegateAdmin() && 
request.getRequiresDelegateAdmin();
+      RangerAccessRequest rangerAccessRequest = createRequest(action, user, 
groups, db, table,
+                                                              column);
+      RangerAccessResult rangerAccessResult = 
plugin.isAccessAllowed(rangerAccessRequest);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("RangerAccessRequest [%s] receives result 
[%s]",
+            rangerAccessResult.getAccessRequest().toString(), 
rangerAccessResult.toString()));
+      }
+      if (rangerAccessResult.getIsAllowed() && admin) {
+        rangerAccessRequest = createRequest(RangerPolicyEngine.ADMIN_ACCESS, 
user, groups, db,
+                                            table, column);
+        rangerAccessResult = plugin.isAccessAllowed(rangerAccessRequest);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("RangerAccessRequest [%s] receives result 
[%s]",
+              rangerAccessResult.getAccessRequest().toString(), 
rangerAccessResult.toString()));
+        }
+      }
+
+      Ranger.RangerResponsePB rangerResponsePB = 
Ranger.RangerResponsePB.newBuilder()
+          .setAllowed(rangerAccessResult.getIsAllowed())
+          .build();
+
+      rangerResponseList.addResponses(rangerResponsePB);
     }
-    return rangerRequests;
+    return rangerResponseList.build();
   }
 
   /**
diff --git 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/TestRangerSubprocess.java
 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/TestRangerSubprocess.java
index f018c2a..aafb06c 100644
--- 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/TestRangerSubprocess.java
+++ 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/TestRangerSubprocess.java
@@ -26,14 +26,12 @@ import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 import com.google.protobuf.Any;
-import org.apache.ranger.plugin.model.RangerServiceDef;
-import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
-import org.apache.ranger.plugin.policyengine.RangerAccessResult;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import org.apache.kudu.ranger.Ranger;
 import org.apache.kudu.ranger.Ranger.ActionPB;
 import org.apache.kudu.ranger.Ranger.RangerRequestListPB;
 import org.apache.kudu.ranger.Ranger.RangerRequestPB;
@@ -105,20 +103,13 @@ public class TestRangerSubprocess extends 
SubprocessTestUtil {
     final SubprocessRequestPB subprocessRequest = 
createRangerSubprocessRequest(requestList);
 
     // Mock the authorization results.
-    List<RangerAccessResult> rangerResults = new ArrayList<>();
-    final RangerAccessResult positiveResult = new RangerAccessResult(
-        /* policyType= */1, "kudu",
-        new RangerServiceDef(), new RangerAccessRequestImpl());
-    positiveResult.setIsAllowed(true);
-    final RangerAccessResult negativeResult = new RangerAccessResult(
-        /* policyType= */1, "kudu",
-        new RangerServiceDef(), new RangerAccessRequestImpl());
-    negativeResult.setIsAllowed(false);
-    rangerResults.add(positiveResult);
-    rangerResults.add(negativeResult);
-    rangerResults.add(positiveResult);
+    RangerResponseListPB responseListPB = RangerResponseListPB.newBuilder()
+        
.addResponses(Ranger.RangerResponsePB.newBuilder().setAllowed(true).build())
+        
.addResponses(Ranger.RangerResponsePB.newBuilder().setAllowed(false).build())
+        
.addResponses(Ranger.RangerResponsePB.newBuilder().setAllowed(true).build())
+        .build();
     Mockito.when(RangerProtocolHandler.authz.authorize(requestList))
-           .thenReturn(rangerResults);
+           .thenReturn(responseListPB);
 
     SubprocessExecutor executor =
         setUpExecutorIO(NO_ERR, /*injectIOError*/false);
diff --git 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/authorization/TestRangerKuduAuthorizer.java
 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/authorization/TestRangerKuduAuthorizer.java
index faa6f5f..3b6a426 100644
--- 
a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/authorization/TestRangerKuduAuthorizer.java
+++ 
b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/ranger/authorization/TestRangerKuduAuthorizer.java
@@ -20,9 +20,7 @@ package org.apache.kudu.subprocess.ranger.authorization;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
+import java.util.List;
 
 import org.apache.ranger.plugin.model.RangerServiceDef;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
@@ -33,6 +31,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import org.apache.kudu.ranger.Ranger;
 import org.apache.kudu.test.junit.RetryRule;
 
 /**
@@ -66,17 +65,18 @@ public class TestRangerKuduAuthorizer {
         new RangerServiceDef(), mockCreateRequest);
     createResult.setIsAllowed(false);
 
-    Collection<RangerAccessRequest> requests = new ArrayList<>();
-    requests.add(mockUpdateRequest);
-    requests.add(mockCreateRequest);
-    Collection<RangerAccessResult> results = new ArrayList<>();
-    results.add(updateResult);
-    results.add(createResult);
+    
Mockito.when(authz.plugin.isAccessAllowed(Mockito.any(RangerAccessRequest.class)))
+        .thenReturn(updateResult, createResult);
 
-    Mockito.when(authz.plugin.isAccessAllowed(requests))
-           .thenReturn(results);
-    Iterator<RangerAccessResult> actualResultsIter = 
authz.authorize(requests).iterator();
-    assertTrue(actualResultsIter.next().getIsAllowed());
-    assertFalse(actualResultsIter.next().getIsAllowed());
+    Ranger.RangerRequestListPB rangerRequests = 
Ranger.RangerRequestListPB.newBuilder()
+        .addRequests(Ranger.RangerRequestPB.newBuilder().build())
+        .addRequests(Ranger.RangerRequestPB.newBuilder().build())
+        .setUser("jdoe")
+        .build();
+
+    List<Ranger.RangerResponsePB> actualResultsIter = 
authz.authorize(rangerRequests)
+        .getResponsesList();
+    assertTrue(actualResultsIter.get(0).getAllowed());
+    assertFalse(actualResultsIter.get(1).getAllowed());
   }
 }
\ No newline at end of file
diff --git a/src/kudu/integration-tests/master_authz-itest.cc 
b/src/kudu/integration-tests/master_authz-itest.cc
index 76f103a..4a4c975 100644
--- a/src/kudu/integration-tests/master_authz-itest.cc
+++ b/src/kudu/integration-tests/master_authz-itest.cc
@@ -305,6 +305,10 @@ class MasterAuthzITestHarness {
   virtual Status GrantAlterTablePrivilege(const PrivilegeParams& p) = 0;
   virtual Status GrantRenameTablePrivilege(const PrivilegeParams& p) = 0;
   virtual Status GrantGetMetadataTablePrivilege(const PrivilegeParams& p) = 0;
+  virtual Status GrantAllTablePrivilege(const PrivilegeParams& p) = 0;
+  virtual Status GrantAllDatabasePrivilege(const PrivilegeParams& p) = 0;
+  virtual Status GrantAllWithGrantTablePrivilege(const PrivilegeParams& p) = 0;
+  virtual Status GrantAllWithGrantDatabasePrivilege(const PrivilegeParams& p) 
= 0;
   virtual Status GrantGetMetadataDatabasePrivilege(const PrivilegeParams& p) = 
0;
   virtual Status CreateTable(const OperationParams& p,
                              const shared_ptr<KuduClient>& client) = 0;
@@ -332,14 +336,14 @@ class MasterAuthzITestHarness {
 
 class RangerITestHarness : public MasterAuthzITestHarness {
  public:
-  static constexpr int kSleepAfterNewPolicyMs = 1200;
+  static constexpr int kSleepAfterNewPolicyMs = 1400;
 
   Status GrantCreateTablePrivilege(const PrivilegeParams& p) override {
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
     // IsCreateTableDone() requires METADATA on the table level.
     policy.tables.emplace_back("*");
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::CREATE}));
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::CREATE}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
 
@@ -350,7 +354,7 @@ class RangerITestHarness : public MasterAuthzITestHarness {
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
     policy.tables.emplace_back(p.table_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::DROP}));
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::DROP}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
 
@@ -368,7 +372,7 @@ class RangerITestHarness : public MasterAuthzITestHarness {
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
     policy.tables.emplace_back(p.table_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALTER}));
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALTER}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
     return Status::OK();
@@ -379,13 +383,13 @@ class RangerITestHarness : public MasterAuthzITestHarness 
{
     policy_new_table.databases.emplace_back(p.db_name);
     // IsCreateTableDone() requires METADATA on the table level.
     policy_new_table.tables.emplace_back("*");
-    policy_new_table.items.emplace_back(PolicyItem({kTestUser}, 
{ActionPB::CREATE}));
+    policy_new_table.items.emplace_back(PolicyItem({kTestUser}, 
{ActionPB::CREATE}, false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy_new_table)));
 
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
     policy.tables.emplace_back(p.table_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}));
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
     return Status::OK();
@@ -394,7 +398,7 @@ class RangerITestHarness : public MasterAuthzITestHarness {
   Status GrantGetMetadataDatabasePrivilege(const PrivilegeParams& p) override {
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::METADATA}));
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::METADATA}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
     return Status::OK();
@@ -404,12 +408,60 @@ class RangerITestHarness : public MasterAuthzITestHarness 
{
     AuthorizationPolicy policy;
     policy.databases.emplace_back(p.db_name);
     policy.tables.emplace_back(p.table_name);
-    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::METADATA}));
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::METADATA}, 
false));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
     return Status::OK();
   }
 
+  Status GrantAllTablePrivilege(const PrivilegeParams& p) override {
+    AuthorizationPolicy policy;
+    policy.databases.emplace_back(p.db_name);
+    policy.tables.emplace_back(p.table_name);
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, false));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
+    SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
+    return Status::OK();
+  }
+
+  Status GrantAllDatabasePrivilege(const PrivilegeParams& p) override {
+    AuthorizationPolicy db_policy;
+    db_policy.databases.emplace_back(p.db_name);
+    db_policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, 
false));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(db_policy)));
+    AuthorizationPolicy tbl_policy;
+    tbl_policy.databases.emplace_back(p.db_name);
+    tbl_policy.tables.emplace_back("*");
+    tbl_policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, 
false));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(tbl_policy)));
+    SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
+    return Status::OK();
+  }
+
+  Status GrantAllWithGrantTablePrivilege(const PrivilegeParams& p) override {
+    AuthorizationPolicy policy;
+    policy.databases.emplace_back(p.db_name);
+    policy.tables.emplace_back(p.table_name);
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, true));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
+    SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
+    return Status::OK();
+  }
+
+  Status GrantAllWithGrantDatabasePrivilege(const PrivilegeParams& p) override 
{
+    AuthorizationPolicy db_policy;
+    db_policy.databases.emplace_back(p.db_name);
+    db_policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, 
true));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(db_policy)));
+    AuthorizationPolicy tbl_policy;
+    tbl_policy.databases.emplace_back(p.db_name);
+    tbl_policy.tables.emplace_back("*");
+    tbl_policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}, 
true));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(tbl_policy)));
+    SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
+    return Status::OK();
+  }
+
   Status CreateTable(const OperationParams& p,
                      const shared_ptr<KuduClient>& client) override {
     string ranger_db;
@@ -443,7 +495,7 @@ class RangerITestHarness : public MasterAuthzITestHarness {
     AuthorizationPolicy policy;
     policy.databases.emplace_back(kDatabaseName);
     policy.tables.emplace_back("*");
-    policy.items.emplace_back(PolicyItem({kAdminUser}, {ActionPB::ALL}));
+    policy.items.emplace_back(PolicyItem({kAdminUser}, {ActionPB::ALL}, true));
     RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
     SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
     return Status::OK();
@@ -517,6 +569,22 @@ class MasterAuthzITestBase : public 
ExternalMiniClusterITestBase {
     return harness_->GrantGetMetadataTablePrivilege(p);
   }
 
+  Status GrantAllTablePrivilege(const PrivilegeParams& p) {
+    return harness_->GrantAllTablePrivilege(p);
+  }
+
+  Status GrantAllDatabasePrivilege(const PrivilegeParams& p) {
+    return harness_->GrantAllDatabasePrivilege(p);
+  }
+
+  Status GrantAllWithGrantTablePrivilege(const PrivilegeParams& p) {
+    return harness_->GrantAllWithGrantTablePrivilege(p);
+  }
+
+  Status GrantAllWithGrantDatabasePrivilege(const PrivilegeParams& p) {
+    return harness_->GrantAllWithGrantDatabasePrivilege(p);
+  }
+
   Status GrantGetMetadataDatabasePrivilege(const PrivilegeParams& p) {
     return harness_->GrantGetMetadataDatabasePrivilege(p);
   }
@@ -638,6 +706,35 @@ TEST_P(MasterAuthzITest, TestCreateTableUnauthorized) {
     .Create().IsNotAuthorized());
 }
 
+TEST_P(MasterAuthzITest, TestCreateTableDifferentOwner) {
+  ASSERT_OK(cluster_->kdc()->Kinit(kTestUser));
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+
+  KuduSchemaBuilder b;
+  b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()
+                    ->PrimaryKey();
+  KuduSchema schema;
+  ASSERT_OK(b.Build(&schema));
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+
+  const string kTableName = "another_table";
+  PrivilegeParams p = { kDatabaseName, kTableName };
+  this->GrantAllDatabasePrivilege(p);
+  this->GrantAllTablePrivilege(p);
+  this->GrantAllWithGrantTablePrivilege(p);
+  ASSERT_TRUE(table_creator->table_name(Substitute("$0.$1", kDatabaseName,
+                                                   kTableName))
+    .schema(&schema)
+    .num_replicas(1)
+    .set_range_partition_columns({"key"})
+    .set_owner("another_user")
+    .Create().IsNotAuthorized());
+
+  this->GrantAllWithGrantDatabasePrivilege(p);
+
+  ASSERT_OK(table_creator->Create());
+}
+
 // Test that the trusted user can access the cluster without being authorized.
 TEST_P(MasterAuthzITest, TestTrustedUserAcl) {
   // Log in as 'impala' and reset the client to pick up the change in user.
@@ -652,6 +749,23 @@ TEST_P(MasterAuthzITest, TestTrustedUserAcl) {
             tables_set);
 
   ASSERT_OK(this->CreateKuduTable(kDatabaseName, "new_table"));
+
+  // Create another table with a different owner
+  KuduSchemaBuilder b;
+  b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()
+                    ->PrimaryKey();
+  KuduSchema schema;
+  ASSERT_OK(b.Build(&schema));
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+
+  const string kTableName = "another_table";
+  ASSERT_OK(table_creator->table_name(Substitute("$0.$1", kDatabaseName,
+                                                 kTableName))
+    .schema(&schema)
+    .num_replicas(1)
+    .set_range_partition_columns({"key"})
+    .set_owner("another_user")
+    .Create());
   NO_FATALS(this->CheckTable(kDatabaseName, "new_table",
                              make_optional<const string&>(kImpalaUser)));
 }
diff --git a/src/kudu/integration-tests/ts_authz-itest.cc 
b/src/kudu/integration-tests/ts_authz-itest.cc
index 3b6b96d..45564ee 100644
--- a/src/kudu/integration-tests/ts_authz-itest.cc
+++ b/src/kudu/integration-tests/ts_authz-itest.cc
@@ -286,9 +286,9 @@ class TSAuthzITestHarness {
   // These abide the hierarchical definition of privileges, so if granting 
privileges on a
   // table, these grant privileges on the table and all columns in that table.
   virtual Status GrantTablePrivilege(const string& db, const string& tbl, 
const string& user,
-                                     const string& action) = 0;
+                                     const string& action, bool admin) = 0;
   virtual Status GrantColumnPrivilege(const string& db, const string& tbl, 
const string& col,
-                                      const string& user, const string& 
action) = 0;
+                                      const string& user, const string& 
action, bool admin) = 0;
 
   // Creates a table named 'table_ident' with 'kNumColsPerTable' columns.
   Status CreateTable(const string& table_ident, const shared_ptr<KuduClient>& 
client) {
@@ -352,7 +352,7 @@ class RangerITestHarness : public TSAuthzITestHarness {
     policy.databases = { kDb };
     policy.tables = { "*" };
     policy.columns = { "*" };
-    policy.items.emplace_back(PolicyItem({ kAdminUser }, { ActionPB::ALL }));
+    policy.items.emplace_back(PolicyItem({ kAdminUser }, { ActionPB::ALL }, 
true));
     RETURN_NOT_OK(ranger_->AddPolicy(std::move(policy)));
     SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
     return Status::OK();
@@ -365,23 +365,23 @@ class RangerITestHarness : public TSAuthzITestHarness {
     return Status::OK();
   }
   Status GrantTablePrivilege(const string& db, const string& tbl, const 
string& user,
-                             const string& action) override {
+                             const string& action, bool admin) override {
     AuthorizationPolicy policy;
     policy.databases = { db };
     policy.tables = { tbl };
     policy.columns = { "*" };
-    policy.items.emplace_back(PolicyItem({ user }, { StringToActionPB(action) 
}));
+    policy.items.emplace_back(PolicyItem({ user }, { StringToActionPB(action) 
}, admin));
     RETURN_NOT_OK(ranger_->AddPolicy(std::move(policy)));
     SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
     return Status::OK();
   }
   Status GrantColumnPrivilege(const string& db, const string& tbl, const 
string& col,
-                              const string& user, const string& action) 
override {
+                              const string& user, const string& action, bool 
admin) override {
     AuthorizationPolicy policy;
     policy.databases = { db };
     policy.tables = { tbl };
     policy.columns = { col };
-    policy.items.emplace_back(PolicyItem({ user }, { StringToActionPB(action) 
}));
+    policy.items.emplace_back(PolicyItem({ user }, { StringToActionPB(action) 
}, admin));
     RETURN_NOT_OK(ranger_->AddPolicy(std::move(policy)));
     SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
     return Status::OK();
@@ -495,10 +495,12 @@ TEST_P(TSAuthzITest, TestReadsAndWrites) {
     for (const string& table_name : tables) {
       RWPrivileges granted_privileges = GeneratePrivileges(cols, &prng);
       for (const auto& wp : granted_privileges.table_write_privileges) {
-        ASSERT_OK(harness_->GrantTablePrivilege(kDb, table_name, user, 
WritePrivilegeToString(wp)));
+        ASSERT_OK(harness_->GrantTablePrivilege(kDb, table_name, user, 
WritePrivilegeToString(wp),
+                                                /*admin=*/false));
       }
       for (const auto& col : granted_privileges.column_scan_privileges) {
-        ASSERT_OK(harness_->GrantColumnPrivilege(kDb, table_name, col, user, 
"SELECT"));
+        ASSERT_OK(harness_->GrantColumnPrivilege(kDb, table_name, col, user, 
"SELECT",
+                                                 /*admin=*/false));
       }
       RWPrivileges not_granted_privileges = ComplementaryPrivileges(cols, 
granted_privileges);
       InsertOrDie(&table_to_privileges, table_name,
@@ -575,12 +577,13 @@ TEST_P(TSAuthzITest, TestAlters) {
   // Note: we only need privileges on the metadata for OpenTable() calls.
   // METADATA isn't a first-class privilege and won't get carried over
   // on table rename, so we just grant INSERT privileges.
-  ASSERT_OK(harness_->GrantTablePrivilege(kDb, kTableName, user, "INSERT"));
+  ASSERT_OK(harness_->GrantTablePrivilege(kDb, kTableName, user, "INSERT", 
/*admin=*/false));
 
   // First, grant privileges on a new column that doesn't yet exist. Once that
   // column is created, we should be able to scan it immediately.
   const string new_column = Substitute("col$0", kNumColsPerTable);
-  ASSERT_OK(harness_->GrantColumnPrivilege(kDb, kTableName, new_column, user, 
"SELECT"));
+  ASSERT_OK(harness_->GrantColumnPrivilege(kDb, kTableName, new_column, user, 
"SELECT",
+                                           /*admin=*/false));
   {
     unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer(table_ident));
     table_alterer->AddColumn(new_column)->Type(KuduColumnSchema::INT32);
@@ -592,7 +595,8 @@ TEST_P(TSAuthzITest, TestAlters) {
 
   // Now create another column and grant the user privileges for that column.
   const string another_column = Substitute("col$0", kNumColsPerTable + 1);
-  ASSERT_OK(harness_->GrantColumnPrivilege(kDb, kTableName, another_column, 
user, "SELECT"));
+  ASSERT_OK(harness_->GrantColumnPrivilege(kDb, kTableName, another_column, 
user, "SELECT",
+                                           /*admin=*/false));
   {
     unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer(table_ident));
     table_alterer->AddColumn(another_column)->Type(KuduColumnSchema::INT32);
@@ -611,7 +615,7 @@ TEST_P(TSAuthzITest, TestAlters) {
   // We need to explicitly grant privileges on the new table name.
   const string kNewTableName = "newtable";
   const string new_table_ident = Substitute("$0.$1", kDb, kNewTableName);
-  ASSERT_OK(harness_->GrantTablePrivilege(kDb, kNewTableName, user, "SELECT"));
+  ASSERT_OK(harness_->GrantTablePrivilege(kDb, kNewTableName, user, "SELECT", 
/*admin=*/false));
   {
     unique_ptr<KuduTableAlterer> 
table_alterer(client_->NewTableAlterer(table_ident));
     table_alterer->RenameTo(new_table_ident);
diff --git a/src/kudu/master/ranger_authz_provider.cc 
b/src/kudu/master/ranger_authz_provider.cc
index 4129dd4..8317098 100644
--- a/src/kudu/master/ranger_authz_provider.cc
+++ b/src/kudu/master/ranger_authz_provider.cc
@@ -81,7 +81,7 @@ Status RangerAuthzProvider::Start() {
 
 Status RangerAuthzProvider::AuthorizeCreateTable(const string& table_name,
                                                  const string& user,
-                                                 const string& /*owner*/) {
+                                                 const string& owner) {
   if (IsTrustedUser(user)) {
     return Status::OK();
   }
@@ -92,8 +92,15 @@ Status RangerAuthzProvider::AuthorizeCreateTable(const 
string& table_name,
   RETURN_NOT_OK(ParseTableIdentifier(table_name, &db, &tbl));
 
   bool authorized;
-  // Table creation requires 'CREATE ON DATABASE' privilege.
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::CREATE, db, tbl, 
&authorized,
+  bool requires_delegate_admin = user != owner;
+  // Table creation requires 'CREATE ON DATABASE' privilege. If the user 
creates
+  // a table with a different owner, 'ALL' and delegate admin are required
+  // (similar to ALL WITH GRANT OPTION). This also matches the old behavior 
with
+  // Sentry: https://issues.apache.org/jira/browse/SENTRY-2151
+  RETURN_NOT_OK(client_.AuthorizeAction(user, requires_delegate_admin
+                                                ? ActionPB::ALL
+                                                : ActionPB::CREATE,
+                                        db, tbl, requires_delegate_admin, 
&authorized,
                                         RangerClient::Scope::DATABASE));
 
   if (PREDICT_FALSE(!authorized)) {
@@ -115,7 +122,8 @@ Status RangerAuthzProvider::AuthorizeDropTable(const 
string& table_name,
 
   RETURN_NOT_OK(ParseTableIdentifier(table_name, &db, &tbl));
   bool authorized;
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::DROP, db, tbl, 
&authorized));
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::DROP, db, tbl,
+                                        /*requires_delegate_admin=*/false, 
&authorized));
 
   if (PREDICT_FALSE(!authorized)) {
     LOG(WARNING) << Substitute("User $0 is not authorized to DROP $1", user, 
table_name);
@@ -139,7 +147,8 @@ Status RangerAuthzProvider::AuthorizeAlterTable(const 
string& old_table,
   // Table alteration (without table rename) requires ALTER ON TABLE.
   bool authorized;
   if (old_table == new_table) {
-    RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALTER, old_db, 
old_tbl, &authorized));
+    RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALTER, old_db, 
old_tbl,
+                                          /*requires_delegate_admin=*/false, 
&authorized));
 
     if (PREDICT_FALSE(!authorized)) {
       LOG(WARNING) << Substitute("User $0 is not authorized to ALTER $1", 
user, old_table);
@@ -151,7 +160,8 @@ Status RangerAuthzProvider::AuthorizeAlterTable(const 
string& old_table,
 
   // To prevent privilege escalation we require ALL on the old TABLE
   // and CREATE on the new DATABASE for table rename.
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALL, old_db, old_tbl, 
&authorized));
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALL, old_db, old_tbl,
+                                        /*requires_delegate_admin=*/false, 
&authorized));
   if (PREDICT_FALSE(!authorized)) {
     LOG(WARNING) << Substitute("User $0 is not authorized to perform ALL on 
$1", user, old_table);
     return Status::NotAuthorized(kUnauthorizedAction);
@@ -161,7 +171,8 @@ Status RangerAuthzProvider::AuthorizeAlterTable(const 
string& old_table,
   string new_tbl;
 
   RETURN_NOT_OK(ParseTableIdentifier(new_table, &new_db, &new_tbl));
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::CREATE, new_db, 
new_tbl, &authorized,
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::CREATE, new_db, 
new_tbl,
+                                        /*requires_delegate_admin=*/false, 
&authorized,
                                         RangerClient::Scope::DATABASE));
 
   if (PREDICT_FALSE(!authorized)) {
@@ -184,7 +195,8 @@ Status RangerAuthzProvider::AuthorizeGetTableMetadata(const 
string& table_name,
   RETURN_NOT_OK(ParseTableIdentifier(table_name, &db, &tbl));
   bool authorized;
   // Get table metadata requires 'METADATA ON TABLE' privilege.
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::METADATA, db, tbl, 
&authorized));
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::METADATA, db, tbl,
+                                        /*requires_delegate_admin=*/false, 
&authorized));
 
   if (PREDICT_FALSE(!authorized)) {
     LOG(WARNING) << Substitute("User $0 is not authorized to access METADATA 
on $1", user,
@@ -226,7 +238,8 @@ Status 
RangerAuthzProvider::AuthorizeGetTableStatistics(const string& table_name
   bool authorized;
   // Statistics contain data (e.g. number of rows) that requires the 'SELECT 
ON TABLE'
   // privilege.
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::SELECT, db, tbl, 
&authorized));
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::SELECT, db, tbl,
+                                        /*requires_delegate_admin=*/false, 
&authorized));
 
   if (PREDICT_FALSE(!authorized)) {
     LOG(WARNING) << Substitute("User $0 is not authorized to SELECT on $1", 
user, table_name);
@@ -251,7 +264,8 @@ Status RangerAuthzProvider::FillTablePrivilegePB(const 
string& table_name,
   if (IsTrustedUser(user)) {
     authorized = true;
   } else {
-    RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALL, db, tbl, 
&authorized));
+    RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALL, db, tbl,
+                                          /*requires_delegate_admin=*/false, 
&authorized));
   }
   if (authorized) {
     pb->set_delete_privilege(true);
diff --git a/src/kudu/ranger/mini_ranger-test.cc 
b/src/kudu/ranger/mini_ranger-test.cc
index bb0ec29..da5b86c 100644
--- a/src/kudu/ranger/mini_ranger-test.cc
+++ b/src/kudu/ranger/mini_ranger-test.cc
@@ -48,9 +48,7 @@ class MiniRangerTest : public KuduTest {
 };
 
 TEST_F(MiniRangerTest, TestGrantPrivilege) {
-  PolicyItem item;
-  item.first.emplace_back("testuser");
-  item.second.emplace_back(ActionPB::ALTER);
+  PolicyItem item({ "testuser" }, { ActionPB::ALTER }, false);
 
   AuthorizationPolicy policy;
   policy.databases.emplace_back("foo");
@@ -61,9 +59,7 @@ TEST_F(MiniRangerTest, TestGrantPrivilege) {
 }
 
 TEST_F(MiniRangerTest, TestPersistence) {
-  PolicyItem item;
-  item.first.emplace_back("testuser");
-  item.second.emplace_back(ActionPB::ALTER);
+  PolicyItem item({ "testuser" }, { ActionPB::ALTER }, false);
 
   AuthorizationPolicy policy;
   policy.databases.emplace_back("foo");
diff --git a/src/kudu/ranger/mini_ranger.cc b/src/kudu/ranger/mini_ranger.cc
index 713cfb1..35d90ce 100644
--- a/src/kudu/ranger/mini_ranger.cc
+++ b/src/kudu/ranger/mini_ranger.cc
@@ -308,12 +308,13 @@ Status MiniRanger::AddPolicy(AuthorizationPolicy policy) {
     EasyJson item = policy_items.PushBack(EasyJson::kObject);
 
     EasyJson users = item.Set("users", EasyJson::kArray);
-    for (const string& user : policy_item.first) {
+    for (const string& user : std::get<0>(policy_item)) {
       users.PushBack(user);
     }
 
+    item.Set("delegateAdmin", std::get<2>(policy_item));
     EasyJson accesses = item.Set("accesses", EasyJson::kArray);
-    for (const ActionPB& action : policy_item.second) {
+    for (const ActionPB& action : std::get<1>(policy_item)) {
       EasyJson access = accesses.PushBack(EasyJson::kObject);
       access.Set("type", ActionPB_Name(action));
       access.Set("isAllowed", true);
diff --git a/src/kudu/ranger/mini_ranger.h b/src/kudu/ranger/mini_ranger.h
index 5d15278..e4b7244 100644
--- a/src/kudu/ranger/mini_ranger.h
+++ b/src/kudu/ranger/mini_ranger.h
@@ -46,10 +46,10 @@ namespace ranger {
 // List of usernames to be used in PolicyItem;
 typedef std::vector<std::string> UserList;
 
-// Pair of a vector of usernames and a vector of allowed actions to be used in
-// AuthorizationPolicy. Number of users and actions doesn't have to match, 
their
-// cross-product is taken.
-typedef std::pair<UserList, std::vector<ActionPB>> PolicyItem;
+// Tuple of a vector of usernames,  a vector of allowed actions and a delegate
+// admin flag to be used in AuthorizationPolicy. Number of users and actions
+// doesn't have to match, their cross-product is taken.
+typedef std::tuple<UserList, std::vector<ActionPB>, bool> PolicyItem;
 
 // Policy key used for searching policies_ (values are PolicyItems).
 typedef std::tuple<std::vector<std::string>,
diff --git a/src/kudu/ranger/ranger.proto b/src/kudu/ranger/ranger.proto
index 2f7cabd..3ca54bc 100644
--- a/src/kudu/ranger/ranger.proto
+++ b/src/kudu/ranger/ranger.proto
@@ -67,6 +67,9 @@ message RangerRequestPB {
 
   // Action to be authorized on the resource.
   optional ActionPB action = 4;
+
+  // Whether delegate admin privilege is required to perform the action.
+  optional bool requires_delegate_admin = 5;
 }
 
 // Describes a single Ranger authorization response for a single user.
diff --git a/src/kudu/ranger/ranger_client-test.cc 
b/src/kudu/ranger/ranger_client-test.cc
index c08dd75..eee44b8 100644
--- a/src/kudu/ranger/ranger_client-test.cc
+++ b/src/kudu/ranger/ranger_client-test.cc
@@ -172,14 +172,16 @@ class RangerClientTest : public KuduTest {
 
 TEST_F(RangerClientTest, TestAuthorizeCreateTableUnauthorized) {
   bool authorized;
-  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "bar", "baz", 
&authorized));
+  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "bar", "baz",
+                                    /*requires_delegate_admin=*/false, 
&authorized));
   ASSERT_FALSE(authorized);
 }
 
 TEST_F(RangerClientTest, TestAuthorizeCreateTableAuthorized) {
   Allow("jdoe", ActionPB::CREATE, "foo", "bar");
   bool authorized;
-  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "foo", "bar", 
&authorized));
+  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "foo", "bar",
+                                    /*requires_delegate_admin=*/false, 
&authorized));
   ASSERT_TRUE(authorized);
 }
 
@@ -395,7 +397,8 @@ TEST_F(RangerClientTestBase, TestLogging) {
   // Make a request. It doesn't matter whether it succeeds or not -- debug logs
   // should include info about each request.
   bool authorized;
-  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", 
&authorized));
+  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table",
+                                     /*requires_delegate_admin=*/false, 
&authorized));
   ASSERT_FALSE(authorized);
   {
     // Check that the Ranger client logs some DEBUG messages.
@@ -414,7 +417,8 @@ TEST_F(RangerClientTestBase, TestLogging) {
   FLAGS_ranger_overwrite_log_config = false;
   client_.reset(new RangerClient(env_, metric_entity_));
   ASSERT_OK(client_->Start());
-  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", 
&authorized));
+  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table",
+                                     /*requires_delegate_admin=*/false, 
&authorized));
   ASSERT_FALSE(authorized);
   {
     // Our logs should still contain DEBUG messages since we didn't update the
@@ -430,7 +434,8 @@ TEST_F(RangerClientTestBase, TestLogging) {
   FLAGS_ranger_overwrite_log_config = true;
   client_.reset(new RangerClient(env_, metric_entity_));
   ASSERT_OK(client_->Start());
-  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", 
&authorized));
+  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table",
+                                     /*requires_delegate_admin=*/false, 
&authorized));
   ASSERT_FALSE(authorized);
   {
     // We shouldn't see any DEBUG messages since the client is configured to
diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index 4c1c059..be0f6f9 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -394,7 +394,8 @@ Status RangerClient::Start() {
 
 // TODO(abukor): refactor to avoid code duplication
 Status RangerClient::AuthorizeAction(const string& user_name, const ActionPB& 
action,
-                                     const string& database, const string& 
table, bool* authorized,
+                                     const string& database, const string& 
table,
+                                     bool requires_delegate_admin, bool* 
authorized,
                                      Scope scope) {
   DCHECK(subprocess_);
   RangerRequestListPB req_list;
@@ -405,6 +406,7 @@ Status RangerClient::AuthorizeAction(const string& 
user_name, const ActionPB& ac
 
   req->set_action(action);
   req->set_database(database);
+  req->set_requires_delegate_admin(requires_delegate_admin);
   // Only pass the table name if this is table level request.
   if (scope == Scope::TABLE) {
     req->set_table(table);
diff --git a/src/kudu/ranger/ranger_client.h b/src/kudu/ranger/ranger_client.h
index 7261005..804f7da 100644
--- a/src/kudu/ranger/ranger_client.h
+++ b/src/kudu/ranger/ranger_client.h
@@ -84,7 +84,8 @@ class RangerClient {
   // Authorizes an action on the table. Sets 'authorized' to true if it's
   // authorized, false otherwise.
   Status AuthorizeAction(const std::string& user_name, const ActionPB& action,
-                         const std::string& database, const std::string& 
table, bool* authorized,
+                         const std::string& database, const std::string& table,
+                         bool requires_delegate_admin, bool* authorized,
                          Scope scope = Scope::TABLE) WARN_UNUSED_RESULT;
 
   // Authorizes action on multiple tables. It sets 'table_names' to the

Reply via email to