RANGER-533 Hbase plugin: access denied during get/scan if user does not have 
family level acces to any family in a table.

Signed-off-by: Madhan Neethiraj <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/0d38f0f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/0d38f0f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/0d38f0f2

Branch: refs/heads/tag-policy
Commit: 0d38f0f2c37fef8fb293c37899dfd377afe7688e
Parents: 43841b7
Author: Alok Lal <[email protected]>
Authored: Wed Jun 3 17:45:18 2015 -0700
Committer: Madhan Neethiraj <[email protected]>
Committed: Wed Jun 10 00:33:11 2015 -0700

----------------------------------------------------------------------
 .../model/validation/RangerPolicyValidator.java |   8 +
 .../hbase/AuthorizationSession.java             |  21 +-
 .../authorization/hbase/HbaseAuditHandler.java  |   4 +-
 .../hbase/HbaseAuditHandlerImpl.java            |  61 +++-
 .../hbase/RangerAuthorizationCoprocessor.java   | 295 +++++++++++--------
 .../hbase/RangerAuthorizationFilter.java        | 134 ++++++---
 .../hbase/RangerAuthorizationFilterTest.java    | 120 ++++----
 7 files changed, 419 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0d38f0f2/agents-common/src/main/java/org/apache/ranger/plugin/model/validation/RangerPolicyValidator.java
----------------------------------------------------------------------
diff --git 
a/agents-common/src/main/java/org/apache/ranger/plugin/model/validation/RangerPolicyValidator.java
 
b/agents-common/src/main/java/org/apache/ranger/plugin/model/validation/RangerPolicyValidator.java
index cea3e05..d27b667 100644
--- 
a/agents-common/src/main/java/org/apache/ranger/plugin/model/validation/RangerPolicyValidator.java
+++ 
b/agents-common/src/main/java/org/apache/ranger/plugin/model/validation/RangerPolicyValidator.java
@@ -320,6 +320,10 @@ public class RangerPolicyValidator extends RangerValidator 
{
                                                        
policyResources.toString(), toStringHierarchies_all(hierarchies, defHelper)))
                                        .build());
                                valid = false;
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("isValidResourceNames: Found 
compatible hierarchies: " + toStringHierarchies_all(candidateHierarchies, 
defHelper));
+                               }
                        }
                        /*
                         * Among the candidate hierarchies there should be at 
least one for which policy specifies all of the mandatory resources.  Note that 
there could be multiple 
@@ -335,6 +339,10 @@ public class RangerPolicyValidator extends RangerValidator 
{
                                        .becauseOf("policy is missing required 
resources. Mandatory fields of potential hierarchies are: " + 
toStringHierarchies_mandatory(candidateHierarchies, defHelper))
                                        .build());
                                valid = false;
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("isValidResourceNames: Found 
hierarchies with all mandatory fields specified: " + 
toStringHierarchies_mandatory(validHierarchies, defHelper));
+                               }
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0d38f0f2/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/AuthorizationSession.java
----------------------------------------------------------------------
diff --git 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/AuthorizationSession.java
 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/AuthorizationSession.java
index e0b652e..006629b 100644
--- 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/AuthorizationSession.java
+++ 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/AuthorizationSession.java
@@ -179,9 +179,9 @@ public class AuthorizationSession {
        
        AuthorizationSession authorize() {
                if (LOG.isDebugEnabled()) {
-                       String message = "authorize: " + getRequestMessage();
-                       LOG.debug(message);
+                       LOG.debug("==> AuthorizationSession.authorize: " + 
getRequestMessage());
                }
+               
                if (_request == null) {
                        String message = String.format("Invalid state 
transition: buildRequest() must be called before authorize().  This request 
would ultimately get denied.!");
                        throw new IllegalStateException(message);
@@ -195,11 +195,11 @@ public class AuthorizationSession {
                        }
                        _result = _authorizer.isAccessAllowed(_request, 
_auditHandler);
                }
+               
                if (LOG.isDebugEnabled()) {
                        boolean allowed = isAuthorized();
                        String reason = getDenialReason();
-                       String message = "AuthorizationSession.authorize: " + 
getLogMessage(allowed, reason);
-                       LOG.debug(message);
+                       LOG.debug("<== AuthorizationSession.authorize: " + 
getLogMessage(allowed, reason));
                }
                return this;
        }
@@ -212,7 +212,10 @@ public class AuthorizationSession {
        }
        
        void publishResults() throws AccessDeniedException {
-
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> AuthorizationSession.publishResults()");
+               }
+               
                boolean authorized = isAuthorized();
                if (_auditHandler != null) {
                        List<AuthzAuditEvent> events = null;
@@ -226,7 +229,7 @@ public class AuthorizationSession {
                                        events = theseEvents;
                                }
                        } else {
-                               AuthzAuditEvent event = 
_auditHandler.discardMostRecentEvent();
+                               AuthzAuditEvent event = 
_auditHandler.getAndDiscardMostRecentEvent();
                                if (event != null) {
                                        events = Lists.newArrayList(event);
                                }
@@ -244,10 +247,14 @@ public class AuthorizationSession {
                        String reason = getDenialReason();
                        String message = getLogMessage(false, reason);
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("AuthorizationSession.publishResults: 
throwing exception: " + message);
+                               LOG.debug("<== 
AuthorizationSession.publishResults: throwing exception: " + message);
                        }
                        throw new AccessDeniedException("Insufficient 
permissions for user '" + _user.getName() + "' (action=" + _access + ")");
                }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== AuthorizationSession.publishResults()");
+               }
        }
        
        boolean isAudited() {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0d38f0f2/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseAuditHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseAuditHandler.java
 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseAuditHandler.java
index bbff6df..c77dc20 100644
--- 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseAuditHandler.java
+++ 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseAuditHandler.java
@@ -37,10 +37,10 @@ public interface HbaseAuditHandler extends 
RangerAccessResultProcessor {
         * After this call the last set of audit events won't be returned by 
<code>getCapturedEvents</code>. 
         * @return
         */
-       AuthzAuditEvent discardMostRecentEvent();
+       AuthzAuditEvent getAndDiscardMostRecentEvent();
        
        /**
-        * This is a complement to <code>discardMostRecentEvent</code> to set 
the most recent events.  Often useful to un-pop audit messages that were take 
out.
+        * This is a complement to <code>getAndDiscardMostRecentEvent</code> to 
set the most recent events.  Often useful to un-pop audit messages that were 
take out.
         * @param capturedEvents
         */
        void setMostRecentEvent(AuthzAuditEvent capturedEvents);

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0d38f0f2/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseAuditHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseAuditHandlerImpl.java
 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseAuditHandlerImpl.java
index e383614..6fbf5fc 100644
--- 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseAuditHandlerImpl.java
+++ 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/HbaseAuditHandlerImpl.java
@@ -21,12 +21,15 @@ package org.apache.ranger.authorization.hbase;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
 
 public class HbaseAuditHandlerImpl extends RangerDefaultAuditHandler 
implements HbaseAuditHandler {
 
+       private static final Log LOG = 
LogFactory.getLog(HbaseAuditHandlerImpl.class);
        static final List<AuthzAuditEvent> _EmptyList = new 
ArrayList<AuthzAuditEvent>();
        final List<AuthzAuditEvent> _allEvents = new 
ArrayList<AuthzAuditEvent>();
        // we replace its contents anytime new audit events are generated.
@@ -35,55 +38,109 @@ public class HbaseAuditHandlerImpl extends 
RangerDefaultAuditHandler implements
        
        @Override
        public AuthzAuditEvent getAuthzEvents(RangerAccessResult result) {
-               
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> HbaseAuditHandlerImpl.getAuthzEvents(" + 
result + ")");
+               }
+
                AuthzAuditEvent event = super.getAuthzEvents(result);
                // first accumulate last set of events and then capture these 
as the most recent ones
                if (_mostRecentEvent != null) {
+                       LOG.debug("getAuthzEvents: got one event from default 
audit handler");
                        _allEvents.add(_mostRecentEvent);
+               } else {
+                       LOG.debug("getAuthzEvents: no event produced by default 
audit handler");
                }
                _mostRecentEvent = event;
+
                // We return null because we don't want default audit handler 
to audit anything!
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== HbaseAuditHandlerImpl.getAuthzEvents(" + 
result + "): null");
+               }
                return null;
        }
        
        @Override
        public List<AuthzAuditEvent> getCapturedEvents() {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
HbaseAuditHandlerImpl.getCapturedEvents()");
+               }
+
                // construct a new collection since we don't want to lose track 
of which were the most recent events;
                List<AuthzAuditEvent> result = new 
ArrayList<AuthzAuditEvent>(_allEvents);
                if (_mostRecentEvent != null) {
                        result.add(_mostRecentEvent);
                }
                applySuperUserOverride(result);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== HbaseAuditHandlerImpl.getAuthzEvents(): 
count[" + result.size() + "] :result : " + result);
+               }
                return result;
        }
 
        @Override
-       public AuthzAuditEvent  discardMostRecentEvent() {
+       public AuthzAuditEvent getAndDiscardMostRecentEvent() {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
HbaseAuditHandlerImpl.getAndDiscardMostRecentEvent():");
+               }
+
                AuthzAuditEvent result = _mostRecentEvent;
                applySuperUserOverride(result);
                _mostRecentEvent = null;
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
HbaseAuditHandlerImpl.getAndDiscardMostRecentEvent(): " + result);
+               }
                return result;
        }
 
        @Override
        public void setMostRecentEvent(AuthzAuditEvent event) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
HbaseAuditHandlerImpl.setMostRecentEvent(" + event + ")");
+               }
                _mostRecentEvent = event;
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
HbaseAuditHandlerImpl.setMostRecentEvent(...)");
+               }
        }
 
        @Override
        public void setSuperUserOverride(boolean override) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
HbaseAuditHandlerImpl.setSuperUserOverride(" + override + ")");
+               }
+
                _superUserOverride = override;
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
HbaseAuditHandlerImpl.setSuperUserOverride(...)");
+               }
        }
        
        void applySuperUserOverride(List<AuthzAuditEvent> events) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
HbaseAuditHandlerImpl.applySuperUserOverride(" + events + ")");
+               }
+
                for (AuthzAuditEvent event : events) {
                        applySuperUserOverride(event);
                }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
HbaseAuditHandlerImpl.applySuperUserOverride(...)");
+               }
        }
        
        void applySuperUserOverride(AuthzAuditEvent event) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
HbaseAuditHandlerImpl.applySuperUserOverride(" + event + ")");
+               }
                if (event != null && _superUserOverride) {
                        event.setAccessResult((short)1);
                }
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
HbaseAuditHandlerImpl.applySuperUserOverride(...)");
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0d38f0f2/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
----------------------------------------------------------------------
diff --git 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
index abf8a33..e64c5af 100644
--- 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
+++ 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationCoprocessor.java
@@ -98,14 +98,13 @@ import 
org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants;
 import org.apache.ranger.authorization.utils.StringUtil;
 import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
 import org.apache.ranger.plugin.policyengine.RangerAccessResultProcessor;
+import org.apache.ranger.plugin.policyengine.RangerPolicyEngine;
 import org.apache.ranger.plugin.service.RangerBasePlugin;
 import org.apache.ranger.plugin.util.GrantRevokeRequest;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -256,21 +255,23 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                final boolean _everythingIsAccessible;
                final boolean _somethingIsAccessible;
                final List<AuthzAuditEvent> _accessAllowedEvents;
+               final List<AuthzAuditEvent> _familyLevelAccessEvents;
                final AuthzAuditEvent _accessDeniedEvent;
-               final Map<String, Set<String>> _allowedColumns;
                final String _denialReason;
-               
-               ColumnFamilyAccessResult(
-                               boolean everythingIsAccessible, boolean 
somethingIsAccessible, 
-                               List<AuthzAuditEvent> accessAllowedEvents, 
AuthzAuditEvent accessDeniedEvent,
-                               Map<String, Set<String>> allowedColumns, String 
denialReason) {
+               final RangerAuthorizationFilter _filter;
+
+               ColumnFamilyAccessResult(boolean everythingIsAccessible, 
boolean somethingIsAccessible,
+                                                                
List<AuthzAuditEvent> accessAllowedEvents, List<AuthzAuditEvent> 
familyLevelAccessEvents, AuthzAuditEvent accessDeniedEvent, String denialReason,
+                                                                
RangerAuthorizationFilter filter) {
                        _everythingIsAccessible = everythingIsAccessible;
                        _somethingIsAccessible = somethingIsAccessible;
                        // WARNING: we are just holding on to reference of the 
collection.  Potentially risky optimization
                        _accessAllowedEvents = accessAllowedEvents;
+                       _familyLevelAccessEvents = familyLevelAccessEvents;
                        _accessDeniedEvent = accessDeniedEvent;
-                       _allowedColumns = allowedColumns;
                        _denialReason = denialReason;
+                       // cached values of access results
+                       _filter = filter;
                }
                
                @Override
@@ -279,9 +280,10 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                                        .add("everythingIsAccessible", 
_everythingIsAccessible)
                                        .add("somethingIsAccessible", 
_somethingIsAccessible)
                                        .add("accessAllowedEvents", 
_accessAllowedEvents)
+                                       .add("familyLevelAccessEvents", 
_familyLevelAccessEvents)
                                        .add("accessDeniedEvent", 
_accessDeniedEvent)
-                                       .add("allowedColumns", _allowedColumns)
                                        .add("denialReason", _denialReason)
+                                       .add("filter", _filter)
                                        .toString();
                        
                }
@@ -291,12 +293,12 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                        final Map<byte[], ? extends Collection<?>> familyMap) 
throws AccessDeniedException {
                
                String access = _authUtils.getAccess(action);
-               
+               User user = getActiveUser();
+               String userName = _userUtils.getUserAsString(user);
+
                if (LOG.isDebugEnabled()) {
-                       final String format = "evaluateAccess: entered: 
Operation[%s], access[%s], families[%s]";
-                       Map<String, Set<String>> families = 
getColumnFamilies(familyMap);
-                       String message = String.format(format, operation, 
access, families.toString());
-                       LOG.debug(message);
+                       LOG.debug(String.format("evaluateAccess: entered: 
user[%s], Operation[%s], access[%s], families[%s]",
+                                       userName, operation, access, 
getColumnFamilies(familyMap).toString()));
                }
 
                byte[] tableBytes = getTableName(env);
@@ -307,20 +309,19 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                }
                String table = Bytes.toString(tableBytes);
 
-               final String messageTemplate = "evaluateAccess: exiting: 
Operation[%s], access[%s], families[%s], verdict[%s]";
+               final String messageTemplate = "evaluateAccess: exiting: 
user[%s], Operation[%s], access[%s], families[%s], verdict[%s]";
                ColumnFamilyAccessResult result;
                if (canSkipAccessCheck(operation, access, table) || 
canSkipAccessCheck(operation, access, env)) {
                        LOG.debug("evaluateAccess: exiting: 
isKnownAccessPattern returned true: access allowed, not audited");
-                       result = new ColumnFamilyAccessResult(true, true, null, 
null, null, null);
+                       result = new ColumnFamilyAccessResult(true, true, null, 
null, null, null, null);
                        if (LOG.isDebugEnabled()) {
                                Map<String, Set<String>> families = 
getColumnFamilies(familyMap);
-                               String message = String.format(messageTemplate, 
operation, access, families.toString(), result.toString());
+                               String message = String.format(messageTemplate, 
userName, operation, access, families.toString(), result.toString());
                                LOG.debug(message);
                        }
                        return result;
                }
                
-               User user = getActiveUser();
                // let's create a session that would be reused.  Set things on 
it that won't change.
                HbaseAuditHandler auditHandler = _factory.getAuditHandler(); 
                AuthorizationSession session = new 
AuthorizationSession(hbasePlugin)
@@ -340,17 +341,21 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                                .authorize();
                        boolean authorized = session.isAuthorized();
                        String reason = "";
-                       if (!authorized) {
+                       if (authorized) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("evaluateAccess: table level 
access granted [" + table + "]");
+                               }
+                       } else {
                                reason = String.format("Insufficient 
permissions for user ‘%s',action: %s, tableName:%s, no column families 
found.", user.getName(), operation, table);
                        }
-                       AuthzAuditEvent event = 
auditHandler.discardMostRecentEvent(); // this could be null, of course, 
depending on audit settings of table.
+                       AuthzAuditEvent event = 
auditHandler.getAndDiscardMostRecentEvent(); // this could be null, of course, 
depending on audit settings of table.
 
                        // if authorized then pass captured events as access 
allowed set else as access denied set.
                        result = new ColumnFamilyAccessResult(authorized, 
authorized, 
                                                authorized ? 
Collections.singletonList(event) : null,
-                                               authorized ? null : event, 
null, reason);
+                                               null, authorized ? null : 
event, reason, null);
                        if (LOG.isDebugEnabled()) {
-                               String message = String.format(messageTemplate, 
operation, access, families.toString(), result.toString());
+                               String message = String.format(messageTemplate, 
userName, operation, access, families.toString(), result.toString());
                                LOG.debug(message);
                        }
                        return result;
@@ -360,13 +365,20 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                
                boolean everythingIsAccessible = true;
                boolean somethingIsAccessible = false;
-               // we would have to accumulate audits of all successful 
accesses and any one denial (which in our case ends up being the last denial)
-               List<AuthzAuditEvent> authorizedEvents = new 
ArrayList<AuthzAuditEvent>(); 
+               /*
+                * we would have to accumulate audits of all successful 
accesses and any one denial (which in our case ends up being the last denial)
+                * We need to keep audit events for family level access check 
seperate because we don't want them logged in some cases.
+                */
+               List<AuthzAuditEvent> authorizedEvents = new 
ArrayList<AuthzAuditEvent>();
+               List<AuthzAuditEvent> familyLevelAccessEvents = new 
ArrayList<AuthzAuditEvent>();
                AuthzAuditEvent deniedEvent = null;
                String denialReason = null;
                // we need to cache the auths results so that we can create a 
filter, if needed
-               Map<String, Set<String>> accessResultsCache = new 
HashMap<String, Set<String>>();
-               
+               Map<String, Set<String>> columnsAccessAllowed = new 
HashMap<String, Set<String>>();
+               Set<String> familesAccessAllowed = new HashSet<String>();
+               Set<String> familesAccessDenied = new HashSet<String>();
+               Set<String> familesAccessIndeterminate = new HashSet<String>();
+
                for (Map.Entry<String, Set<String>> anEntry : 
families.entrySet()) {
                        String family = anEntry.getKey();
                        session.columnFamily(family);
@@ -379,19 +391,47 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                                session.column(null) // zap stale column from 
prior iteration of this loop, if any
                                        .buildRequest()
                                        .authorize();
+                               AuthzAuditEvent auditEvent = 
auditHandler.getAndDiscardMostRecentEvent(); // capture it only for success
                                if (session.isAuthorized()) {
-                                       // we need to do 3 things: 
housekeeping, capturing audit events, building the results cache for filter
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("evaluateAccess: has 
family level access [" + family + "]");
+                                       }
+                                       // we need to do 3 things: 
housekeeping, decide about audit events, building the results cache for filter
                                        somethingIsAccessible = true;
-                                       AuthzAuditEvent event = 
auditHandler.discardMostRecentEvent();
-                                       if (event != null) {
-                                               authorizedEvents.add(event);
+                                       familesAccessAllowed.add(family);
+                                       if (auditEvent != null) {
+                                               LOG.debug("evaluateAccess: 
adding to family-level-access-granted-event-set");
+                                               
familyLevelAccessEvents.add(auditEvent);
                                        }
-                                       // presence of key with null value 
would imply access to all columns in a family.
-                                       accessResultsCache.put(family, null);
                                } else {
                                        everythingIsAccessible = false;
-                                       deniedEvent = 
auditHandler.discardMostRecentEvent();
-                                       denialReason = 
String.format("Insufficient permissions for user ‘%s',action: %s, 
tableName:%s, family:%s, no columns found.", user.getName(), operation, table, 
family);
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("evaluateAccess: no 
family level access [" + family + "].  Checking if has partial access (of any 
type)...");
+                                       }
+                                       
session.access(RangerPolicyEngine.ANY_ACCESS)
+                                                       .buildRequest()
+                                                       .authorize();
+                                       auditEvent = 
auditHandler.getAndDiscardMostRecentEvent(); // capture it only for failure
+                                       if (session.isAuthorized()) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       
LOG.debug("evaluateAccess: has partial access (of some type) in family [" + 
family + "]");
+                                               }
+                                               // we need to do 3 things: 
housekeeping, decide about audit events, building the results cache for filter
+                                               somethingIsAccessible = true;
+                                               
familesAccessIndeterminate.add(family);
+                                       } else {
+                                               if (LOG.isDebugEnabled()) {
+                                                       
LOG.debug("evaluateAccess: has no access of any (of any type) in family [" + 
family + "]");
+                                               }
+                                               familesAccessDenied.add(family);
+                                               denialReason = 
String.format("Insufficient permissions for user ‘%s',action: %s, 
tableName:%s, family:%s, no columns found.", user.getName(), operation, table, 
family);
+                                               if (auditEvent != null && 
deniedEvent == null) { // we need to capture just one denial event
+                                                       
LOG.debug("evaluateAccess: Setting denied access audit event with last auth 
failure audit event.");
+                                                       deniedEvent = 
auditEvent;
+                                               }
+                                       }
+                                       // Restore the access back
+                                       session.access(access);
                                }
                        } else {
                                LOG.debug("evaluateAccess: columns collection 
not empty.  Skipping Family level check, will do finer level access check.");
@@ -403,51 +443,73 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                                        session.column(column)
                                                .buildRequest()
                                                .authorize();
+                                       AuthzAuditEvent auditEvent = 
auditHandler.getAndDiscardMostRecentEvent();
                                        if (session.isAuthorized()) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       
LOG.debug("evaluateAccess: has column level access [" + family + ", " + column 
+ "]");
+                                               }
                                                // we need to do 3 things: 
housekeeping, capturing audit events, building the results cache for filter
                                                somethingIsAccessible = true;
-                                               AuthzAuditEvent event = 
auditHandler.discardMostRecentEvent();
-                                               if (event != null) {
-                                                       
authorizedEvents.add(event);
-                                               }
                                                accessibleColumns.add(column);
+                                               if (auditEvent != null) {
+                                                       
LOG.debug("evaluateAccess: adding to access-granted-audit-event-set");
+                                                       
authorizedEvents.add(auditEvent);
+                                               }
                                        } else {
+                                               if (LOG.isDebugEnabled()) {
+                                                       
LOG.debug("evaluateAccess: no column level access [" + family + ", " + column + 
"]");
+                                               }
                                                everythingIsAccessible = false;
-                                               deniedEvent = 
auditHandler.discardMostRecentEvent();
-                                               denialReason = 
String.format("Insufficient permissions for user ‘%s',action: %s, 
tableName:%s, family:%s, column: %s", user.getName(), operation, table, family, 
column);  
+                                               denialReason = 
String.format("Insufficient permissions for user ‘%s',action: %s, 
tableName:%s, family:%s, column: %s", user.getName(), operation, table, family, 
column);
+                                               if (auditEvent != null && 
deniedEvent == null) { // we need to capture just one denial event
+                                                       
LOG.debug("evaluateAccess: Setting denied access audit event with last auth 
failure audit event.");
+                                                       deniedEvent = 
auditEvent;
+                                               }
                                        }
+                                       if (!accessibleColumns.isEmpty()) {
+                                               
columnsAccessAllowed.put(family, accessibleColumns);
+                                       }
                                }
-                               if (!accessibleColumns.isEmpty()) {
-                                       accessResultsCache.put(family, 
accessibleColumns);
-                               }
                        }
                }
-               
-               result = new ColumnFamilyAccessResult(everythingIsAccessible, 
somethingIsAccessible, authorizedEvents, deniedEvent, accessResultsCache, 
denialReason);
+               // Cache of auth results are encapsulated the in the filter. 
Not every caller of the function uses it - only preGet and preOpt will.
+               RangerAuthorizationFilter filter = new 
RangerAuthorizationFilter(session, familesAccessAllowed, familesAccessDenied, 
familesAccessIndeterminate, columnsAccessAllowed);
+               result = new ColumnFamilyAccessResult(everythingIsAccessible, 
somethingIsAccessible, authorizedEvents, familyLevelAccessEvents, deniedEvent, 
denialReason, filter);
                if (LOG.isDebugEnabled()) {
-                       String message = String.format(messageTemplate, 
operation, access, families.toString(), result.toString());
+                       String message = String.format(messageTemplate, 
userName, operation, access, families.toString(), result.toString());
                        LOG.debug(message);
                }
                return result;
        }
-       
+
        Filter authorizeAccess(String operation, Action action, final 
RegionCoprocessorEnvironment env, final Map<byte[], NavigableSet<byte[]>> 
familyMap) throws AccessDeniedException {
 
-               ColumnFamilyAccessResult accessResult = 
evaluateAccess(operation, action, env, familyMap);
-               RangerDefaultAuditHandler auditHandler = new 
RangerDefaultAuditHandler();
-               if (accessResult._everythingIsAccessible) {
-                       
auditHandler.logAuthzAudits(accessResult._accessAllowedEvents);
-                       LOG.debug("authorizeAccess: exiting: No filter returned 
since all access was allowed");
-                       return null; // no filter needed since we are good to 
go.
-               } else if (accessResult._somethingIsAccessible) {
-                       
auditHandler.logAuthzAudits(accessResult._accessAllowedEvents); // we still 
need to log those to which we got access.
-                       LOG.debug("authorizeAccess: exiting: Filter returned 
since some access was allowed");
-                       return new 
RangerAuthorizationFilter(accessResult._allowedColumns);
-               } else {
-                       // If we are here then it means nothing was accessible! 
 So let's log one denial (in our case, the last denial) and throw an exception
-                       
auditHandler.logAuthzAudit(accessResult._accessDeniedEvent);
-                       LOG.debug("authorizeAccess: exiting: Throwing exception 
since nothing was accessible");
-                       throw new 
AccessDeniedException(accessResult._denialReason);
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> authorizeAccess");
+               }
+               try {
+                       ColumnFamilyAccessResult accessResult = 
evaluateAccess(operation, action, env, familyMap);
+                       RangerDefaultAuditHandler auditHandler = new 
RangerDefaultAuditHandler();
+                       if (accessResult._everythingIsAccessible) {
+                               
auditHandler.logAuthzAudits(accessResult._accessAllowedEvents);
+                               
auditHandler.logAuthzAudits(accessResult._familyLevelAccessEvents);
+                               LOG.debug("authorizeAccess: exiting: No filter 
returned since all access was allowed");
+                               return null; // no filter needed since we are 
good to go.
+                       } else if (accessResult._somethingIsAccessible) {
+                               // NOTE: audit logging is split beween logging 
here (in scope of preOp/preGet) and logging in the filter component for those 
that couldn't be determined
+                               
auditHandler.logAuthzAudits(accessResult._accessAllowedEvents);
+                               LOG.debug("authorizeAccess: exiting: Filter 
returned since some access was allowed");
+                               return accessResult._filter;
+                       } else {
+                               // If we are here then it means nothing was 
accessible!  So let's log one denial (in our case, the last denial) and throw 
an exception
+                               
auditHandler.logAuthzAudit(accessResult._accessDeniedEvent);
+                               LOG.debug("authorizeAccess: exiting: Throwing 
exception since nothing was accessible");
+                               throw new 
AccessDeniedException(accessResult._denialReason);
+                       }
+               } finally {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("<== authorizeAccess");
+                       }
                }
        }
        
@@ -466,6 +528,7 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                RangerDefaultAuditHandler auditHandler = new 
RangerDefaultAuditHandler();
                if (accessResult._everythingIsAccessible) {
                        
auditHandler.logAuthzAudits(accessResult._accessAllowedEvents);
+                       
auditHandler.logAuthzAudits(accessResult._familyLevelAccessEvents);
                        LOG.debug("requirePermission: exiting: all access was 
allowed");
                        return;
                } else {
@@ -479,7 +542,6 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
         * This could run s
         * @param operation
         * @param otherInformation
-        * @param access
         * @param table
         * @param columnFamily
         * @param column
@@ -614,35 +676,6 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                requirePermission(request, perm, env, familyMap);
        }
        
-       public void checkPermissions(Permission[] permissions) throws 
IOException {
-               String tableName = 
regionEnv.getRegion().getTableDesc().getTableName().getNameAsString() ;
-               for (Permission permission : permissions) {
-                       if (permission instanceof TablePermission) {
-                               TablePermission tperm = (TablePermission) 
permission;
-                               for (Permission.Action action : 
permission.getActions()) {
-                                       if (! 
tperm.getTableName().getNameAsString().equals(tableName)) {
-                                               throw new 
AccessDeniedException(String.format("This method can only execute at the table 
specified in TablePermission. " + "Table of the region:%s , requested 
table:%s", tableName, 
-                                                                               
                                                          
tperm.getTableName().getNameAsString()));
-                                       }
-                                       HashMap<byte[], Set<byte[]>> familyMap 
= Maps.newHashMapWithExpectedSize(1);
-                                       if (tperm.getFamily() != null) {
-                                               if (tperm.getQualifier() != 
null) {
-                                                       
familyMap.put(tperm.getFamily(), Sets.newHashSet(tperm.getQualifier()));
-                                               } else {
-                                                       
familyMap.put(tperm.getFamily(), null);
-                                               }
-                                       }
-                                       requirePermission("checkPermissions", 
action, regionEnv, familyMap);
-                               }
-                       } else {
-                               for (Permission.Action action : 
permission.getActions()) {
-                                       byte[] tname = 
regionEnv.getRegion().getTableDesc().getTableName().getName() ;
-                                       requirePermission("checkPermissions", 
tname, action);
-                               }
-                       }
-               }
-       }
-       
        @Override
        public void 
postScannerClose(ObserverContext<RegionCoprocessorEnvironment> c, 
InternalScanner s) throws IOException {
                scannerOwners.remove(s);
@@ -794,7 +827,6 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                final Region region = env.getRegion();
                if (region == null) {
                        LOG.error("NULL region from 
RegionCoprocessorEnvironment in preOpen()");
-                       return;
                } else {
                        HRegionInfo regionInfo = region.getRegionInfo();
                        if (isSpecialTable(regionInfo)) {
@@ -820,24 +852,34 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
        }
        @Override
        public RegionScanner 
preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, 
RegionScanner s) throws IOException {
-               RegionCoprocessorEnvironment e = c.getEnvironment();
-               
-               Map<byte[], NavigableSet<byte[]>> familyMap = 
scan.getFamilyMap();
-               String operation = "scannerOpen";
-               Filter filter = authorizeAccess(operation, Action.READ, e, 
familyMap);
-               if (filter == null) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("preGetOp: Access allowed for all 
families/column.");
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> preScannerOpen");
+               }
+
+               try {
+                       RegionCoprocessorEnvironment e = c.getEnvironment();
+
+                       Map<byte[], NavigableSet<byte[]>> familyMap = 
scan.getFamilyMap();
+                       String operation = "scannerOpen";
+                       Filter filter = authorizeAccess(operation, Action.READ, 
e, familyMap);
+                       if (filter == null) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("preScannerOpen: Access 
allowed for all families/column.  No filter added");
+                               }
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("preScannerOpen: Access 
allowed for some of the families/column. New filter added.");
+                               }
+                               Filter existingFilter = scan.getFilter();
+                               Filter combinedFilter = combineFilters(filter, 
existingFilter);
+                               scan.setFilter(combinedFilter);
                        }
-               } else {
+                       return s;
+               } finally {
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("preGetOp: Access allowed for some of 
the families/column.");
+                               LOG.debug("<== preScannerOpen");
                        }
-                       Filter existingFilter = scan.getFilter();
-                       Filter combinedFilter = combineFilters(filter, 
existingFilter);
-                       scan.setFilter(combinedFilter);
                }
-               return s;
        }
        @Override
        public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> 
c) throws IOException {
@@ -948,21 +990,32 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
        
        @Override
        public void preGetOp(final 
ObserverContext<RegionCoprocessorEnvironment> rEnv, final Get get, final 
List<Cell> result) throws IOException {
-               RegionCoprocessorEnvironment e = rEnv.getEnvironment();
-               Map<byte[], NavigableSet<byte[]>> familyMap = 
get.getFamilyMap() ;
-
-               String operation = "get";
-               Filter filter = authorizeAccess(operation, Action.READ, e, 
familyMap);
-               if (filter == null) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> preGetOp");
+               }
+               try {
+                       RegionCoprocessorEnvironment e = rEnv.getEnvironment();
+                       Map<byte[], NavigableSet<byte[]>> familyMap = 
get.getFamilyMap();
+
+                       String operation = "get";
+                       Filter filter = authorizeAccess(operation, Action.READ, 
e, familyMap);
+                       if (filter == null) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("preGetOp: all access 
allowed, no filter returned");
+                               }
+                       } else {
+                               Filter existingFilter = get.getFilter();
+                               Filter combinedFilter = combineFilters(filter, 
existingFilter);
+                               get.setFilter(combinedFilter);
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("preGetOp: partial access, 
new filter added");
+                               }
+                       }
+               } finally {
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("preGetOp: Access allowed.");
+                               LOG.debug("<== preGetOp");
                        }
-               } else {
-                       Filter existingFilter = get.getFilter();
-                       Filter combinedFilter = combineFilters(filter, 
existingFilter);
-                       get.setFilter(combinedFilter);
                }
-               return;
        }
        @Override
        public void 
preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo 
regionInfo) throws IOException {
@@ -1008,7 +1061,7 @@ public class RangerAuthorizationCoprocessor extends 
RangerAuthorizationCoprocess
                                
session.table(tableName).buildRequest().authorize();
                                if (!session.isAuthorized()) {
                                        itr.remove();
-                                       auditHandler.discardMostRecentEvent();
+                                       
auditHandler.getAndDiscardMostRecentEvent();
                                }
                        }
                        if (descriptors.size() > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0d38f0f2/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilter.java
----------------------------------------------------------------------
diff --git 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilter.java
 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilter.java
index ae61a1e..e281099 100644
--- 
a/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilter.java
+++ 
b/hbase-agent/src/main/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilter.java
@@ -20,69 +20,127 @@
 package org.apache.ranger.authorization.hbase;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.collections.CollectionUtils;
+import com.google.common.base.Objects;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
 
 public class RangerAuthorizationFilter extends FilterBase {
 
        private static final Log LOG = 
LogFactory.getLog(RangerAuthorizationFilter.class.getName());
-       final Map<String, Set<String>> _cache;
+       final Set<String> _familiesAccessAllowed;
+       final Set<String> _familiesAccessDenied;
+       final Set<String> _familiesAccessIndeterminate;
+       final Map<String, Set<String>> _columnsAccessAllowed;
+       final AuthorizationSession _session;
+       final HbaseAuditHandler _auditHandler = 
HbaseFactory.getInstance().getAuditHandler();
 
-       public RangerAuthorizationFilter(Map<String, Set<String>> cache) {
-               _cache = cache;
+       public RangerAuthorizationFilter(AuthorizationSession session, 
Set<String> familiesAccessAllowed, Set<String> familiesAccessDenied, 
Set<String> familiesAccessIndeterminate,
+                                                                        
Map<String, Set<String>> columnsAccessAllowed) {
+               // the class assumes that all of these can be empty but none of 
these can be null
+               _familiesAccessAllowed = familiesAccessAllowed;
+               _familiesAccessDenied = familiesAccessDenied;
+               _familiesAccessIndeterminate = familiesAccessIndeterminate;
+               _columnsAccessAllowed = columnsAccessAllowed;
+               // this session should have everything set on it except family 
and column which would be altered based on need
+               _session = session;
+               // we don't want to audit denial, so we need to make sure the 
hander is what we need it to be.
+               _session.auditHandler(_auditHandler);
        }
        
        @SuppressWarnings("deprecation")
        @Override
        public ReturnCode filterKeyValue(Cell kv) throws IOException {
-               
-               if (_cache == null || _cache.isEmpty()) {
-                       LOG.debug("filterKeyValue: if cache is null or empty 
then there is no hope for any access. Denied!");
-                       return ReturnCode.NEXT_COL;
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> filterKeyValue");
                }
-               
+
+               String family = null;
                byte[] familyBytes = kv.getFamily();
-               if (familyBytes == null || familyBytes.length == 0) {
-                       LOG.debug("filterKeyValue: null/empty families in 
request! Denied!");
-                       return ReturnCode.NEXT_COL;
+               if (familyBytes != null && familyBytes.length > 0) {
+                       family = Bytes.toString(familyBytes);
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("filterKeyValue: evaluating family[" 
+ family + "].");
+                       }
                }
-               String family = Bytes.toString(familyBytes);
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("filterKeyValue: Evaluating family[" + family 
+ "]");
+               String column = null;
+               if (kv.getQualifier() != null && kv.getQualifier().length > 0) {
+                       column = Bytes.toString(kv.getQualifier());
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("filterKeyValue: evaluating column[" 
+ column + "].");
+                       }
+               } else {
+                       LOG.warn("filterKeyValue: empty/null column set! 
Unexpected!");
                }
-               
-               if (!_cache.containsKey(family)) {
-                       LOG.debug("filterKeyValue: Cache map did not contain 
the family, i.e. nothing in family has access! Denied!");
-                       return ReturnCode.NEXT_COL;
+
+               ReturnCode result = ReturnCode.NEXT_COL;
+               boolean authCheckNeeded = false;
+               if (family == null) {
+                       LOG.warn("filterKeyValue: Unexpected - null/empty 
family! Access denied!");
+               } else if (_familiesAccessDenied.contains(family)) {
+                       LOG.debug("filterKeyValue: family found in access 
denied families cache.  Access denied.");
+               } else if (_columnsAccessAllowed.containsKey(family)) {
+                       LOG.debug("filterKeyValue: family found in column level 
access results cache.");
+                       if (_columnsAccessAllowed.get(family).contains(column)) 
{
+                               LOG.debug("filterKeyValue: family/column found 
in column level access results cache. Access allowed.");
+                               result = ReturnCode.INCLUDE;
+                       } else {
+                               LOG.debug("filterKeyValue: family/column not in 
column level access results cache. Access denied.");
+                       }
+               } else if (_familiesAccessAllowed.contains(family)) {
+                       LOG.debug("filterKeyValue: family found in access 
allowed families cache.  Must re-authorize for correct audit generation.");
+                       authCheckNeeded = true;
+               } else if (_familiesAccessIndeterminate.contains(family)) {
+                       LOG.debug("filterKeyValue: family found in 
indeterminate families cache.  Evaluating access...");
+                       authCheckNeeded = true;
+               } else {
+                       LOG.warn("filterKeyValue: Unexpected - alien family 
encountered that wasn't seen by pre-hook!  Access Denied.!");
                }
-               Set<String> columns = _cache.get(family);
-               
-               if (CollectionUtils.isEmpty(columns)) {
-                       LOG.debug("filterKeyValue: empty/null column set in 
cache for family implies family level access. No need to bother with column 
level.  Allowed!");
-                       return ReturnCode.INCLUDE;
-               }               
-               byte[] columnBytes = kv.getQualifier();
-               if (columnBytes == null || columnBytes.length == 0) {
-                       LOG.debug("filterKeyValue: empty/null column set in 
request implies family level access, which isn't available.  Denied!");
-                       return ReturnCode.NEXT_COL;
+
+               if (authCheckNeeded) {
+                       LOG.debug("filterKeyValue: Checking authorization...");
+                       _session.columnFamily(family)
+                                       .column(column)
+                                       .buildRequest()
+                                       .authorize();
+                       // must always purge the captured audit event out of 
the audit handler to avoid messing up the next check
+                       AuthzAuditEvent auditEvent = 
_auditHandler.getAndDiscardMostRecentEvent();
+                       if (_session.isAuthorized()) {
+                               LOG.debug("filterKeyValue: Access granted.");
+                               result = ReturnCode.INCLUDE;
+                               if (auditEvent != null) {
+                                       LOG.debug("filterKeyValue: access is 
audited.");
+                                       
_auditHandler.logAuthzAudits(Collections.singletonList(auditEvent));
+                               } else {
+                                       LOG.debug("filterKeyValue: no audit 
event returned.  Access not audited.");
+                               }
+                       } else {
+                               LOG.debug("filterKeyValue: Access denied.  
Denial not audited.");
+                       }
                }
-               String column = Bytes.toString(columnBytes);
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("filterKeyValue: Evaluating column[" + column 
+ "]");
-               }
-               if (columns.contains(column)) {
-                       LOG.debug("filterKeyValue: cache contains Column in 
column-family's collection.  Access allowed!");
-                       return ReturnCode.INCLUDE;
-               } else {
-                       LOG.debug("filterKeyValue: cache missing Column in 
column-family's collection.  Access denied!");
-                       return ReturnCode.NEXT_COL;
+                       LOG.debug("filterKeyValue: " + result);
                }
+               return result;
        }
+
+       @Override
+       public String toString() {
+               return Objects.toStringHelper(getClass())
+                               .add("familiesAccessAllowed", 
_familiesAccessAllowed)
+                               .add("familiesAccessDenied", 
_familiesAccessDenied)
+                               .add("familiesAccessUnknown", 
_familiesAccessIndeterminate)
+                               .add("columnsAccessAllowed", 
_columnsAccessAllowed)
+                               .toString();
+
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0d38f0f2/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilterTest.java
----------------------------------------------------------------------
diff --git 
a/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilterTest.java
 
b/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilterTest.java
index 4b49721..2c460d1 100644
--- 
a/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilterTest.java
+++ 
b/hbase-agent/src/test/java/org/apache/ranger/authorization/hbase/RangerAuthorizationFilterTest.java
@@ -19,6 +19,7 @@
 package org.apache.ranger.authorization.hbase;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -28,6 +29,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
 import org.junit.Test;
@@ -40,68 +43,77 @@ public class RangerAuthorizationFilterTest {
        @Test
        public void testFilterKeyValueCell_happyPath() throws IOException {
 
-               // null/empty column collection in cache for a family implies 
family level access  
-               Map<String, Set<String>> cache = new HashMap<String, 
Set<String>>();
-               cache.put("family1", Collections.<String> emptySet());
-               RangerAuthorizationFilter filter = new 
RangerAuthorizationFilter(cache);
+               // null/empty column collection in cache for a family implies 
family level access
+               String[] allowedFamilies = new String[] { "family1", "family2" 
};
+               String[] deniedFamilies = new String[] { "family3", "family4" };
+               String[] indeterminateFamilies = new String[] { "family5", 
"family6" };
+               Set<String> familiesAccessAllowed = 
ImmutableSet.copyOf(allowedFamilies);
+               Set<String> familiesAccessDenied = 
ImmutableSet.copyOf(deniedFamilies);
+               Set<String> familiesAccessIndeterminate = 
ImmutableSet.copyOf(indeterminateFamilies);
 
-               Cell aCell = mock(Cell.class);
-               when(aCell.getFamily()).thenReturn("family1".getBytes());
-               when(aCell.getQualifier()).thenReturn("column1".getBytes());
-               assertEquals(ReturnCode.INCLUDE, filter.filterKeyValue(aCell));
-
-               when(aCell.getQualifier()).thenReturn("column2".getBytes());
-               assertEquals(ReturnCode.INCLUDE, filter.filterKeyValue(aCell));
-               
-               // null empty column collection in REQUEST implies family level 
access
-               when(aCell.getQualifier()).thenReturn(null);
-               assertEquals(ReturnCode.INCLUDE, filter.filterKeyValue(aCell));
-               
-               // specific columns in cache should be allowed only if there is 
a match of family and column
-               cache.clear();
-               cache.put("family1", Sets.newHashSet("column11", "column12"));
-               cache.put("family2", Sets.newHashSet("column21", "column22"));
-               filter = new RangerAuthorizationFilter(cache);
+               Map<String, Set<String>> columnsAccessAllowed = new 
HashMap<String, Set<String>>();
+               String[] family7KnowGoodColumns = new String[] 
{"family7-column1", "family7-column2"};
+               columnsAccessAllowed.put("family7", 
ImmutableSet.copyOf(family7KnowGoodColumns));
+               String[] family8KnowGoodColumns = new String[] 
{"family8-column1", "family8-column2"};
+               columnsAccessAllowed.put("family8", 
ImmutableSet.copyOf(family8KnowGoodColumns));
 
-               when(aCell.getFamily()).thenReturn("family1".getBytes());
-               when(aCell.getQualifier()).thenReturn("column11".getBytes());
-               assertEquals(ReturnCode.INCLUDE, filter.filterKeyValue(aCell));
-               when(aCell.getQualifier()).thenReturn("column12".getBytes());
-               assertEquals(ReturnCode.INCLUDE, filter.filterKeyValue(aCell));
-               when(aCell.getQualifier()).thenReturn("column13".getBytes());
-               assertEquals(ReturnCode.NEXT_COL, filter.filterKeyValue(aCell));
-
-               when(aCell.getFamily()).thenReturn("family2".getBytes());
-               when(aCell.getQualifier()).thenReturn("column22".getBytes());
-               assertEquals(ReturnCode.INCLUDE, filter.filterKeyValue(aCell));
+               // auth session
+               AuthorizationSession session = createSessionMock();
+               RangerAuthorizationFilter filter = new 
RangerAuthorizationFilter(session, familiesAccessAllowed, familiesAccessDenied, 
familiesAccessIndeterminate, columnsAccessAllowed);
 
-               when(aCell.getFamily()).thenReturn("family3".getBytes());
-               when(aCell.getQualifier()).thenReturn("column11".getBytes());
-               assertEquals(ReturnCode.NEXT_COL, filter.filterKeyValue(aCell));
-               
-               // asking for family level access when one doesn't exist (colum 
collection for a family is not null/empty then it should get denied
-               when(aCell.getFamily()).thenReturn("family1".getBytes());
+               // evaluate access for various types of cases
+               Cell aCell = mock(Cell.class);
+               // families with know denied acess
+               for (String family : deniedFamilies) {
+                       when(aCell.getFamily()).thenReturn(family.getBytes());
+                       assertEquals(ReturnCode.NEXT_COL, 
filter.filterKeyValue(aCell));
+               }
+               // family that isn't in allowed and if cell does not have 
column then it should be denied
+               when(aCell.getFamily()).thenReturn("family7".getBytes());
                when(aCell.getQualifier()).thenReturn(null);
                assertEquals(ReturnCode.NEXT_COL, filter.filterKeyValue(aCell));
+               // families with known partial access
+               for (String column : family7KnowGoodColumns ) {
+                       
when(aCell.getQualifier()).thenReturn(column.getBytes());
+                       assertEquals(ReturnCode.INCLUDE, 
filter.filterKeyValue(aCell));
+               }
+               when(aCell.getFamily()).thenReturn("family8".getBytes());
+               for (String column : family8KnowGoodColumns ) {
+                       
when(aCell.getQualifier()).thenReturn(column.getBytes());
+                       assertEquals(ReturnCode.INCLUDE, 
filter.filterKeyValue(aCell));
+               }
+               // try some columns that are not in the cache
+               for (String column : new String[] { "family8-column3", 
"family8-column4"}) {
+                       
when(aCell.getQualifier()).thenReturn(column.getBytes());
+                       assertEquals(ReturnCode.NEXT_COL, 
filter.filterKeyValue(aCell));
+               }
+               // families with known allowed access - for these we need to 
doctor up the session
+               when(session.isAuthorized()).thenReturn(true);
+               for (String family : allowedFamilies) {
+                       when(aCell.getFamily()).thenReturn(family.getBytes());
+                       
when(aCell.getQualifier()).thenReturn("some-column".getBytes());
+                       assertEquals(ReturnCode.INCLUDE, 
filter.filterKeyValue(aCell));
+               }
+               when(session.isAuthorized()).thenReturn(false);
+               for (String family : indeterminateFamilies) {
+                       when(aCell.getFamily()).thenReturn(family.getBytes());
+                       
when(aCell.getQualifier()).thenReturn("some-column".getBytes());
+                       assertEquals(ReturnCode.NEXT_COL, 
filter.filterKeyValue(aCell));
+               }
        }
 
-       @Test
-       public void testFilterKeyValueCell_firewalling() throws IOException {
-               // null cache will deny everything.
-               RangerAuthorizationFilter filter = new 
RangerAuthorizationFilter(null);
-               Cell aCell = mock(Cell.class);
-               when(aCell.getFamily()).thenReturn("family1".getBytes());
-               when(aCell.getQualifier()).thenReturn("column1".getBytes());
-               assertEquals(ReturnCode.NEXT_COL, filter.filterKeyValue(aCell));
+       AuthorizationSession createSessionMock() {
+               AuthorizationSession session = mock(AuthorizationSession.class);
+               when(session.column(anyString())).thenReturn(session);
+               when(session.columnFamily(anyString())).thenReturn(session);
+               when(session.table(anyString())).thenReturn(session);
+               when(session.buildRequest()).thenReturn(session);
+               when(session.authorize()).thenReturn(session);
+               when(session.isAuthorized()).thenReturn(false); // by default 
the mock fails all auth requests
 
-               // non-null but empty cache should do the same
-               Map<String, Set<String>> cache = new HashMap<String, 
Set<String>>();
-               filter = new RangerAuthorizationFilter(cache);
-               assertEquals(ReturnCode.NEXT_COL, filter.filterKeyValue(aCell));
+               HbaseAuditHandler auditHandler = mock(HbaseAuditHandler.class);
+               session._auditHandler = auditHandler;
 
-               // Null family in request would get denied, too
-               when(aCell.getFamily()).thenReturn(null);
-               when(aCell.getQualifier()).thenReturn("column1".getBytes());
-               assertEquals(ReturnCode.NEXT_COL, filter.filterKeyValue(aCell));
+               return session;
        }
 }

Reply via email to