HBASE-20357 AccessControlClient API Enhancement

Signed-off-by: tedyu <yuzhih...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bb8826ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bb8826ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bb8826ca

Branch: refs/heads/HBASE-18477
Commit: bb8826ca5f3d5bcd185cf6fe4ebe7d72942b8d6d
Parents: 63477d6
Author: Pankaj <pankaj...@huawei.com>
Authored: Mon Jun 25 12:20:48 2018 +0530
Committer: tedyu <yuzhih...@gmail.com>
Committed: Thu Jun 28 22:48:58 2018 -0700

----------------------------------------------------------------------
 .../security/access/AccessControlClient.java    | 184 ++++++-
 .../security/access/AccessControlUtil.java      | 151 +++++-
 .../shaded/protobuf/ResponseConverter.java      |  10 +
 .../org/apache/hadoop/hbase/HConstants.java     |   7 +-
 .../src/main/protobuf/AccessControl.proto       |  15 +
 .../hbase/rsgroup/RSGroupAdminEndpoint.java     |   2 +-
 .../hbase/regionserver/RSRpcServices.java       |   2 +-
 .../hbase/security/access/AccessChecker.java    | 176 ++++++-
 .../security/access/AccessControlLists.java     | 190 ++++++--
 .../hbase/security/access/AccessController.java | 199 +++++---
 .../hbase/security/access/AuthResult.java       |  30 +-
 .../master/TestMasterCoprocessorServices.java   |   7 +
 .../security/access/TestAccessController.java   | 481 ++++++++++++++++++-
 13 files changed, 1301 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
index 0363ba2..981db76 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
@@ -255,15 +256,28 @@ public class AccessControlClient {
    * along with the list of superusers would be returned. Else, no rows get 
returned.
    * @param connection The Connection instance to use
    * @param tableRegex The regular expression string to match against
-   * @return - returns an array of UserPermissions
+   * @return List of UserPermissions
    * @throws Throwable
    */
   public static List<UserPermission> getUserPermissions(Connection connection, 
String tableRegex)
       throws Throwable {
-    /** TODO: Pass an rpcController
-    HBaseRpcController controller
-      = ((ClusterConnection) 
connection).getRpcControllerFactory().newController();
-      */
+    return getUserPermissions(connection, tableRegex, HConstants.EMPTY_STRING);
+  }
+
+  /**
+   * List all the userPermissions matching the given table pattern and user 
name.
+   * @param connection Connection
+   * @param tableRegex The regular expression string to match against
+   * @param userName User name, if empty then all user permissions will be 
retrieved.
+   * @return List of UserPermissions
+   * @throws Throwable on failure
+   */
+  public static List<UserPermission> getUserPermissions(Connection connection, 
String tableRegex,
+      String userName) throws Throwable {
+    /**
+     * TODO: Pass an rpcController HBaseRpcController controller = 
((ClusterConnection)
+     * connection).getRpcControllerFactory().newController();
+     */
     List<UserPermission> permList = new ArrayList<>();
     try (Table table = connection.getTable(ACL_TABLE_NAME)) {
       try (Admin admin = connection.getAdmin()) {
@@ -272,25 +286,167 @@ public class AccessControlClient {
             AccessControlProtos.AccessControlService.newBlockingStub(service);
         HTableDescriptor[] htds = null;
         if (tableRegex == null || tableRegex.isEmpty()) {
-          permList = AccessControlUtil.getUserPermissions(null, protocol);
-        } else if (tableRegex.charAt(0) == '@') {  // Namespaces
+          permList = AccessControlUtil.getUserPermissions(null, protocol, 
userName);
+        } else if (tableRegex.charAt(0) == '@') { // Namespaces
           String namespaceRegex = tableRegex.substring(1);
-          for (NamespaceDescriptor nsds : admin.listNamespaceDescriptors()) {  
// Read out all namespaces
+          for (NamespaceDescriptor nsds : admin.listNamespaceDescriptors()) { 
// Read out all
+                                                                              
// namespaces
             String namespace = nsds.getName();
-            if (namespace.matches(namespaceRegex)) {  // Match the given 
namespace regex?
+            if (namespace.matches(namespaceRegex)) { // Match the given 
namespace regex?
               permList.addAll(AccessControlUtil.getUserPermissions(null, 
protocol,
-                Bytes.toBytes(namespace)));
+                Bytes.toBytes(namespace), userName));
             }
           }
-        } else {  // Tables
+        } else { // Tables
           htds = admin.listTables(Pattern.compile(tableRegex), true);
-          for (HTableDescriptor hd : htds) {
-            permList.addAll(AccessControlUtil.getUserPermissions(null, 
protocol,
-              hd.getTableName()));
+          for (HTableDescriptor htd : htds) {
+            permList.addAll(AccessControlUtil.getUserPermissions(null, 
protocol, htd.getTableName(),
+              null, null, userName));
           }
         }
       }
     }
     return permList;
   }
+
+  /**
+   * List all the userPermissions matching the given table pattern and column 
family.
+   * @param connection Connection
+   * @param tableRegex The regular expression string to match against. It 
shouldn't be null, empty
+   *          or a namespace regular expression.
+   * @param columnFamily Column family
+   * @return List of UserPermissions
+   * @throws Throwable on failure
+   */
+  public static List<UserPermission> getUserPermissions(Connection connection, 
String tableRegex,
+      byte[] columnFamily) throws Throwable {
+    return getUserPermissions(connection, tableRegex, columnFamily, null, 
HConstants.EMPTY_STRING);
+  }
+
+  /**
+   * List all the userPermissions matching the given table pattern, column 
family and user name.
+   * @param connection Connection
+   * @param tableRegex The regular expression string to match against. It 
shouldn't be null, empty
+   *          or a namespace regular expression.
+   * @param columnFamily Column family
+   * @param userName User name, if empty then all user permissions will be 
retrieved.
+   * @return List of UserPermissions
+   * @throws Throwable on failure
+   */
+  public static List<UserPermission> getUserPermissions(Connection connection, 
String tableRegex,
+      byte[] columnFamily, String userName) throws Throwable {
+    return getUserPermissions(connection, tableRegex, columnFamily, null, 
userName);
+  }
+
+  /**
+   * List all the userPermissions matching the given table pattern, column 
family and column
+   * qualifier.
+   * @param connection Connection
+   * @param tableRegex The regular expression string to match against. It 
shouldn't be null, empty
+   *          or a namespace regular expression.
+   * @param columnFamily Column family
+   * @param columnQualifier Column qualifier
+   * @return List of UserPermissions
+   * @throws Throwable on failure
+   */
+  public static List<UserPermission> getUserPermissions(Connection connection, 
String tableRegex,
+      byte[] columnFamily, byte[] columnQualifier) throws Throwable {
+    return getUserPermissions(connection, tableRegex, columnFamily, 
columnQualifier,
+      HConstants.EMPTY_STRING);
+  }
+
+  /**
+   * List all the userPermissions matching the given table pattern, column 
family, column
+   * qualifier and user name.
+   * @param connection Connection
+   * @param tableRegex The regular expression string to match against. It 
shouldn't be null, empty
+   *          or a namespace regular expression.
+   * @param columnFamily Column family
+   * @param columnQualifier Column qualifier
+   * @param userName User name, if empty then all user permissions will be 
retrieved.
+   * @return List of UserPermissions
+   * @throws Throwable on failure
+   */
+  public static List<UserPermission> getUserPermissions(Connection connection, 
String tableRegex,
+      byte[] columnFamily, byte[] columnQualifier, String userName) throws 
Throwable {
+    if (tableRegex == null || tableRegex.isEmpty() || tableRegex.charAt(0) == 
'@') {
+      throw new IllegalArgumentException("Table name can't be null or empty or 
a namespace.");
+    }
+    /**
+     * TODO: Pass an rpcController HBaseRpcController controller = 
((ClusterConnection)
+     * connection).getRpcControllerFactory().newController();
+     */
+    List<UserPermission> permList = new ArrayList<UserPermission>();
+    try (Table table = connection.getTable(ACL_TABLE_NAME)) {
+      try (Admin admin = connection.getAdmin()) {
+        CoprocessorRpcChannel service = 
table.coprocessorService(HConstants.EMPTY_START_ROW);
+        BlockingInterface protocol =
+            AccessControlProtos.AccessControlService.newBlockingStub(service);
+        HTableDescriptor[] htds = 
admin.listTables(Pattern.compile(tableRegex), true);
+        // Retrieve table permissions
+        for (HTableDescriptor htd : htds) {
+          permList.addAll(AccessControlUtil.getUserPermissions(null, protocol, 
htd.getTableName(),
+            columnFamily, columnQualifier, userName));
+        }
+      }
+    }
+    return permList;
+  }
+
+  /**
+   * Validates whether specified user has permission to perform actions on the 
mentioned table,
+   * column family or column qualifier.
+   * @param connection Connection
+   * @param tableName Table name, it shouldn't be null or empty.
+   * @param columnFamily The column family. Optional argument, can be empty. 
If empty then
+   *          validation will happen at table level.
+   * @param columnQualifier The column qualifier. Optional argument, can be 
empty. If empty then
+   *          validation will happen at table and column family level. 
columnQualifier will not be
+   *          considered if columnFamily is passed as null or empty.
+   * @param userName User name, it shouldn't be null or empty.
+   * @param actions Actions
+   * @return true if access allowed to the specified user, otherwise false.
+   * @throws Throwable on failure
+   */
+  public static boolean hasPermission(Connection connection, String tableName, 
String columnFamily,
+      String columnQualifier, String userName, Permission.Action... actions) 
throws Throwable {
+    return hasPermission(connection, tableName, Bytes.toBytes(columnFamily),
+      Bytes.toBytes(columnQualifier), userName, actions);
+  }
+
+  /**
+   * Validates whether specified user has permission to perform actions on the 
mentioned table,
+   * column family or column qualifier.
+   * @param connection Connection
+   * @param tableName Table name, it shouldn't be null or empty.
+   * @param columnFamily The column family. Optional argument, can be empty. 
If empty then
+   *          validation will happen at table level.
+   * @param columnQualifier The column qualifier. Optional argument, can be 
empty. If empty then
+   *          validation will happen at table and column family level. 
columnQualifier will not be
+   *          considered if columnFamily is passed as null or empty.
+   * @param userName User name, it shouldn't be null or empty.
+   * @param actions Actions
+   * @return true if access allowed to the specified user, otherwise false.
+   * @throws Throwable on failure
+   */
+  public static boolean hasPermission(Connection connection, String tableName, 
byte[] columnFamily,
+      byte[] columnQualifier, String userName, Permission.Action... actions) 
throws Throwable {
+    if (StringUtils.isEmpty(tableName) || StringUtils.isEmpty(userName)) {
+      throw new IllegalArgumentException("Table and user name can't be null or 
empty.");
+    }
+    boolean hasPermission = false;
+    /**
+     * TODO: Pass an rpcController HBaseRpcController controller = 
((ClusterConnection)
+     * connection).getRpcControllerFactory().newController();
+     */
+    try (Table table = connection.getTable(ACL_TABLE_NAME)) {
+      CoprocessorRpcChannel service = 
table.coprocessorService(HConstants.EMPTY_START_ROW);
+      BlockingInterface protocol =
+          AccessControlProtos.AccessControlService.newBlockingStub(service);
+      // Check whether user has permission
+      hasPermission = AccessControlUtil.hasPermission(null, protocol, 
TableName.valueOf(tableName),
+        columnFamily, columnQualifier, userName, actions);
+    }
+    return hasPermission;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
index 53c4dd8..1b5a70c 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlUtil.java
@@ -22,6 +22,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -29,7 +31,7 @@ import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
 import org.apache.hadoop.hbase.util.ByteStringer;
-
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.ListMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -256,8 +258,31 @@ public class AccessControlUtil {
   }
 
   /**
+   * Converts a TablePermission proto to a client TablePermission object.
+   * @param proto the protobuf TablePermission
+   * @return the converted TablePermission
+   */
+  public static TablePermission 
toTablePermission(AccessControlProtos.TablePermission proto) {
+    List<Permission.Action> actions = 
toPermissionActions(proto.getActionList());
+    TableName table = null;
+    byte[] qualifier = null;
+    byte[] family = null;
+    if (!proto.hasTableName()) {
+      throw new IllegalStateException("TableName cannot be empty");
+    }
+    table = ProtobufUtil.toTableName(proto.getTableName());
+    if (proto.hasFamily()) {
+      family = proto.getFamily().toByteArray();
+    }
+    if (proto.hasQualifier()) {
+      qualifier = proto.getQualifier().toByteArray();
+    }
+    return new TablePermission(table, family, qualifier,
+        actions.toArray(new Permission.Action[actions.size()]));
+  }
+
+  /**
    * Converts a Permission proto to a client TablePermission object.
-   *
    * @param proto the protobuf Permission
    * @return the converted TablePermission
    */
@@ -539,6 +564,7 @@ public class AccessControlUtil {
    * <p>
    * It's also called by the shell, in case you want to find references.
    *
+   * @param controller RpcController
    * @param protocol the AccessControlService protocol proxy
    * @param namespace the short name of the user to grant permissions
    * @param actions the permissions to be granted
@@ -562,10 +588,11 @@ public class AccessControlUtil {
    * <p>
    * It's also called by the shell, in case you want to find references.
    *
+   * @param controller RpcController
    * @param protocol the AccessControlService protocol proxy
    * @param userShortName the short name of the user to revoke permissions
    * @param actions the permissions to be revoked
-   * @throws ServiceException
+   * @throws ServiceException on failure
    */
   public static void revoke(RpcController controller,
       AccessControlService.BlockingInterface protocol, String userShortName,
@@ -586,13 +613,14 @@ public class AccessControlUtil {
    * <p>
    * It's also called by the shell, in case you want to find references.
    *
+   * @param controller RpcController
    * @param protocol the AccessControlService protocol proxy
    * @param userShortName the short name of the user to revoke permissions
    * @param tableName optional table name
    * @param f optional column family
    * @param q optional qualifier
    * @param actions the permissions to be revoked
-   * @throws ServiceException
+   * @throws ServiceException on failure
    */
   public static void revoke(RpcController controller,
       AccessControlService.BlockingInterface protocol, String userShortName, 
TableName tableName,
@@ -612,11 +640,12 @@ public class AccessControlUtil {
    * <p>
    * It's also called by the shell, in case you want to find references.
    *
+   * @param controller RpcController
    * @param protocol the AccessControlService protocol proxy
    * @param userShortName the short name of the user to revoke permissions
    * @param namespace optional table name
    * @param actions the permissions to be revoked
-   * @throws ServiceException
+   * @throws ServiceException on failure
    */
   public static void revoke(RpcController controller,
       AccessControlService.BlockingInterface protocol, String userShortName, 
String namespace,
@@ -636,19 +665,36 @@ public class AccessControlUtil {
    * <p>
    * It's also called by the shell, in case you want to find references.
    *
+   * @param controller RpcController
    * @param protocol the AccessControlService protocol proxy
-   * @throws ServiceException
+   * @throws ServiceException on failure
    */
   public static List<UserPermission> getUserPermissions(RpcController 
controller,
       AccessControlService.BlockingInterface protocol) throws ServiceException 
{
+    return getUserPermissions(controller, protocol, HConstants.EMPTY_STRING);
+  }
+
+  /**
+   * A utility used to get user's global permissions based on the specified 
user name.
+   * @param controller RpcController
+   * @param protocol the AccessControlService protocol proxy
+   * @param userName User name, if empty then all user permissions will be 
retrieved.
+   * @throws ServiceException on failure
+   */
+  public static List<UserPermission> getUserPermissions(RpcController 
controller,
+      AccessControlService.BlockingInterface protocol, String userName) throws 
ServiceException {
     AccessControlProtos.GetUserPermissionsRequest.Builder builder =
         AccessControlProtos.GetUserPermissionsRequest.newBuilder();
     builder.setType(AccessControlProtos.Permission.Type.Global);
+    if (!StringUtils.isEmpty(userName)) {
+      builder.setUserName(ByteString.copyFromUtf8(userName));
+    }
+
     AccessControlProtos.GetUserPermissionsRequest request = builder.build();
     AccessControlProtos.GetUserPermissionsResponse response =
         protocol.getUserPermissions(controller, request);
     List<UserPermission> perms = new 
ArrayList<>(response.getUserPermissionCount());
-    for (AccessControlProtos.UserPermission perm: 
response.getUserPermissionList()) {
+    for (AccessControlProtos.UserPermission perm : 
response.getUserPermissionList()) {
       perms.add(toUserPermission(perm));
     }
     return perms;
@@ -659,6 +705,7 @@ public class AccessControlUtil {
    * <p>
    * It's also called by the shell, in case you want to find references.
    *
+   * @param controller RpcController
    * @param protocol the AccessControlService protocol proxy
    * @param t optional table name
    * @throws ServiceException
@@ -666,17 +713,44 @@ public class AccessControlUtil {
   public static List<UserPermission> getUserPermissions(RpcController 
controller,
       AccessControlService.BlockingInterface protocol,
       TableName t) throws ServiceException {
+    return getUserPermissions(controller, protocol, t, null, null, 
HConstants.EMPTY_STRING);
+  }
+
+  /**
+   * A utility used to get user table permissions based on the column family, 
column qualifier and
+   * user name.
+   * @param controller RpcController
+   * @param protocol the AccessControlService protocol proxy
+   * @param t optional table name
+   * @param columnFamily Column family
+   * @param columnQualifier Column qualifier
+   * @param userName User name, if empty then all user permissions will be 
retrieved.
+   * @throws ServiceException on failure
+   */
+  public static List<UserPermission> getUserPermissions(RpcController 
controller,
+      AccessControlService.BlockingInterface protocol, TableName t, byte[] 
columnFamily,
+      byte[] columnQualifier, String userName) throws ServiceException {
     AccessControlProtos.GetUserPermissionsRequest.Builder builder =
         AccessControlProtos.GetUserPermissionsRequest.newBuilder();
     if (t != null) {
       builder.setTableName(ProtobufUtil.toProtoTableName(t));
     }
+    if (Bytes.len(columnFamily) > 0) {
+      builder.setColumnFamily(ByteString.copyFrom(columnFamily));
+    }
+    if (Bytes.len(columnQualifier) > 0) {
+      builder.setColumnQualifier(ByteString.copyFrom(columnQualifier));
+    }
+    if (!StringUtils.isEmpty(userName)) {
+      builder.setUserName(ByteString.copyFromUtf8(userName));
+    }
+
     builder.setType(AccessControlProtos.Permission.Type.Table);
     AccessControlProtos.GetUserPermissionsRequest request = builder.build();
     AccessControlProtos.GetUserPermissionsResponse response =
         protocol.getUserPermissions(controller, request);
     List<UserPermission> perms = new 
ArrayList<>(response.getUserPermissionCount());
-    for (AccessControlProtos.UserPermission perm: 
response.getUserPermissionList()) {
+    for (AccessControlProtos.UserPermission perm : 
response.getUserPermissionList()) {
       perms.add(toUserPermission(perm));
     }
     return perms;
@@ -687,6 +761,7 @@ public class AccessControlUtil {
    * <p>
    * It's also called by the shell, in case you want to find references.
    *
+   * @param controller RpcController
    * @param protocol the AccessControlService protocol proxy
    * @param namespace name of the namespace
    * @throws ServiceException
@@ -694,23 +769,81 @@ public class AccessControlUtil {
   public static List<UserPermission> getUserPermissions(RpcController 
controller,
       AccessControlService.BlockingInterface protocol,
       byte[] namespace) throws ServiceException {
+    return getUserPermissions(controller, protocol, namespace, 
HConstants.EMPTY_STRING);
+  }
+
+  /**
+   * A utility used to get permissions for selected namespace based on the 
specified user name.
+   * @param controller RpcController
+   * @param protocol the AccessControlService protocol proxy
+   * @param namespace name of the namespace
+   * @param userName User name, if empty then all user permissions will be 
retrieved.
+   * @throws ServiceException on failure
+   */
+  public static List<UserPermission> getUserPermissions(RpcController 
controller,
+      AccessControlService.BlockingInterface protocol, byte[] namespace, 
String userName)
+      throws ServiceException {
     AccessControlProtos.GetUserPermissionsRequest.Builder builder =
         AccessControlProtos.GetUserPermissionsRequest.newBuilder();
     if (namespace != null) {
       builder.setNamespaceName(ByteStringer.wrap(namespace));
     }
+    if (!StringUtils.isEmpty(userName)) {
+      builder.setUserName(ByteString.copyFromUtf8(userName));
+    }
     builder.setType(AccessControlProtos.Permission.Type.Namespace);
     AccessControlProtos.GetUserPermissionsRequest request = builder.build();
     AccessControlProtos.GetUserPermissionsResponse response =
         protocol.getUserPermissions(controller, request);
     List<UserPermission> perms = new 
ArrayList<>(response.getUserPermissionCount());
-    for (AccessControlProtos.UserPermission perm: 
response.getUserPermissionList()) {
+    for (AccessControlProtos.UserPermission perm : 
response.getUserPermissionList()) {
       perms.add(toUserPermission(perm));
     }
     return perms;
   }
 
   /**
+   * Validates whether specified user has permission to perform actions on the 
mentioned table,
+   * column family or column qualifier.
+   * @param controller RpcController
+   * @param protocol the AccessControlService protocol proxy
+   * @param tableName Table name, it shouldn't be null or empty.
+   * @param columnFamily The column family. Optional argument, can be empty. 
If empty then
+   *          validation will happen at table level.
+   * @param columnQualifier The column qualifier. Optional argument, can be 
empty. If empty then
+   *          validation will happen at table and column family level. 
columnQualifier will not be
+   *          considered if columnFamily is passed as null or empty.
+   * @param userName User name, it shouldn't be null or empty.
+   * @param actions Actions
+   * @return true if access allowed, otherwise false
+   * @throws ServiceException on failure
+   */
+  public static boolean hasPermission(RpcController controller,
+      AccessControlService.BlockingInterface protocol, TableName tableName, 
byte[] columnFamily,
+      byte[] columnQualifier, String userName, Permission.Action[] actions)
+      throws ServiceException {
+    AccessControlProtos.TablePermission.Builder tablePermissionBuilder =
+        AccessControlProtos.TablePermission.newBuilder();
+    tablePermissionBuilder
+        
.setTableName(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toProtoTableName(tableName));
+    if (Bytes.len(columnFamily) > 0) {
+      tablePermissionBuilder.setFamily(ByteStringer.wrap(columnFamily));
+    }
+    if (Bytes.len(columnQualifier) > 0) {
+      
tablePermissionBuilder.setQualifier(ByteString.copyFrom(columnQualifier));
+    }
+    for (Permission.Action a : actions) {
+      tablePermissionBuilder.addAction(toPermissionAction(a));
+    }
+    AccessControlProtos.HasPermissionRequest request = 
AccessControlProtos.HasPermissionRequest
+        .newBuilder().setTablePermission(tablePermissionBuilder)
+        .setUserName(ByteString.copyFromUtf8(userName)).build();
+    AccessControlProtos.HasPermissionResponse response =
+        protocol.hasPermission(controller, request);
+    return response.getHasPermission();
+  }
+
+  /**
    * Convert a protobuf UserTablePermissions to a
    * ListMultimap&lt;String, TablePermission&gt; where key is username.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
index 255d9f5..12661a6 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.SingleResponse;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.HasPermissionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -228,6 +229,15 @@ public final class ResponseConverter {
     return parameterBuilder.build();
   }
 
+  /**
+   * Builds a protocol buffer HasPermissionResponse
+   */
+  public static HasPermissionResponse buildHasPermissionResponse(boolean 
hasPermission) {
+    HasPermissionResponse.Builder builder = HasPermissionResponse.newBuilder();
+    builder.setHasPermission(hasPermission);
+    return builder.build();
+  }
+
 // End utilities for Client
 // Start utilities for Admin
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index dca723f..b5a5d62 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -530,10 +530,15 @@ public final class HConstants {
   // Other constants
 
   /**
-   * An empty instance.
+   * An empty byte array instance.
    */
   public static final byte [] EMPTY_BYTE_ARRAY = new byte [0];
 
+  /**
+   * An empty string instance.
+   */
+  public static final String EMPTY_STRING = "";
+
   public static final ByteBuffer EMPTY_BYTE_BUFFER = 
ByteBuffer.wrap(EMPTY_BYTE_ARRAY);
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-protocol/src/main/protobuf/AccessControl.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/AccessControl.proto 
b/hbase-protocol/src/main/protobuf/AccessControl.proto
index cc0d4a5..e175e6a 100644
--- a/hbase-protocol/src/main/protobuf/AccessControl.proto
+++ b/hbase-protocol/src/main/protobuf/AccessControl.proto
@@ -96,6 +96,9 @@ message GetUserPermissionsRequest {
   optional Permission.Type type = 1;
   optional TableName table_name = 2;
   optional bytes namespace_name = 3;
+  optional bytes column_family = 4;
+  optional bytes column_qualifier = 5;
+  optional bytes user_name = 6;
 }
 
 message GetUserPermissionsResponse {
@@ -109,6 +112,15 @@ message CheckPermissionsRequest {
 message CheckPermissionsResponse {
 }
 
+message HasPermissionRequest {
+  required TablePermission table_permission = 1;
+  required bytes user_name = 2;
+}
+
+message HasPermissionResponse {
+  optional bool has_permission = 1;
+}
+
 service AccessControlService {
     rpc Grant(GrantRequest)
       returns (GrantResponse);
@@ -121,4 +133,7 @@ service AccessControlService {
 
     rpc CheckPermissions(CheckPermissionsRequest)
       returns (CheckPermissionsResponse);
+
+    rpc HasPermission(HasPermissionRequest)
+      returns (HasPermissionResponse);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
 
b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
index 2efc3a4..b67e335 100644
--- 
a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
+++ 
b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
@@ -543,7 +543,7 @@ public class RSGroupAdminEndpoint implements 
MasterCoprocessor, MasterObserver {
   }
 
   public void checkPermission(String request) throws IOException {
-    accessChecker.requirePermission(getActiveUser(), request, Action.ADMIN);
+    accessChecker.requirePermission(getActiveUser(), request, null, 
Action.ADMIN);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index bdb86d0..7ef986b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1294,7 +1294,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
 
   protected void requirePermission(String request, Permission.Action perm) 
throws IOException {
     if (accessChecker != null) {
-      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), 
request, perm);
+      accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), 
request, null, perm);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
index 7feeaa0..c31658f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessChecker.java
@@ -20,7 +20,11 @@ package org.apache.hadoop.hbase.security.access;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,18 +36,25 @@ import 
org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.Permission.Action;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.HadoopKerberosName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
 public final class AccessChecker {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AccessChecker.class);
   private static final Logger AUDITLOG =
       LoggerFactory.getLogger("SecurityLogger." + 
AccessChecker.class.getName());
   // TODO: we should move to a design where we don't even instantiate an 
AccessChecker if
   // authorization is not enabled (like in RSRpcServices), instead of always 
instantiating one and
   // calling requireXXX() only to do nothing (since authorizationEnabled will 
be false).
   private TableAuthManager authManager;
+
+  /** Group service to retrieve the user group information */
+  private static Groups groupService;
+
   /**
    * if we are active, usually false, only true if 
"hbase.security.authorization"
    * has been set to true in site configuration.see HBASE-19483.
@@ -72,6 +83,7 @@ public final class AccessChecker {
       throw new NullPointerException("Error obtaining AccessChecker, zk found 
null.");
     }
     authorizationEnabled = isAuthorizationSupported(conf);
+    initGroupService(conf);
   }
 
   /**
@@ -88,9 +100,11 @@ public final class AccessChecker {
   /**
    * Authorizes that the current user has any of the given permissions to 
access the table.
    *
+   * @param user Active user to which authorization checks should be applied
+   * @param request Request type.
    * @param tableName   Table requested
    * @param permissions Actions being requested
-   * @throws IOException           if obtaining the current user fails
+   * @throws IOException if obtaining the current user fails
    * @throws AccessDeniedException if user has no authorization
    */
   public void requireAccess(User user, String request, TableName tableName,
@@ -119,14 +133,16 @@ public final class AccessChecker {
 
   /**
    * Authorizes that the current user has global privileges for the given 
action.
-   *
+   * @param user Active user to which authorization checks should be applied
+   * @param request Request type
+   * @param filterUser User name to be filtered from permission as requested
    * @param perm The action being requested
-   * @throws IOException           if obtaining the current user fails
+   * @throws IOException if obtaining the current user fails
    * @throws AccessDeniedException if authorization is denied
    */
-  public void requirePermission(User user, String request, Action perm)
+  public void requirePermission(User user, String request, String filterUser, 
Action perm)
       throws IOException {
-    requireGlobalPermission(user, request, perm, null, null);
+    requireGlobalPermission(user, request, perm, null, null, filterUser);
   }
 
   /**
@@ -134,25 +150,29 @@ public final class AccessChecker {
    * audit log message will contain context information for the operation
    * being authorized, based on the given parameters.
    *
+   * @param user Active user to which authorization checks should be applied
+   * @param request Request type
    * @param perm      Action being requested
    * @param tableName Affected table name.
    * @param familyMap Affected column families.
+   * @param filterUser User name to be filtered from permission as requested
    */
   public void requireGlobalPermission(User user, String request,
       Action perm, TableName tableName,
-      Map<byte[], ? extends Collection<byte[]>> familyMap)throws IOException {
+      Map<byte[], ? extends Collection<byte[]>> familyMap, String filterUser) 
throws IOException {
     if (!authorizationEnabled) {
       return;
     }
     AuthResult result;
     if (authManager.authorize(user, perm)) {
       result = AuthResult.allow(request, "Global check allowed", user, perm, 
tableName, familyMap);
-      result.getParams().setTableName(tableName).setFamilies(familyMap);
-      logResult(result);
     } else {
       result = AuthResult.deny(request, "Global check failed", user, perm, 
tableName, familyMap);
-      result.getParams().setTableName(tableName).setFamilies(familyMap);
-      logResult(result);
+    }
+    result.getParams().setTableName(tableName).setFamilies(familyMap);
+    result.getParams().addExtraParam("filterUser", filterUser);
+    logResult(result);
+    if (!result.isAllowed()) {
       throw new AccessDeniedException(
           "Insufficient permissions for user '" + (user != null ? 
user.getShortName() : "null")
               + "' (global, action=" + perm.toString() + ")");
@@ -164,6 +184,8 @@ public final class AccessChecker {
    * audit log message will contain context information for the operation
    * being authorized, based on the given parameters.
    *
+   * @param user Active user to which authorization checks should be applied
+   * @param request Request type
    * @param perm      Action being requested
    * @param namespace The given namespace
    */
@@ -189,12 +211,14 @@ public final class AccessChecker {
 
   /**
    * Checks that the user has the given global or namespace permission.
-   *
-   * @param namespace  The given namespace
+   * @param user Active user to which authorization checks should be applied
+   * @param request Request type
+   * @param namespace Name space as requested
+   * @param filterUser User name to be filtered from permission as requested
    * @param permissions Actions being requested
    */
   public void requireNamespacePermission(User user, String request, String 
namespace,
-      Action... permissions) throws IOException {
+      String filterUser, Action... permissions) throws IOException {
     if (!authorizationEnabled) {
       return;
     }
@@ -210,6 +234,7 @@ public final class AccessChecker {
         result = AuthResult.deny(request, "Insufficient permissions", user, 
permission, namespace);
       }
     }
+    result.getParams().addExtraParam("filterUser", filterUser);
     logResult(result);
     if (!result.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions " + 
result.toContextString());
@@ -219,7 +244,11 @@ public final class AccessChecker {
   /**
    * Checks that the user has the given global or namespace permission.
    *
+   * @param user Active user to which authorization checks should be applied
+   * @param request Request type
    * @param namespace  The given namespace
+   * @param tableName Table requested
+   * @param familyMap    Column family map requested
    * @param permissions Actions being requested
    */
   public void requireNamespacePermission(User user, String request, String 
namespace,
@@ -252,14 +281,18 @@ public final class AccessChecker {
    * Authorizes that the current user has any of the given permissions for the
    * given table, column family and column qualifier.
    *
+   * @param user Active user to which authorization checks should be applied
+   * @param request Request type
    * @param tableName Table requested
    * @param family    Column family requested
    * @param qualifier Column qualifier requested
-   * @throws IOException           if obtaining the current user fails
+   * @param filterUser User name to be filtered from permission as requested
+   * @param permissions Actions being requested
+   * @throws IOException if obtaining the current user fails
    * @throws AccessDeniedException if user has no authorization
    */
   public void requirePermission(User user, String request, TableName 
tableName, byte[] family,
-      byte[] qualifier, Action... permissions) throws IOException {
+      byte[] qualifier, String filterUser, Action... permissions) throws 
IOException {
     if (!authorizationEnabled) {
       return;
     }
@@ -273,9 +306,10 @@ public final class AccessChecker {
       } else {
         // rest of the world
         result = AuthResult.deny(request, "Insufficient permissions",
-                user, permission, tableName, family, qualifier);
+          user, permission, tableName, family, qualifier);
       }
     }
+    result.getParams().addExtraParam("filterUser", filterUser);
     logResult(result);
     if (!result.isAllowed()) {
       throw new AccessDeniedException("Insufficient permissions " + 
result.toContextString());
@@ -286,6 +320,8 @@ public final class AccessChecker {
    * Authorizes that the current user has any of the given permissions for the
    * given table, column family and column qualifier.
    *
+   * @param user Active user to which authorization checks should be applied
+   * @param request Request type
    * @param tableName Table requested
    * @param family    Column family param
    * @param qualifier Column qualifier param
@@ -323,7 +359,7 @@ public final class AccessChecker {
       TableName tableName, RegionInfo[] regionInfos, String reason)
       throws IOException {
     if (namespace != null && !namespace.isEmpty()) {
-      requireNamespacePermission(user, reason, namespace, Action.ADMIN, 
Action.CREATE);
+      requireNamespacePermission(user, reason, namespace, null, Action.ADMIN, 
Action.CREATE);
     } else if (tableName != null || (regionInfos != null && regionInfos.length 
> 0)) {
       // So, either a table or regions op. If latter, check perms ons table.
       TableName tn = tableName != null? tableName: regionInfos[0].getTable();
@@ -340,9 +376,111 @@ public final class AccessChecker {
         "Access {} for user {}; reason: {}; remote address: {}; request: {}; 
context: {}",
         (result.isAllowed() ? "allowed" : "denied"),
         (result.getUser() != null ? result.getUser().getShortName() : 
"UNKNOWN"),
-        result.getReason(),
-        RpcServer.getRemoteAddress().map(InetAddress::toString).orElse(""),
+        result.getReason(), 
RpcServer.getRemoteAddress().map(InetAddress::toString).orElse(""),
         result.getRequest(), result.toContextString());
     }
   }
+
+  /**
+   * Validates the caller of the hasPermission operation against the user name being queried.
+   * <p>
+   * If the caller's short name differs from {@code inputUserName}, the caller must pass an
+   * ADMIN permission check scoped to the table name, family and qualifier of {@code tPerm};
+   * a temporary {@link InputUser} is then built from the input name and the groups returned
+   * by {@link #getUserGroups(String)}. If the caller is checking itself, no privilege is
+   * required: an "allowed" result with a null action is logged and the caller is returned.
+   * @param caller User invoking the hasPermission operation
+   * @param tPerm Table permission supplying the table name, family and qualifier
+   * @param inputUserName Name of the user whose permission is being queried
+   * @return the caller itself for a self check, otherwise an {@link InputUser} for the input name
+   * @throws IOException propagated from the ADMIN permission check for a non-self query
+   */
+  public User validateCallerWithFilterUser(User caller, TablePermission tPerm, 
String inputUserName)
+      throws IOException {
+    User filterUser = null;
+    if (!caller.getShortName().equals(inputUserName)) {
+      // User should have admin privilege if checking permission for other 
users
+      requirePermission(caller, "hasPermission", tPerm.getTableName(), 
tPerm.getFamily(),
+        tPerm.getQualifier(), inputUserName, Action.ADMIN);
+      // Initialize user instance for the input user name
+      List<String> groups = getUserGroups(inputUserName);
+      filterUser = new InputUser(inputUserName, groups.toArray(new 
String[groups.size()]));
+    } else {
+      // The user doesn't need ADMIN privilege for a self check.
+      // Setting action as null in AuthResult to display empty action in audit 
log
+      AuthResult result = AuthResult.allow("hasPermission", "Self user 
validation allowed", caller,
+        null, tPerm.getTableName(), tPerm.getFamily(), tPerm.getQualifier());
+      logResult(result);
+      filterUser = caller;
+    }
+    return filterUser;
+  }
+
+  /**
+   * A temporary, limited {@link User} implementation built only from a user name and an array
+   * of group names. The name, short name and groups can be queried; both {@code runAs}
+   * variants throw {@link UnsupportedOperationException}.
+   */
+  public static class InputUser extends User {
+    // Full user name as supplied to the constructor.
+    private String name;
+    // Short name, derived from the full name on first request and then cached.
+    private String shortName = null;
+    // Group names as supplied to the constructor.
+    private String[] groups;
+
+    /**
+     * @param name User name as supplied by the caller
+     * @param groups Group names of the user; the array is stored as-is, not copied
+     */
+    public InputUser(String name, String[] groups) {
+      this.name = name;
+      this.groups = groups;
+    }
+
+    /**
+     * Returns the short name computed by {@link HadoopKerberosName} from the full name. The
+     * value is computed on the first call and cached.
+     * @throws IllegalArgumentException if the short name cannot be derived from the name
+     */
+    @Override
+    public String getShortName() {
+      if (this.shortName == null) {
+        try {
+          this.shortName = new HadoopKerberosName(this.name).getShortName();
+        } catch (IOException ioe) {
+          throw new IllegalArgumentException(
+              "Illegal principal name " + this.name + ": " + ioe.toString(), 
ioe);
+        }
+      }
+      return shortName;
+    }
+
+    /** Returns the full name given to the constructor. */
+    @Override
+    public String getName() {
+      return this.name;
+    }
+
+    /** Returns the group array given to the constructor (the same array, not a copy). */
+    @Override
+    public String[] getGroupNames() {
+      return this.groups;
+    }
+
+    /** Not supported by this limited implementation; always throws. */
+    @Override
+    public <T> T runAs(PrivilegedAction<T> action) {
+      throw new UnsupportedOperationException(
+          "Method not supported, this class has limited implementation");
+    }
+
+    /** Not supported by this limited implementation; always throws. */
+    @Override
+    public <T> T runAs(PrivilegedExceptionAction<T> action)
+        throws IOException, InterruptedException {
+      throw new UnsupportedOperationException(
+          "Method not supported, this class has limited implementation");
+    }
+
+    /** Returns the full name given to the constructor. */
+    @Override
+    public String toString() {
+      return this.name;
+    }
+  }
+
+  /*
+   * Initialize the static group service from the given configuration, unless it has already
+   * been set.
+   * NOTE(review): the null check and assignment of the static field are not synchronized;
+   * confirm that concurrent construction of AccessChecker instances is not a concern.
+   */
+  private void initGroupService(Configuration conf) {
+    if (groupService == null) {
+      groupService = Groups.getUserToGroupsMappingService(conf);
+    }
+  }
+
+  /**
+   * Retrieve the groups of the given user from the group service.
+   * <p>
+   * If the lookup fails with an {@link IOException}, the error is logged and an empty list is
+   * returned instead of propagating the failure.
+   * <p>
+   * NOTE(review): this static method dereferences the static group service, which in the
+   * visible code is only assigned from the AccessChecker constructor; presumably an
+   * AccessChecker is always constructed first - confirm, otherwise this throws a
+   * NullPointerException.
+   * @param user User name
+   * @return Groups of the user, or an empty list if the lookup failed
+   */
+  public static List<String> getUserGroups(String user) {
+    try {
+      return groupService.getGroups(user);
+    } catch (IOException e) {
+      LOG.error("Error occured while retrieving group for " + user, e);
+      return new ArrayList<String>();
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
index 219625b..b6d8fe9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.AuthUtil;
 import org.apache.hadoop.hbase.Cell;
@@ -144,7 +145,7 @@ public class AccessControlLists {
 
     Set<Permission.Action> actionSet = new TreeSet<Permission.Action>();
     if(mergeExistingPermissions){
-      List<UserPermission> perms = getUserPermissions(conf, rowKey);
+      List<UserPermission> perms = getUserPermissions(conf, rowKey, null, 
null, null, false);
       UserPermission currentPerm = null;
       for (UserPermission perm : perms) {
         if (Bytes.equals(perm.getUser(), userPerm.getUser())
@@ -228,7 +229,8 @@ public class AccessControlLists {
       removePermissionRecord(conf, userPerm, t);
     } else {
       // Get all the global user permissions from the acl table
-      List<UserPermission> permsList = getUserPermissions(conf, 
userPermissionRowKey(userPerm));
+      List<UserPermission> permsList =
+          getUserPermissions(conf, userPermissionRowKey(userPerm), null, null, 
null, false);
       List<Permission.Action> remainingActions = new ArrayList<>();
       List<Permission.Action> dropActions = 
Arrays.asList(userPerm.getActions());
       for (UserPermission perm : permsList) {
@@ -430,8 +432,8 @@ public class AccessControlLists {
           if (entry == null) {
             entry = CellUtil.cloneRow(kv);
           }
-          Pair<String,TablePermission> permissionsOfUserOnTable =
-              parsePermissionRecord(entry, kv);
+          Pair<String, TablePermission> permissionsOfUserOnTable =
+              parsePermissionRecord(entry, kv, null, null, false, null);
           if (permissionsOfUserOnTable != null) {
             String username = permissionsOfUserOnTable.getFirst();
             TablePermission permissions = permissionsOfUserOnTable.getSecond();
@@ -474,7 +476,8 @@ public class AccessControlLists {
         scanner = table.getScanner(scan);
         try {
           for (Result row : scanner) {
-            ListMultimap<String,TablePermission> resultPerms = 
parsePermissions(row.getRow(), row);
+            ListMultimap<String, TablePermission> resultPerms =
+                parsePermissions(row.getRow(), row, null, null, null, false);
             allPerms.put(row.getRow(), resultPerms);
           }
         } finally {
@@ -488,28 +491,27 @@ public class AccessControlLists {
 
   public static ListMultimap<String, TablePermission> 
getTablePermissions(Configuration conf,
       TableName tableName) throws IOException {
-    return getPermissions(conf, tableName != null ? tableName.getName() : 
null, null);
+    return getPermissions(conf, tableName != null ? tableName.getName() : 
null, null, null, null,
+      null, false);
   }
 
   @VisibleForTesting
   public static ListMultimap<String, TablePermission> 
getNamespacePermissions(Configuration conf,
       String namespace) throws IOException {
-    return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), 
null);
+    return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), 
null, null, null, null,
+      false);
   }
 
   /**
-   * Reads user permission assignments stored in the <code>l:</code> column
-   * family of the first table row in <code>_acl_</code>.
-   *
+   * Reads user permission assignments stored in the <code>l:</code> column 
family of the first
+   * table row in <code>_acl_</code>.
    * <p>
-   * See {@link AccessControlLists class documentation} for the key structure
-   * used for storage.
+   * See {@link AccessControlLists class documentation} for the key structure 
used for storage.
    * </p>
    */
-  static ListMultimap<String, TablePermission> getPermissions(Configuration 
conf,
-      byte[] entryName, Table t) throws IOException {
+  static ListMultimap<String, TablePermission> getPermissions(Configuration 
conf, byte[] entryName,
+      Table t, byte[] cf, byte[] cq, String user, boolean hasFilterUser) 
throws IOException {
     if (entryName == null) entryName = ACL_GLOBAL_NAME;
-
     // for normal user tables, we just read the table row from _acl_
     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
     Get get = new Get(entryName);
@@ -525,7 +527,7 @@ public class AccessControlLists {
       row = t.get(get);
     }
     if (!row.isEmpty()) {
-      perms = parsePermissions(entryName, row);
+      perms = parsePermissions(entryName, row, cf, cq, user, hasFilterUser);
     } else {
       LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
           + Bytes.toString(entryName));
@@ -535,34 +537,50 @@ public class AccessControlLists {
   }
 
   /**
-   * Returns the currently granted permissions for a given table as a list of
-   * user plus associated permissions.
+   * Returns the currently granted permissions for a given table as the 
specified user plus
+   * associated permissions.
    */
-  static List<UserPermission> getUserTablePermissions(
-      Configuration conf, TableName tableName) throws IOException {
-    return getUserPermissions(conf, tableName == null ? null : 
tableName.getName());
+  static List<UserPermission> getUserTablePermissions(Configuration conf, 
TableName tableName,
+      byte[] cf, byte[] cq, String userName, boolean hasFilterUser) throws 
IOException {
+    return getUserPermissions(conf, tableName == null ? null : 
tableName.getName(), cf, cq,
+      userName, hasFilterUser);
   }
 
-  static List<UserPermission> getUserNamespacePermissions(
-      Configuration conf, String namespace) throws IOException {
-    return getUserPermissions(conf, 
Bytes.toBytes(toNamespaceEntry(namespace)));
+  /**
+   * Returns the currently granted permissions for a given namespace as the 
specified user plus
+   * associated permissions.
+   */
+  static List<UserPermission> getUserNamespacePermissions(Configuration conf, 
String namespace,
+      String user, boolean hasFilterUser) throws IOException {
+    return getUserPermissions(conf, 
Bytes.toBytes(toNamespaceEntry(namespace)), null, null, user,
+      hasFilterUser);
   }
 
-  static List<UserPermission> getUserPermissions(
-      Configuration conf, byte[] entryName)
-          throws IOException {
-    ListMultimap<String,TablePermission> allPerms = getPermissions(
-        conf, entryName, null);
+  /**
+   * Returns the currently granted permissions for a given table/namespace 
with associated
+   * permissions based on the specified column family, column qualifier and 
user name.
+   * @param conf the configuration
+   * @param entryName Table name or the namespace
+   * @param cf Column family
+   * @param cq Column qualifier
+   * @param user User name to be filtered from permission as requested
+   * @param hasFilterUser true if filter user is provided, otherwise false.
+   * @return List of UserPermissions
+   * @throws IOException on failure
+   */
+  static List<UserPermission> getUserPermissions(Configuration conf, byte[] 
entryName, byte[] cf,
+      byte[] cq, String user, boolean hasFilterUser) throws IOException {
+    ListMultimap<String, TablePermission> allPerms =
+        getPermissions(conf, entryName, null, cf, cq, user, hasFilterUser);
 
     List<UserPermission> perms = new ArrayList<>();
-
-    if(isNamespaceEntry(entryName)) {  // Namespace
+    if (isNamespaceEntry(entryName)) { // Namespace
       for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
         UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
             entry.getValue().getNamespace(), entry.getValue().getActions());
         perms.add(up);
       }
-    } else {  // Table
+    } else { // Table
       for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
         UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
             entry.getValue().getTableName(), entry.getValue().getFamily(),
@@ -570,17 +588,21 @@ public class AccessControlLists {
         perms.add(up);
       }
     }
+
     return perms;
   }
 
-  private static ListMultimap<String, TablePermission> parsePermissions(
-      byte[] entryName, Result result) {
+  /**
+   * Parse and filter permission based on the specified column family, column 
qualifier and user
+   * name.
+   */
+  private static ListMultimap<String, TablePermission> parsePermissions(byte[] 
entryName,
+      Result result, byte[] cf, byte[] cq, String user, boolean hasFilterUser) 
{
     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
     if (result != null && result.size() > 0) {
       for (Cell kv : result.rawCells()) {
-
-        Pair<String,TablePermission> permissionsOfUserOnTable =
-            parsePermissionRecord(entryName, kv);
+        Pair<String, TablePermission> permissionsOfUserOnTable =
+            parsePermissionRecord(entryName, kv, cf, cq, hasFilterUser, user);
 
         if (permissionsOfUserOnTable != null) {
           String username = permissionsOfUserOnTable.getFirst();
@@ -592,11 +614,10 @@ public class AccessControlLists {
     return perms;
   }
 
-  private static Pair<String, TablePermission> parsePermissionRecord(
-      byte[] entryName, Cell kv) {
+  private static Pair<String, TablePermission> parsePermissionRecord(byte[] 
entryName, Cell kv,
+      byte[] cf, byte[] cq, boolean filterPerms, String filterUser) {
     // return X given a set of permissions encoded in the permissionRecord kv.
     byte[] family = CellUtil.cloneFamily(kv);
-
     if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
       return null;
     }
@@ -613,9 +634,25 @@ public class AccessControlLists {
     // TODO: avoid the string conversion to make this more efficient
     String username = Bytes.toString(key);
 
-    //Handle namespace entry
-    if(isNamespaceEntry(entryName)) {
-      return new Pair<>(username, new 
TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
+    // Retrieve the group list for the filterUser if the cell key is a group.
+    // The group list is not required when filterUser is itself a group.
+    List<String> filterUserGroups = null;
+    if (filterPerms) {
+      if (username.charAt(0) == '@' && !StringUtils.isEmpty(filterUser)
+          && filterUser.charAt(0) != '@') {
+        filterUserGroups = AccessChecker.getUserGroups(filterUser);
+      }
+    }
+
+    // Handle namespace entry
+    if (isNamespaceEntry(entryName)) {
+      // Filter the permissions cell record if client query
+      if (filterPerms && !validateFilterUser(username, filterUser, 
filterUserGroups)) {
+        return null;
+      }
+
+      return new Pair<>(username,
+          new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), 
value));
     }
 
     //Handle table and global entry
@@ -635,14 +672,71 @@ public class AccessControlLists {
       }
     }
 
-    return new Pair<>(username, new 
TablePermission(TableName.valueOf(entryName), permFamily, permQualifier, 
value));
+    // Filter the permissions cell record if client query
+    if (filterPerms) {
+      // ACL table contains 3 types of cell key entries; hbase:Acl, namespace and table. So to
+      // filter the permission cell records, additional validations are required at CF, CQ and
+      // username. Here we proceed based on whether the client input contains a filterUser.
+      // Validate the filterUser when specified
+      if (filterUser != null && !validateFilterUser(username, filterUser, 
filterUserGroups)) {
+        return null;
+      }
+      if (!validateCFAndCQ(permFamily, cf, permQualifier, cq)) {
+        return null;
+      }
+    }
+
+    return new Pair<>(username,
+        new TablePermission(TableName.valueOf(entryName), permFamily, 
permQualifier, value));
+  }
+
+  /*
+   * Validate the cell key (username) against the client filterUser specified in the query input.
+   * 1. If filterUserGroups is null, the cell is accepted only when filterUser equals username.
+   *    This covers a cell key that is a plain user name, and the case where both filterUser
+   *    and username are group names.
+   * 2. If filterUserGroups is not null, the cell key is treated as a group: the cell is accepted
+   *    only when filterUserGroups contains username with its first character stripped.
+   * NOTE(review): filterUser is dereferenced without a null check when filterUserGroups is
+   * null; callers presumably guarantee a non-null filterUser - confirm.
+   */
+  private static boolean validateFilterUser(String username, String filterUser,
+      List<String> filterUserGroups) {
+    if (filterUserGroups == null) {
+      // Validate user name or group names whether equal
+      if (filterUser.equals(username)) {
+        return true;
+      }
+    } else {
+      // Check whether filter user belongs to the cell key group.
+      return filterUserGroups.contains(username.substring(1));
+    }
+    return false;
+  }
+
+  /*
+   * Validate the cell against the client CF and CQ if specified in the query input.
+   * 1. If CF is null, no further validation is done; the cell is included whatever its CF and CQ.
+   * 2. If CF is specified and does not equal the cell's family, the cell is excluded.
+   * 3. If CF is specified and equal, the cell is excluded only when CQ is also specified and
+   *    does not equal the cell's qualifier; with a null CQ every qualifier of that family is
+   *    included.
+   */
+  private static boolean validateCFAndCQ(byte[] permFamily, byte[] cf, byte[] 
permQualifier,
+      byte[] cq) {
+    boolean include = true;
+    if (cf != null) {
+      if (Bytes.equals(cf, permFamily)) {
+        if (cq != null && !Bytes.equals(cq, permQualifier)) {
+          // if CQ specified and didn't match then ignore this cell
+          include = false;
+        }
+      } else {
+        // if CF specified and didn't match then ignore this cell
+        include = false;
+      }
+    }
+    return include;
   }
 
   /**
-   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} 
instances
-   * and returns the resulting byte array.
-   *
-   * Writes a set of permission [user: table permission]
+   * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} 
instances and returns the
+   * resulting byte array. Writes a set of permission [user: table permission]
    */
   public static byte[] writePermissionsAsBytes(ListMultimap<String, 
TablePermission> perms,
       Configuration conf) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 52b7a92..1100500 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -36,6 +36,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
@@ -98,6 +99,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
+import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.HasPermissionRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.HasPermissionResponse;
 import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
@@ -119,6 +122,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.Permission.Action;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -240,8 +244,7 @@ public class AccessController implements MasterCoprocessor, 
RegionCoprocessor,
   private void initialize(RegionCoprocessorEnvironment e) throws IOException {
     final Region region = e.getRegion();
     Configuration conf = e.getConfiguration();
-    Map<byte[], ListMultimap<String,TablePermission>> tables =
-        AccessControlLists.loadAll(region);
+    Map<byte[], ListMultimap<String, TablePermission>> tables = 
AccessControlLists.loadAll(region);
     // For each table, write out the table's permissions to the respective
     // znode for that table.
     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
@@ -284,7 +287,7 @@ public class AccessController implements MasterCoprocessor, 
RegionCoprocessor,
       for (byte[] entry : entries) {
         currentEntry = entry;
         ListMultimap<String, TablePermission> perms =
-            AccessControlLists.getPermissions(conf, entry, t);
+            AccessControlLists.getPermissions(conf, entry, t, null, null, 
null, false);
         byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, 
conf);
         zkw.writeToZookeeper(entry, serialized);
       }
@@ -295,31 +298,29 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
   }
 
   /**
-   * Check the current user for authorization to perform a specific action
-   * against the given set of row data.
-   *
-   * <p>Note: Ordering of the authorization checks
-   * has been carefully optimized to short-circuit the most common requests
-   * and minimize the amount of processing required.</p>
-   *
+   * Check the current user for authorization to perform a specific action 
against the given set of
+   * row data.
+   * <p>
+   * Note: Ordering of the authorization checks has been carefully optimized 
to short-circuit the
+   * most common requests and minimize the amount of processing required.
+   * </p>
+   * @param request User request
+   * @param user User name
    * @param permRequest the action being requested
    * @param e the coprocessor environment
-   * @param families the map of column families to qualifiers present in
-   * the request
+   * @param tableName Table name
+   * @param families the map of column families to qualifiers present in the 
request
    * @return an authorization result
    */
   private AuthResult permissionGranted(String request, User user, Action 
permRequest,
-      RegionCoprocessorEnvironment e,
-      Map<byte [], ? extends Collection<?>> families) {
-    RegionInfo hri = e.getRegion().getRegionInfo();
-    TableName tableName = hri.getTable();
-
+      RegionCoprocessorEnvironment e, TableName tableName,
+      Map<byte[], ? extends Collection<?>> families) {
     // 1. All users need read access to hbase:meta table.
     // this is a very common operation, so deal with it quickly.
-    if (hri.isMetaRegion()) {
+    if (TableName.META_TABLE_NAME.equals(tableName)) {
       if (permRequest == Action.READ) {
-        return AuthResult.allow(request, "All users allowed", user,
-          permRequest, tableName, families);
+        return AuthResult.allow(request, "All users allowed", user, 
permRequest, tableName,
+          families);
       }
     }
 
@@ -398,7 +399,8 @@ public class AccessController implements MasterCoprocessor, 
RegionCoprocessor,
       Map<byte [], ? extends Collection<?>> families, Action... actions) {
     AuthResult result = null;
     for (Action action: actions) {
-      result = permissionGranted(opType.toString(), user, action, e, families);
+      result = permissionGranted(opType.toString(), user, action, e,
+        e.getRegion().getRegionInfo().getTable(), families);
       if (!result.isAllowed()) {
         return result;
       }
@@ -413,14 +415,14 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
 
   public void requirePermission(ObserverContext<?> ctx, String request,
       Action perm) throws IOException {
-    accessChecker.requirePermission(getActiveUser(ctx), request, perm);
+    accessChecker.requirePermission(getActiveUser(ctx), request, null, perm);
   }
 
   public void requireGlobalPermission(ObserverContext<?> ctx, String request,
       Action perm, TableName tableName,
       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
-    accessChecker.requireGlobalPermission(getActiveUser(ctx),
-        request, perm,tableName, familyMap);
+    accessChecker.requireGlobalPermission(getActiveUser(ctx), request, perm, 
tableName, familyMap,
+      null);
   }
 
   public void requireGlobalPermission(ObserverContext<?> ctx, String request,
@@ -432,7 +434,7 @@ public class AccessController implements MasterCoprocessor, 
RegionCoprocessor,
   public void requireNamespacePermission(ObserverContext<?> ctx, String 
request, String namespace,
       Action... permissions) throws IOException {
     accessChecker.requireNamespacePermission(getActiveUser(ctx),
-        request, namespace, permissions);
+        request, namespace, null, permissions);
   }
 
   public void requireNamespacePermission(ObserverContext<?> ctx, String 
request, String namespace,
@@ -446,7 +448,7 @@ public class AccessController implements MasterCoprocessor, 
RegionCoprocessor,
   public void requirePermission(ObserverContext<?> ctx, String request, 
TableName tableName,
       byte[] family, byte[] qualifier, Action... permissions) throws 
IOException {
     accessChecker.requirePermission(getActiveUser(ctx), request,
-        tableName, family, qualifier, permissions);
+        tableName, family, qualifier, null, permissions);
   }
 
   public void requireTablePermission(ObserverContext<?> ctx, String request,
@@ -936,7 +938,8 @@ public class AccessController implements MasterCoprocessor, 
RegionCoprocessor,
     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
-        List<UserPermission> acls = 
AccessControlLists.getUserTablePermissions(conf, tableName);
+        List<UserPermission> acls =
+            AccessControlLists.getUserTablePermissions(conf, tableName, null, 
null, null, false);
         if (acls != null) {
           tableAcls.put(tableName, acls);
         }
@@ -1040,7 +1043,7 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
   public void preGetLocks(ObserverContext<MasterCoprocessorEnvironment> ctx)
       throws IOException {
     User user = getActiveUser(ctx);
-    accessChecker.requirePermission(user, "getLocks", Action.ADMIN);
+    accessChecker.requirePermission(user, "getLocks", null, Action.ADMIN);
   }
 
   @Override
@@ -1152,7 +1155,8 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
           "Snapshot owner check allowed", user, null, null, null);
       AccessChecker.logResult(result);
     } else {
-      accessChecker.requirePermission(user, "listSnapshot " + 
snapshot.getName(), Action.ADMIN);
+      accessChecker.requirePermission(user, "listSnapshot " + 
snapshot.getName(), null,
+        Action.ADMIN);
     }
   }
 
@@ -1168,7 +1172,8 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
         "Snapshot owner check allowed", user, null, 
hTableDescriptor.getTableName(), null);
       AccessChecker.logResult(result);
     } else {
-      accessChecker.requirePermission(user, "cloneSnapshot " + 
snapshot.getName(), Action.ADMIN);
+      accessChecker.requirePermission(user, "cloneSnapshot " + 
snapshot.getName(), null,
+        Action.ADMIN);
     }
   }
 
@@ -1179,9 +1184,10 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
     User user = getActiveUser(ctx);
     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)) {
       accessChecker.requirePermission(user, "restoreSnapshot " + 
snapshot.getName(),
-          hTableDescriptor.getTableName(), null, null, 
Permission.Action.ADMIN);
+        hTableDescriptor.getTableName(), null, null, null, 
Permission.Action.ADMIN);
     } else {
-      accessChecker.requirePermission(user, "restoreSnapshot " + 
snapshot.getName(), Action.ADMIN);
+      accessChecker.requirePermission(user, "restoreSnapshot " + 
snapshot.getName(), null,
+        Action.ADMIN);
     }
   }
 
@@ -1195,7 +1201,8 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
           "Snapshot owner check allowed", user, null, null, null);
       AccessChecker.logResult(result);
     } else {
-      accessChecker.requirePermission(user, "deleteSnapshot " + 
snapshot.getName(), Action.ADMIN);
+      accessChecker.requirePermission(user, "deleteSnapshot " + 
snapshot.getName(), null,
+        Action.ADMIN);
     }
   }
 
@@ -1255,8 +1262,8 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
     while (itr.hasNext()) {
       NamespaceDescriptor desc = itr.next();
       try {
-        accessChecker.requireNamespacePermission(user, "listNamespaces",
-            desc.getName(), Action.ADMIN);
+        accessChecker.requireNamespacePermission(user, "listNamespaces", 
desc.getName(), null,
+          Action.ADMIN);
       } catch (AccessDeniedException e) {
         itr.remove();
       }
@@ -1971,10 +1978,8 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
     User user = getActiveUser(ctx);
     for(Pair<byte[],String> el : familyPaths) {
       accessChecker.requirePermission(user, "preBulkLoadHFile",
-          ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(),
-          el.getFirst(),
-          null,
-          Action.CREATE);
+        ctx.getEnvironment().getRegion().getTableDescriptor().getTableName(), 
el.getFirst(), null,
+        null, Action.CREATE);
     }
   }
 
@@ -2048,11 +2053,11 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
           case Global :
           case Table :
             accessChecker.requirePermission(caller, "grant", 
perm.getTableName(),
-                perm.getFamily(), perm.getQualifier(), Action.ADMIN);
+                perm.getFamily(), perm.getQualifier(), null, Action.ADMIN);
             break;
           case Namespace :
             accessChecker.requireNamespacePermission(caller, "grant", 
perm.getNamespace(),
-                Action.ADMIN);
+                null, Action.ADMIN);
            break;
         }
 
@@ -2106,11 +2111,11 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
           case Global :
           case Table :
             accessChecker.requirePermission(caller, "revoke", 
perm.getTableName(), perm.getFamily(),
-              perm.getQualifier(), Action.ADMIN);
+              perm.getQualifier(), null, Action.ADMIN);
             break;
           case Namespace :
             accessChecker.requireNamespacePermission(caller, "revoke", 
perm.getNamespace(),
-                Action.ADMIN);
+                null, Action.ADMIN);
             break;
         }
 
@@ -2156,42 +2161,73 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
         User caller = RpcServer.getRequestUser().orElse(null);
 
         List<UserPermission> perms = null;
+        // Initialize username, cf and cq. Set to null if request doesn't have.
+        final String userName = request.hasUserName() ? 
request.getUserName().toStringUtf8() : null;
+        final byte[] cf =
+            request.hasColumnFamily() ? 
request.getColumnFamily().toByteArray() : null;
+        final byte[] cq =
+            request.hasColumnQualifier() ? 
request.getColumnQualifier().toByteArray() : null;
+
         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
           final TableName table = request.hasTableName() ?
             ProtobufUtil.toTableName(request.getTableName()) : null;
-          accessChecker.requirePermission(caller, "userPermissions",
-              table, null, null, Action.ADMIN);
+          accessChecker.requirePermission(caller, "userPermissions", table, 
cf, cq, userName,
+            Action.ADMIN);
           perms = User.runAsLoginUser(new 
PrivilegedExceptionAction<List<UserPermission>>() {
             @Override
             public List<UserPermission> run() throws Exception {
-              return 
AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
+              if (cf != null || userName != null) {
+                // retrieve permission based on the requested parameters
+                return 
AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(),
+                  table, cf, cq, userName, true);
+              } else {
+                return 
AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(),
+                  table, null, null, null, false);
+              }
             }
           });
         } else if (request.getType() == 
AccessControlProtos.Permission.Type.Namespace) {
           final String namespace = request.getNamespaceName().toStringUtf8();
           accessChecker.requireNamespacePermission(caller, "userPermissions",
-              namespace, Action.ADMIN);
+            namespace,userName, Action.ADMIN);
           perms = User.runAsLoginUser(new 
PrivilegedExceptionAction<List<UserPermission>>() {
             @Override
             public List<UserPermission> run() throws Exception {
-              return 
AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
-                namespace);
+              if (userName != null) {
+                // retrieve permission based on the requested parameters
+                return 
AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
+                  namespace, userName, true);
+              } else {
+                return 
AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
+                  namespace, null, false);
+              }
             }
           });
         } else {
-          accessChecker.requirePermission(caller, "userPermissions", 
Action.ADMIN);
+          accessChecker.requirePermission(caller, "userPermissions", userName, 
Action.ADMIN);
           perms = User.runAsLoginUser(new 
PrivilegedExceptionAction<List<UserPermission>>() {
             @Override
             public List<UserPermission> run() throws Exception {
-              return 
AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
+              if (userName != null) {
+                // retrieve permission based on the requested parameters
+                return 
AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null,
+                  null, null, userName, true);
+              } else {
+                return 
AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null,
+                  null, null, null, false);
+              }
             }
           });
-          // Adding superusers explicitly to the result set as 
AccessControlLists do not store them.
-          // Also using acl as table name to be inline  with the results of 
global admin and will
-          // help in avoiding any leakage of information about being 
superusers.
-          for (String user: Superusers.getSuperUsers()) {
-            perms.add(new UserPermission(Bytes.toBytes(user), 
AccessControlLists.ACL_TABLE_NAME,
-                null, Action.values()));
+
+          // Skip super users when filter user is specified
+          if (userName == null) {
+            // Adding superusers explicitly to the result set as 
AccessControlLists do not store
+            // them. Also using acl as table name to be inline with the 
results of global admin and
+            // will help in avoiding any leakage of information about being 
superusers.
+            for (String user : Superusers.getSuperUsers()) {
+              perms.add(new UserPermission(Bytes.toBytes(user), 
AccessControlLists.ACL_TABLE_NAME,
+                  null, Action.values()));
+            }
           }
         }
         response = AccessControlUtil.buildGetUserPermissionsResponse(perms);
@@ -2243,7 +2279,7 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
             }
 
             AuthResult result = permissionGranted("checkPermissions", user, 
action, regionEnv,
-              familyMap);
+              regionEnv.getRegion().getRegionInfo().getTable(), familyMap);
             AccessChecker.logResult(result);
             if (!result.isAllowed()) {
               // Even if passive we need to throw an exception here, we 
support checking
@@ -2550,4 +2586,51 @@ public class AccessController implements 
MasterCoprocessor, RegionCoprocessor,
     }
     return userProvider.getCurrent();
   }
+
+  @Override
+  public void hasPermission(RpcController controller, HasPermissionRequest 
request,
+      RpcCallback<HasPermissionResponse> done) {
+    // Converts proto to a TablePermission object.
+    TablePermission tPerm = 
AccessControlUtil.toTablePermission(request.getTablePermission());
+    // User name is required. NOTE(review): thrown before the try, so not set on the controller
+    if (!request.hasUserName()) {
+      throw new IllegalStateException("Input username cannot be empty");
+    }
+    final String inputUserName = request.getUserName().toStringUtf8();
+    AccessControlProtos.HasPermissionResponse response = null;
+    try {
+      User caller = RpcServer.getRequestUser().orElse(null);
+      // User instance for the input user name
+      User filterUser = accessChecker.validateCallerWithFilterUser(caller, 
tPerm, inputUserName);
+
+      // Map the requested family to its qualifier set (null set when only the family is given)
+      Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], 
Set<byte[]>>(Bytes.BYTES_COMPARATOR);
+      if (tPerm.getFamily() != null) {
+        if (tPerm.getQualifier() != null) {
+          Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
+          qualifiers.add(tPerm.getQualifier());
+          familyMap.put(tPerm.getFamily(), qualifiers);
+        } else {
+          familyMap.put(tPerm.getFamily(), null);
+        }
+      }
+
+      // Every requested action must be allowed; with no actions the result stays false
+      boolean hasPermission = false;
+      for (Action action : tPerm.getActions()) {
+        AuthResult result = permissionGranted("hasPermission", filterUser, 
action, regionEnv,
+          tPerm.getTableName(), familyMap);
+        if (!result.isAllowed()) {
+          hasPermission = false;
+          // Break the loop if any action is not allowed
+          break;
+        }
+        hasPermission = true;
+      }
+      response = ResponseConverter.buildHasPermissionResponse(hasPermission);
+    } catch (IOException ioe) {
+      ResponseConverter.setControllerException(controller, ioe);
+    }
+    done.run(response); // response is still null here if an IOException was caught above
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthResult.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthResult.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthResult.java
index cecca41..64a8c4c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthResult.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AuthResult.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hbase.security.access;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
@@ -254,6 +256,13 @@ public class AuthResult {
     private Map<byte[], ? extends Collection<?>> families = null;
     byte[] family = null;
     byte[] qualifier = null;
+    // Extra key/value parameters to be shown in audit log; HashMap, so output order is unspecified
+    private final Map<String, String> extraParams = new HashMap<String, 
String>(2);
+
+    public Params addExtraParam(String key, String value) {
+      extraParams.put(key, value); // a repeated key overwrites the earlier value
+      return this; // returns this Params so calls can be chained
+    }
 
     public Params setNamespace(String namespace) {
       this.namespace = namespace;
@@ -286,10 +295,29 @@ public class AuthResult {
       String[] params = new String[] {
           namespace != null ? "namespace=" + namespace : null,
           tableName != null ? "table=" + 
tableName.getNameWithNamespaceInclAsString() : null,
-          familiesString.length() > 0 ? "family=" + familiesString : null
+          familiesString.length() > 0 ? "family=" + familiesString : null,
+          extraParams.isEmpty() ? null : concatenateExtraParams()
       };
       return Joiner.on(",").skipNulls().join(params);
     }
 
+    /**
+     * @return extra parameters as comma-separated key=value pairs; null keys/values are skipped
+     */
+    private String concatenateExtraParams() {
+      final StringBuilder sb = new StringBuilder();
+      boolean first = true; // true until the first pair is written, so no leading comma
+      for (Entry<String, String> entry : extraParams.entrySet()) {
+        if (entry.getKey() != null && entry.getValue() != null) {
+          if (!first) {
+            sb.append(',');
+          }
+          first = false;
+          sb.append(entry.getKey() + '=');
+          sb.append(entry.getValue());
+        }
+      }
+      return sb.toString();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bb8826ca/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterCoprocessorServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterCoprocessorServices.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterCoprocessorServices.java
index 9a2696f..b895d39 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterCoprocessorServices.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterCoprocessorServices.java
@@ -41,6 +41,8 @@ import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GetUserPer
 import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
 import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantRequest;
 import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantResponse;
+import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.HasPermissionRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.HasPermissionResponse;
 import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeRequest;
 import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeResponse;
 import 
org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.GetAuthsRequest;
@@ -88,6 +90,11 @@ public class TestMasterCoprocessorServices {
     @Override
     public void checkPermissions(RpcController controller, 
CheckPermissionsRequest request,
         RpcCallback<CheckPermissionsResponse> done) {}
+
+    @Override
+    public void hasPermission(RpcController controller, HasPermissionRequest 
request,
+        RpcCallback<HasPermissionResponse> done) {
+    }
   }
 
   private static class MockVisibilityController implements 
VisibilityLabelsService.Interface,

Reply via email to