http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizerImpl.java
----------------------------------------------------------------------
diff --git 
a/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizerImpl.java
 
b/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizerImpl.java
new file mode 100644
index 0000000..9389875
--- /dev/null
+++ 
b/hdfs-agent/src/main/java/org/apache/ranger/authorization/hadoop/RangerHdfsAuthorizerImpl.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.authorization.hadoop;
+
+import static 
org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants.EXECUTE_ACCCESS_TYPE;
+import static 
org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants.READ_ACCCESS_TYPE;
+import static 
org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants.WRITE_ACCCESS_TYPE;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributeProvider;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributes;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
+import org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants;
+import 
org.apache.ranger.authorization.hadoop.exceptions.RangerAccessControlException;
+import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResource;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.service.RangerBasePlugin;
+
+import com.google.common.collect.Sets;
+
+public class RangerHdfsAuthorizerImpl extends INodeAttributeProvider {
+       private static final Log LOG = 
LogFactory.getLog(RangerHdfsAuthorizerImpl.class);
+
+       private RangerHdfsPlugin           rangerPlugin            = null;
+       private Map<FsAction, Set<String>> access2ActionListMapper = new 
HashMap<FsAction, Set<String>>();
+
+       public RangerHdfsAuthorizerImpl() {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
RangerHdfsAuthorizer.RangerHdfsAuthorizer()");
+               }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
RangerHdfsAuthorizer.RangerHdfsAuthorizer()");
+               }
+       }
+
+       public void start() {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerHdfsAuthorizer.start()");
+               }
+
+               RangerHdfsPlugin plugin = new RangerHdfsPlugin();
+               plugin.init();
+
+               access2ActionListMapper.put(FsAction.NONE,          new 
HashSet<String>());
+               access2ActionListMapper.put(FsAction.ALL,           
Sets.newHashSet(READ_ACCCESS_TYPE, WRITE_ACCCESS_TYPE, EXECUTE_ACCCESS_TYPE));
+               access2ActionListMapper.put(FsAction.READ,          
Sets.newHashSet(READ_ACCCESS_TYPE));
+               access2ActionListMapper.put(FsAction.READ_WRITE,    
Sets.newHashSet(READ_ACCCESS_TYPE, WRITE_ACCCESS_TYPE));
+               access2ActionListMapper.put(FsAction.READ_EXECUTE,  
Sets.newHashSet(READ_ACCCESS_TYPE, EXECUTE_ACCCESS_TYPE));
+               access2ActionListMapper.put(FsAction.WRITE,         
Sets.newHashSet(WRITE_ACCCESS_TYPE));
+               access2ActionListMapper.put(FsAction.WRITE_EXECUTE, 
Sets.newHashSet(WRITE_ACCCESS_TYPE, EXECUTE_ACCCESS_TYPE));
+               access2ActionListMapper.put(FsAction.EXECUTE,       
Sets.newHashSet(EXECUTE_ACCCESS_TYPE));
+
+               rangerPlugin = plugin;
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerHdfsAuthorizer.start()");
+               }
+       }
+
+       public void stop() {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerHdfsAuthorizer.stop()");
+               }
+
+               RangerHdfsPlugin plugin = rangerPlugin;
+               rangerPlugin = null;
+
+               if(plugin != null) {
+                       plugin.cleanup();
+               }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerHdfsAuthorizer.stop()");
+               }
+       }
+
+       @Override
+       public INodeAttributes getAttributes(String fullPath, INodeAttributes 
inode) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerHdfsAuthorizer.getAttributes(" + 
fullPath + ")");
+               }
+
+               INodeAttributes ret = inode; // return default attributes
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerHdfsAuthorizer.getAttributes(" + 
fullPath + "): " + ret);
+               }
+
+               return ret;
+       }
+
+       @Override
+       public INodeAttributes getAttributes(String[] pathElements, 
INodeAttributes inode) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
RangerHdfsAuthorizer.getAttributes(pathElementsCount=" + (pathElements == null 
? 0 : pathElements.length) + ")");
+               }
+
+               INodeAttributes ret = inode; // return default attributes
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
RangerHdfsAuthorizer.getAttributes(pathElementsCount=" + (pathElements == null 
? 0 : pathElements.length) + "): " + ret);
+               }
+
+               return ret;
+       }
+
+       @Override
+       public AccessControlEnforcer 
getExternalAccessControlEnforcer(AccessControlEnforcer defaultEnforcer) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
RangerHdfsAuthorizer.getExternalAccessControlEnforcer()");
+               }
+
+               RangerAccessControlEnforcer rangerAce = new 
RangerAccessControlEnforcer(defaultEnforcer);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
RangerHdfsAuthorizer.getExternalAccessControlEnforcer()");
+               }
+
+               return rangerAce;
+       }
+
+
+       class RangerAccessControlEnforcer implements AccessControlEnforcer {
+               private INodeAttributeProvider.AccessControlEnforcer 
defaultEnforcer = null;
+
+               public RangerAccessControlEnforcer(AccessControlEnforcer 
defaultEnforcer) {
+                       if(LOG.isDebugEnabled()) {
+                               LOG.debug("==> 
RangerAccessControlEnforcer.RangerAccessControlEnforcer()");
+                       }
+
+                       this.defaultEnforcer = defaultEnforcer;
+
+                       if(LOG.isDebugEnabled()) {
+                               LOG.debug("<== 
RangerAccessControlEnforcer.RangerAccessControlEnforcer()");
+                       }
+               }
+
+               @Override
+               public void checkPermission(String fsOwner, String superGroup, 
UserGroupInformation ugi,
+                                                                       
INodeAttributes[] inodeAttrs, INode[] inodes, byte[][] pathByNameArr,
+                                                                       int 
snapshotId, String path, int ancestorIndex, boolean doCheckOwner,
+                                                                       
FsAction ancestorAccess, FsAction parentAccess, FsAction access,
+                                                                       
FsAction subAccess, boolean ignoreEmptyDir) throws AccessControlException {
+                       boolean                accessGranted = false;
+                       RangerHdfsPlugin       plugin        = rangerPlugin;
+                       RangerHdfsAuditHandler auditHandler  = null;
+                       String                 user          = ugi != null ? 
ugi.getShortUserName() : null;
+                       Set<String>            groups        = ugi != null ? 
Sets.newHashSet(ugi.getGroupNames()) : null;
+
+                       if(LOG.isDebugEnabled()) {
+                               LOG.debug("==> 
RangerAccessControlEnforcer.checkPermission("
+                                               + "fsOwner=" + fsOwner + "; 
superGroup=" + superGroup + ", inodesCount=" + (inodes != null ? inodes.length 
: 0)
+                                               + ", snapshotId=" + snapshotId 
+ ", user=" + user + ", path=" + path + ", ancestorIndex=" + ancestorIndex
+                                               + ", doCheckOwner="+ 
doCheckOwner + ", ancestorAccess=" + ancestorAccess + ", parentAccess=" + 
parentAccess
+                                               + ", access=" + access + ", 
subAccess=" + subAccess + ", ignoreEmptyDir=" + ignoreEmptyDir + ")");
+                       }
+
+                       try {
+                               if(plugin != null && 
!ArrayUtils.isEmpty(inodes)) {
+                                       auditHandler = new 
RangerHdfsAuditHandler(path);
+
+                                       if(ancestorIndex >= inodes.length) {
+                                               ancestorIndex = inodes.length - 
1;
+                                       }
+
+                                       for(; ancestorIndex >= 0 && 
inodes[ancestorIndex] == null; ancestorIndex--);
+
+                                       accessGranted = true;
+
+                                       INode ancestor = inodes.length > 
ancestorIndex && ancestorIndex >= 0 ? inodes[ancestorIndex] : null;
+                                       INode parent   = inodes.length > 1 ? 
inodes[inodes.length - 2] : null;
+                                       INode inode    = inodes[inodes.length - 
1];
+
+                                       boolean noAccessToCheck = access == 
null && parentAccess == null && ancestorAccess == null && subAccess == null;
+
+                                       if(noAccessToCheck) { // check for 
traverse (EXECUTE) access on the path (if path is a directory) or its parent 
(if path is a file)
+                                               INode           node        = 
null;
+                                               INodeAttributes nodeAttribs = 
null;
+
+                                               if(inode != null && 
inode.isDirectory()) {
+                                                       node        = inode;
+                                                       nodeAttribs = 
inodeAttrs.length > 0 ? inodeAttrs[inodeAttrs.length - 1] : null;
+                                               } else if(parent != null) {
+                                                       node        = parent;
+                                                       nodeAttribs = 
inodeAttrs.length > 1 ? inodeAttrs[inodeAttrs.length - 2] : null;
+                                               }
+
+                                               if(node != null) {
+                                                       accessGranted = 
isAccessAllowed(node, nodeAttribs, FsAction.EXECUTE, user, groups, fsOwner, 
superGroup, plugin, null);
+                                               }
+                                       }
+
+                                       // checkStickyBit
+                                       if (accessGranted && parentAccess != 
null && parentAccess.implies(FsAction.WRITE) && parent != null && inode != 
null) {
+                                               if (parent.getFsPermission() != 
null && parent.getFsPermission().getStickyBit()) {
+                                                   // user should be owner of 
the parent or the inode
+                                                   accessGranted = 
StringUtils.equals(parent.getUserName(), user) || 
StringUtils.equals(inode.getUserName(), user);
+                                               }
+                                       }
+
+                                       // checkAncestorAccess
+                                       if(accessGranted && ancestorAccess != 
null && ancestor != null) {
+                                               INodeAttributes ancestorAttribs 
= inodeAttrs.length > ancestorIndex ? inodeAttrs[ancestorIndex] : null;
+
+                                               accessGranted = 
isAccessAllowed(ancestor, ancestorAttribs, ancestorAccess, user, groups, 
fsOwner, superGroup, plugin, auditHandler);
+                                       }
+
+                                       // checkParentAccess
+                                       if(accessGranted && parentAccess != 
null && parent != null) {
+                                               INodeAttributes parentAttribs = 
inodeAttrs.length > 1 ? inodeAttrs[inodeAttrs.length - 2] : null;
+
+                                               accessGranted = 
isAccessAllowed(parent, parentAttribs, parentAccess, user, groups, fsOwner, 
superGroup, plugin, auditHandler);
+                                       }
+
+                                       // checkINodeAccess
+                                       if(accessGranted && access != null && 
inode != null) {
+                                               INodeAttributes inodeAttribs = 
inodeAttrs.length > 0 ? inodeAttrs[inodeAttrs.length - 1] : null;
+
+                                               accessGranted = 
isAccessAllowed(inode, inodeAttribs, access, user, groups, fsOwner, superGroup, 
plugin, auditHandler);
+                                       }
+
+                                       // checkSubAccess
+                                       if(accessGranted && subAccess != null 
&& inode != null && inode.isDirectory()) {
+                                               Stack<INodeDirectory> 
directories = new Stack<INodeDirectory>();
+
+                                               
for(directories.push(inode.asDirectory()); !directories.isEmpty(); ) {
+                                                       INodeDirectory      dir 
  = directories.pop();
+                                                       ReadOnlyList<INode> 
cList = dir.getChildrenList(snapshotId);
+
+                                                       if (!(cList.isEmpty() 
&& ignoreEmptyDir)) {
+                                                               INodeAttributes 
dirAttribs = dir.getSnapshotINode(snapshotId);
+
+                                                               accessGranted = 
isAccessAllowed(dir, dirAttribs, subAccess, user, groups, fsOwner, superGroup, 
plugin, auditHandler);
+
+                                                               if(! 
accessGranted) {
+                                                                       break;
+                                                               }
+                                                       }
+
+                                                       for(INode child : 
cList) {
+                                                               if 
(child.isDirectory()) {
+                                                                       
directories.push(child.asDirectory());
+                                                               }
+                                                       }
+                                               }
+                                       }
+
+                                       // checkOwnerAccess
+                                       if(accessGranted && doCheckOwner) {
+                                               INodeAttributes inodeAttribs = 
inodeAttrs.length > 0 ? inodeAttrs[inodeAttrs.length - 1] : null;
+                                               String          owner        = 
inodeAttribs != null ? inodeAttribs.getUserName() : null;
+
+                                               accessGranted = 
StringUtils.equals(user, owner);
+                                       }
+                               }
+
+                               if(! accessGranted && 
RangerHdfsPlugin.isHadoopAuthEnabled() && defaultEnforcer != null) {
+                                       try {
+                                               
defaultEnforcer.checkPermission(fsOwner, superGroup, ugi, inodeAttrs, inodes,
+                                                                               
                                pathByNameArr, snapshotId, path, ancestorIndex, 
doCheckOwner,
+                                                                               
                                ancestorAccess, parentAccess, access, 
subAccess, ignoreEmptyDir);
+
+                                               accessGranted = true;
+                                       } finally {
+                                               if(auditHandler != null) {
+                                                       FsAction action = 
access;
+
+                                                       if(action == null) {
+                                                               if(parentAccess 
!= null) {
+                                                                       action 
= parentAccess;
+                                                               } else 
if(ancestorAccess != null) {
+                                                                       action 
= ancestorAccess;
+                                                               } else 
if(subAccess != null) {
+                                                                       action 
= subAccess;
+                                                               } else {
+                                                                       action 
= FsAction.NONE;
+                                                               }
+                                                       }
+
+                                                       
auditHandler.logHadoopEvent(path, action, accessGranted);
+                                               }
+                                       }
+                               }
+
+                               if(! accessGranted) {
+                                       throw new 
RangerAccessControlException("Permission denied: principal{user=" + user + 
",groups: " + groups + "}, access=" + access + ", " + path) ;
+                               }
+                       } finally {
+                               if(auditHandler != null) {
+                                       auditHandler.flushAudit();
+                               }
+
+                               if(LOG.isDebugEnabled()) {
+                                       LOG.debug("<== 
RangerAccessControlEnforcer.checkPermission(" + path + ", " + access + ", 
user=" + user + ") : " + accessGranted);
+                               }
+                       }
+               }
+
+               private boolean isAccessAllowed(INode inode, INodeAttributes 
inodeAttribs, FsAction access, String user, Set<String> groups, String fsOwner, 
String superGroup, RangerHdfsPlugin plugin, RangerHdfsAuditHandler 
auditHandler) {
+                       boolean ret       = false;
+                       String  path      = inode != null ? 
inode.getFullPathName() : null;
+                       String  pathOwner = inodeAttribs != null ? 
inodeAttribs.getUserName() : null;
+
+                       if(pathOwner == null && inode != null) {
+                               pathOwner = inode.getUserName();
+                       }
+
+                       if 
(RangerHadoopConstants.HDFS_ROOT_FOLDER_PATH_ALT.equals(path)) {
+                               path = 
RangerHadoopConstants.HDFS_ROOT_FOLDER_PATH;
+                       }
+
+                       if(LOG.isDebugEnabled()) {
+                               LOG.debug("==> 
RangerAccessControlEnforcer.isAccessAllowed(" + path + ", " + access + ", " + 
user + ")");
+                       }
+
+                       Set<String> accessTypes = 
access2ActionListMapper.get(access);
+
+                       if(accessTypes == null) {
+                               
LOG.warn("RangerAccessControlEnforcer.isAccessAllowed(" + path + ", " + access 
+ ", " + user + "): no Ranger accessType found for " + access);
+
+                               accessTypes = 
access2ActionListMapper.get(FsAction.NONE);
+                       }
+
+                       for(String accessType : accessTypes) {
+                               RangerHdfsAccessRequest request = new 
RangerHdfsAccessRequest(path, pathOwner, access, accessType, user, groups);
+
+                               RangerAccessResult result = 
plugin.isAccessAllowed(request, auditHandler);
+
+                               if (result == null) {
+                                       LOG.error("RangerAccessControlEnforcer: 
Internal error: null RangerAccessResult object received back from 
isAccessAllowed()!");
+                               } else {
+                                       ret = result.getIsAllowed();
+
+                                       if (!ret) {
+                                               break;
+                                       }
+                               }
+                       }
+
+                       if(LOG.isDebugEnabled()) {
+                               LOG.debug("<== 
RangerAccessControlEnforcer.isAccessAllowed(" + path + ", " + access + ", " + 
user + "): " + ret);
+                       }
+
+                       return ret;
+               }
+       }
+}
+
+
+class RangerHdfsPlugin extends RangerBasePlugin {
+       private static boolean hadoopAuthEnabled = 
RangerHadoopConstants.RANGER_ADD_HDFS_PERMISSION_DEFAULT;
+
+       public RangerHdfsPlugin() {
+               super("hdfs", "hdfs");
+       }
+       
+       public void init() {
+               super.init();
+               
+               RangerHdfsPlugin.hadoopAuthEnabled = 
RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.RANGER_ADD_HDFS_PERMISSION_PROP,
 RangerHadoopConstants.RANGER_ADD_HDFS_PERMISSION_DEFAULT);
+       }
+
+       public static boolean isHadoopAuthEnabled() {
+               return RangerHdfsPlugin.hadoopAuthEnabled;
+       }
+}
+
+class RangerHdfsResource extends RangerAccessResourceImpl {
+       private static final String KEY_PATH = "path";
+
+
+       public RangerHdfsResource(String path, String owner) {
+               super.setValue(KEY_PATH, path);
+               super.setOwnerUser(owner);
+       }
+}
+
+class RangerHdfsAccessRequest extends RangerAccessRequestImpl {
+       public RangerHdfsAccessRequest(String path, String pathOwner, FsAction 
access, String accessType, String user, Set<String> groups) {
+               super.setResource(new RangerHdfsResource(path, pathOwner));
+               super.setAccessType(accessType);
+               super.setUser(user);
+               super.setUserGroups(groups);
+               super.setAccessTime(StringUtil.getUTCDate());
+               super.setClientIPAddress(getRemoteIp());
+               super.setAction(access.toString());
+       }
+       
+       private static String getRemoteIp() {
+               String ret = null ;
+               InetAddress ip = Server.getRemoteIp() ;
+               if (ip != null) {
+                       ret = ip.getHostAddress();
+               }
+               return ret ;
+       }
+}
+
+class RangerHdfsAuditHandler extends RangerDefaultAuditHandler {
+       private static final Log LOG = 
LogFactory.getLog(RangerHdfsAuditHandler.class);
+
+       private boolean         isAuditEnabled = false;
+       private AuthzAuditEvent auditEvent     = null;
+
+       private static final String    RangerModuleName = 
RangerConfiguration.getInstance().get(RangerHadoopConstants.AUDITLOG_RANGER_MODULE_ACL_NAME_PROP
 , RangerHadoopConstants.DEFAULT_RANGER_MODULE_ACL_NAME) ;
+       private static final String    HadoopModuleName = 
RangerConfiguration.getInstance().get(RangerHadoopConstants.AUDITLOG_HADOOP_MODULE_ACL_NAME_PROP
 , RangerHadoopConstants.DEFAULT_HADOOP_MODULE_ACL_NAME) ;
+       private static final String    excludeUserList  = 
RangerConfiguration.getInstance().get(RangerHadoopConstants.AUDITLOG_HDFS_EXCLUDE_LIST_PROP,
 RangerHadoopConstants.AUDITLOG_EMPTY_STRING) ;
+       private static HashSet<String> excludeUsers     = null ;
+
+       static {
+               if (excludeUserList != null && excludeUserList.trim().length() 
> 0) {
+                       excludeUsers = new HashSet<String>() ;
+                       for(String excludeUser : 
excludeUserList.trim().split(",")) {
+                               excludeUser = excludeUser.trim() ;
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Adding exclude user [" + 
excludeUser + "]");
+                               }
+                               excludeUsers.add(excludeUser) ;
+                               }
+               }
+       }
+
+       public RangerHdfsAuditHandler(String pathToBeValidated) {
+               auditEvent = new AuthzAuditEvent();
+               auditEvent.setResourcePath(pathToBeValidated);
+       }
+
+       @Override
+       public void processResult(RangerAccessResult result) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerHdfsAuditHandler.logAudit(" + 
result + ")");
+               }
+
+               if(! isAuditEnabled && result.getIsAudited()) {
+                       isAuditEnabled = true;
+               }
+
+               RangerAccessRequest  request      = result.getAccessRequest();
+//             RangerServiceDef     serviceDef   = result.getServiceDef();
+               RangerAccessResource resource     = request.getResource();
+               String               resourceType = resource != null ? 
resource.getLeafName() : null;
+               String               resourcePath = resource != null ? 
resource.getAsString() : null;
+
+               auditEvent.setUser(request.getUser());
+               auditEvent.setResourceType(resourceType) ;
+               auditEvent.setAccessType(request.getAction());
+               auditEvent.setAccessResult((short)(result.getIsAllowed() ? 1 : 
0));
+               auditEvent.setClientIP(request.getClientIPAddress());
+               auditEvent.setEventTime(request.getAccessTime());
+               auditEvent.setAclEnforcer(RangerModuleName);
+               auditEvent.setPolicyId(result.getPolicyId());
+               auditEvent.setRepositoryType(result.getServiceType());
+               auditEvent.setRepositoryName(result.getServiceName());
+               auditEvent.setResultReason(resourcePath);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerHdfsAuditHandler.logAudit(" + 
result + "): " + auditEvent);
+               }
+       }
+
+       public void logHadoopEvent(String path, FsAction action, boolean 
accessGranted) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerHdfsAuditHandler.logHadoopEvent(" 
+ path + ", " + action + ", " + accessGranted + ")");
+               }
+
+               auditEvent.setResultReason(path);
+               auditEvent.setAccessResult((short) (accessGranted ? 1 : 0));
+               auditEvent.setAccessType(action == null ? null : 
action.toString());
+               auditEvent.setAclEnforcer(HadoopModuleName);
+               auditEvent.setPolicyId(-1);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerHdfsAuditHandler.logHadoopEvent(" 
+ path + ", " + action + ", " + accessGranted + "): " + auditEvent);
+               }
+       }
+
+       public void flushAudit() {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerHdfsAuditHandler.flushAudit(" + 
isAuditEnabled + ", " + auditEvent + ")");
+               }
+
+               if(isAuditEnabled && 
!StringUtils.isEmpty(auditEvent.getAccessType())) {
+                       String username = auditEvent.getUser();
+
+                       boolean skipLog = (username != null && excludeUsers != 
null && excludeUsers.contains(username)) ;
+
+                       if (! skipLog) {
+                               super.logAuthzAudit(auditEvent);
+                       }
+               }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerHdfsAuditHandler.flushAudit(" + 
isAuditEnabled + ", " + auditEvent + ")");
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/hive-agent/conf/ranger-hive-security.xml
----------------------------------------------------------------------
diff --git a/hive-agent/conf/ranger-hive-security.xml 
b/hive-agent/conf/ranger-hive-security.xml
index 3a5fc54..515b1fd 100644
--- a/hive-agent/conf/ranger-hive-security.xml
+++ b/hive-agent/conf/ranger-hive-security.xml
@@ -65,6 +65,22 @@
                </description>
        </property>
 
+               <property>
+               <name>ranger.policy.rest.client.connection.timeoutMs</name>
+               <value>120000</value>
+               <description>
+                       RangerRestClient Connection Timeout in Milli Seconds
+               </description>
+       </property>
+       
+       <property>
+               <name>ranger.policy.rest.client.read.timeoutMs</name>
+               <value>30000</value>
+               <description>
+                       RangerRestClient read Timeout in Milli Seconds
+               </description>
+       </property>
+       
        <property>
                <name>xasecure.hive.update.xapolicies.on.grant.revoke</name>
                <value>true</value>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/hive-agent/src/main/java/com/xasecure/authorization/hive/authorizer/XaSecureHiveAuthorizerFactory.java
----------------------------------------------------------------------
diff --git 
a/hive-agent/src/main/java/com/xasecure/authorization/hive/authorizer/XaSecureHiveAuthorizerFactory.java
 
b/hive-agent/src/main/java/com/xasecure/authorization/hive/authorizer/XaSecureHiveAuthorizerFactory.java
index e941704..079616c 100644
--- 
a/hive-agent/src/main/java/com/xasecure/authorization/hive/authorizer/XaSecureHiveAuthorizerFactory.java
+++ 
b/hive-agent/src/main/java/com/xasecure/authorization/hive/authorizer/XaSecureHiveAuthorizerFactory.java
@@ -18,7 +18,7 @@
  */
 package com.xasecure.authorization.hive.authorizer;
 
-import 
org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactory;
+import 
org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFactoryImpl;
 
 /**
  * This class exists only to provide for seamless upgrade/downgrade 
capabilities.  Coprocessor name is in hbase config files in /etc/.../conf which
@@ -28,6 +28,6 @@ import 
org.apache.ranger.authorization.hive.authorizer.RangerHiveAuthorizerFacto
  * This class is final because if one needs to customize coprocessor it is 
expected that RangerAuthorizationCoprocessor would be modified/extended as that 
is
  * the "real" coprocessor!  This class, hence, should NEVER be more than an 
EMPTY shell!
  */
-public final class XaSecureHiveAuthorizerFactory extends 
RangerHiveAuthorizerFactory {
+public final class XaSecureHiveAuthorizerFactory extends 
RangerHiveAuthorizerFactoryImpl {
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactory.java
----------------------------------------------------------------------
diff --git 
a/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactory.java
 
b/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactory.java
deleted file mode 100644
index bd410b7..0000000
--- 
a/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
- package org.apache.ranger.authorization.hive.authorizer;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
-import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
-import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizerFactory;
-import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException;
-import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory;
-import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
-
-public class RangerHiveAuthorizerFactory implements HiveAuthorizerFactory {
-       @Override
-       public HiveAuthorizer createHiveAuthorizer(HiveMetastoreClientFactory 
metastoreClientFactory,
-                                                                               
           HiveConf                   conf,
-                                                                               
           HiveAuthenticationProvider hiveAuthenticator,
-                                                                               
           HiveAuthzSessionContext    sessionContext)
-                                                                               
                           throws HiveAuthzPluginException {
-               return new RangerHiveAuthorizer(metastoreClientFactory, conf, 
hiveAuthenticator, sessionContext);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactoryImpl.java
 
b/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactoryImpl.java
new file mode 100644
index 0000000..937aaed
--- /dev/null
+++ 
b/hive-agent/src/main/java/org/apache/ranger/authorization/hive/authorizer/RangerHiveAuthorizerFactoryImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ package org.apache.ranger.authorization.hive.authorizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
+import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizerFactory;
+import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException;
+import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactory;
+import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
+
+public class RangerHiveAuthorizerFactoryImpl implements HiveAuthorizerFactory {
+       
+       @Override
+       public HiveAuthorizer createHiveAuthorizer(HiveMetastoreClientFactory 
metastoreClientFactory,
+                                                                               
           HiveConf                   conf,
+                                                                               
           HiveAuthenticationProvider hiveAuthenticator,
+                                                                               
           HiveAuthzSessionContext    sessionContext)
+                                                                               
                           throws HiveAuthzPluginException {
+               return new RangerHiveAuthorizer(metastoreClientFactory, conf, 
hiveAuthenticator, sessionContext);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/knox-agent/src/main/java/com/xasecure/pdp/knox/filter/XASecurePDPKnoxFilter.java
----------------------------------------------------------------------
diff --git 
a/knox-agent/src/main/java/com/xasecure/pdp/knox/filter/XASecurePDPKnoxFilter.java
 
b/knox-agent/src/main/java/com/xasecure/pdp/knox/filter/XASecurePDPKnoxFilter.java
deleted file mode 100644
index 6b9d6fd..0000000
--- 
a/knox-agent/src/main/java/com/xasecure/pdp/knox/filter/XASecurePDPKnoxFilter.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.xasecure.pdp.knox.filter;
-
-import org.apache.ranger.authorization.knox.RangerPDPKnoxFilter;
-
-public class XASecurePDPKnoxFilter extends RangerPDPKnoxFilter {
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-kafka/conf/ranger-kafka-security.xml
----------------------------------------------------------------------
diff --git a/plugin-kafka/conf/ranger-kafka-security.xml 
b/plugin-kafka/conf/ranger-kafka-security.xml
index 2c06f5c..f9c8d5f 100644
--- a/plugin-kafka/conf/ranger-kafka-security.xml
+++ b/plugin-kafka/conf/ranger-kafka-security.xml
@@ -80,4 +80,5 @@
                        RangerRestClient read Timeout in Milli Seconds
                </description>
        </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml
index afee47d..e14e48c 100644
--- a/plugin-kafka/pom.xml
+++ b/plugin-kafka/pom.xml
@@ -47,5 +47,10 @@
                        <artifactId>kafka_2.10</artifactId>
                        <version>${kafka.version}</version>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-hdfs</artifactId>
+                       <version>${hadoop.version}</version>
+               </dependency>
        </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
deleted file mode 100644
index dbb2723..0000000
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.authorization.kafka.authorizer;
-
-import java.io.IOException;
-import java.security.Principal;
-import java.util.Date;
-
-import javax.security.auth.Subject;
-
-import kafka.security.auth.Acl;
-import kafka.security.auth.Authorizer;
-import kafka.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
-import kafka.security.auth.ResourceType;
-import kafka.server.KafkaConfig;
-import kafka.common.security.LoginManager;
-import kafka.network.RequestChannel.Session;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ranger.audit.provider.MiscUtil;
-import org.apache.ranger.authorization.utils.StringUtil;
-import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
-import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
-import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
-import org.apache.ranger.plugin.policyengine.RangerAccessResult;
-import org.apache.ranger.plugin.service.RangerBasePlugin;
-
-import scala.collection.immutable.HashSet;
-import scala.collection.immutable.Set;
-
-public class RangerKafkaAuthorizer implements Authorizer {
-       private static final Log logger = LogFactory
-                       .getLog(RangerKafkaAuthorizer.class);
-
-       public static final String KEY_TOPIC = "topic";
-       public static final String KEY_CLUSTER = "cluster";
-       public static final String KEY_CONSUMER_GROUP = "consumer_group";
-
-       public static final String ACCESS_TYPE_READ = "consume";
-       public static final String ACCESS_TYPE_WRITE = "publish";
-       public static final String ACCESS_TYPE_CREATE = "create";
-       public static final String ACCESS_TYPE_DELETE = "delete";
-       public static final String ACCESS_TYPE_CONFIGURE = "configure";
-       public static final String ACCESS_TYPE_DESCRIBE = "describe";
-       public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin";
-
-       private static volatile RangerBasePlugin rangerPlugin = null;
-       long lastLogTime = 0;
-       int errorLogFreq = 30000; // Log after every 30 seconds
-
-       public RangerKafkaAuthorizer() {
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see 
kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig)
-        */
-       @Override
-       public void initialize(KafkaConfig kafkaConfig) {
-
-               if (rangerPlugin == null) {
-                       try {
-                               Subject subject = LoginManager.subject();
-                               UserGroupInformation ugi = MiscUtil
-                                               .createUGIFromSubject(subject);
-                               if (ugi != null) {
-                                       MiscUtil.setUGILoginUser(ugi, subject);
-                               }
-                               logger.info("LoginUser=" + 
MiscUtil.getUGILoginUser());
-                       } catch (Throwable t) {
-                               logger.error("Error getting principal.", t);
-                       }
-
-                       rangerPlugin = new RangerBasePlugin("kafka", "kafka");
-                       logger.info("Calling plugin.init()");
-                       rangerPlugin.init();
-
-                       RangerDefaultAuditHandler auditHandler = new 
RangerDefaultAuditHandler();
-                       rangerPlugin.setResultProcessor(auditHandler);
-               }
-       }
-
-       @Override
-       public boolean authorize(Session session, Operation operation,
-                       Resource resource) {
-
-               if (rangerPlugin == null) {
-                       MiscUtil.logErrorMessageByInterval(logger,
-                                       "Authorizer is still not initialized");
-                       return false;
-               }
-
-               // TODO: If resource type if consumer group, then allow it by 
default
-               if 
(resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
-                       return true;
-               }
-
-               String userName = null;
-               if (session.principal() != null) {
-                       userName = session.principal().getName();
-                       userName = StringUtils.substringBefore(userName, "/");
-                       userName = StringUtils.substringBefore(userName, "@");
-               }
-               java.util.Set<String> userGroups = MiscUtil
-                               .getGroupsForRequestUser(userName);
-               String ip = session.host();
-
-               Date eventTime = StringUtil.getUTCDate();
-               String accessType = mapToRangerAccessType(operation);
-               boolean validationFailed = false;
-               String validationStr = "";
-
-               if (accessType == null) {
-                       if (MiscUtil.logErrorMessageByInterval(logger,
-                                       "Unsupported access type. operation=" + 
operation)) {
-                               logger.fatal("Unsupported access type. 
session=" + session
-                                               + ", operation=" + operation + 
", resource=" + resource);
-                       }
-                       validationFailed = true;
-                       validationStr += "Unsupported access type. operation=" 
+ operation;
-               }
-               String action = accessType;
-
-               RangerAccessRequestImpl rangerRequest = new 
RangerAccessRequestImpl();
-               rangerRequest.setUser(userName);
-               rangerRequest.setUserGroups(userGroups);
-               rangerRequest.setClientIPAddress(ip);
-               rangerRequest.setAccessTime(eventTime);
-
-               RangerAccessResourceImpl rangerResource = new 
RangerAccessResourceImpl();
-               rangerRequest.setResource(rangerResource);
-               rangerRequest.setAccessType(accessType);
-               rangerRequest.setAction(action);
-               rangerRequest.setRequestData(resource.name());
-
-               if (resource.resourceType().equals(ResourceType.TOPIC)) {
-                       rangerResource.setValue(KEY_TOPIC, resource.name());
-               } else if 
(resource.resourceType().equals(ResourceType.CLUSTER)) {
-                       // CLUSTER should go as null
-                       // rangerResource.setValue(KEY_CLUSTER, 
resource.name());
-               } else if 
(resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
-                       rangerResource.setValue(KEY_CONSUMER_GROUP, 
resource.name());
-               } else {
-                       logger.fatal("Unsupported resourceType=" + 
resource.resourceType());
-                       validationFailed = true;
-               }
-
-               boolean returnValue = true;
-               if (validationFailed) {
-                       MiscUtil.logErrorMessageByInterval(logger, validationStr
-                                       + ", request=" + rangerRequest);
-                       returnValue = false;
-               } else {
-
-                       try {
-                               RangerAccessResult result = rangerPlugin
-                                               .isAccessAllowed(rangerRequest);
-                               if (result == null) {
-                                       logger.error("Ranger Plugin returned 
null. Returning false");
-                                       returnValue = false;
-                               } else {
-                                       returnValue = result.getIsAllowed();
-                               }
-                       } catch (Throwable t) {
-                               logger.error("Error while calling 
isAccessAllowed(). request="
-                                               + rangerRequest, t);
-                       }
-               }
-               if (logger.isDebugEnabled()) {
-                       logger.debug("rangerRequest=" + rangerRequest + ", 
return="
-                                       + returnValue);
-               }
-               return returnValue;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see
-        * 
kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-        * kafka.security.auth.Resource)
-        */
-       @Override
-       public void addAcls(Set<Acl> acls, Resource resource) {
-               logger.error("addAcls() is not supported by Ranger for Kafka");
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see
-        * 
kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-        * kafka.security.auth.Resource)
-        */
-       @Override
-       public boolean removeAcls(Set<Acl> acls, Resource resource) {
-               logger.error("removeAcls() is not supported by Ranger for 
Kafka");
-               return false;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see
-        * 
kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-        */
-       @Override
-       public boolean removeAcls(Resource resource) {
-               logger.error("removeAcls() is not supported by Ranger for 
Kafka");
-               return false;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see 
kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-        */
-       @Override
-       public Set<Acl> getAcls(Resource resource) {
-               Set<Acl> aclList = new HashSet<Acl>();
-               logger.error("getAcls() is not supported by Ranger for Kafka");
-
-               return aclList;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see
-        * 
kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-        * )
-        */
-       @Override
-       public Set<Acl> getAcls(KafkaPrincipal principal) {
-               Set<Acl> aclList = new HashSet<Acl>();
-               logger.error("getAcls() is not supported by Ranger for Kafka");
-               return aclList;
-       }
-
-       /**
-        * @param operation
-        * @return
-        */
-       private String mapToRangerAccessType(Operation operation) {
-               if (operation.equals(Operation.READ)) {
-                       return ACCESS_TYPE_READ;
-               } else if (operation.equals(Operation.WRITE)) {
-                       return ACCESS_TYPE_WRITE;
-               } else if (operation.equals(Operation.ALTER)) {
-                       return ACCESS_TYPE_CONFIGURE;
-               } else if (operation.equals(Operation.DESCRIBE)) {
-                       return ACCESS_TYPE_DESCRIBE;
-               } else if (operation.equals(Operation.CLUSTER_ACTION)) {
-                       return ACCESS_TYPE_KAFKA_ADMIN;
-               }
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizerImpl.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizerImpl.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizerImpl.java
new file mode 100644
index 0000000..608371a
--- /dev/null
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizerImpl.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.authorization.kafka.authorizer;
+
+import java.util.Date;
+import javax.security.auth.Subject;
+
+import kafka.security.auth.Acl;
+import kafka.security.auth.Authorizer;
+import kafka.security.auth.KafkaPrincipal;
+import kafka.security.auth.Operation;
+import kafka.security.auth.Resource;
+import kafka.security.auth.ResourceType;
+import kafka.server.KafkaConfig;
+import kafka.common.security.LoginManager;
+import kafka.network.RequestChannel.Session;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.service.RangerBasePlugin;
+
+import scala.collection.immutable.HashSet;
+import scala.collection.immutable.Set;
+
+public class RangerKafkaAuthorizerImpl implements Authorizer {
+       private static final Log logger = LogFactory
+                       .getLog(RangerKafkaAuthorizerImpl.class);
+
+       public static final String KEY_TOPIC = "topic";
+       public static final String KEY_CLUSTER = "cluster";
+       public static final String KEY_CONSUMER_GROUP = "consumer_group";
+
+       public static final String ACCESS_TYPE_READ = "consume";
+       public static final String ACCESS_TYPE_WRITE = "publish";
+       public static final String ACCESS_TYPE_CREATE = "create";
+       public static final String ACCESS_TYPE_DELETE = "delete";
+       public static final String ACCESS_TYPE_CONFIGURE = "configure";
+       public static final String ACCESS_TYPE_DESCRIBE = "describe";
+       public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin";
+
+       private static volatile RangerBasePlugin rangerPlugin = null;
+       long lastLogTime = 0;
+       int errorLogFreq = 30000; // Log after every 30 seconds
+
+       public RangerKafkaAuthorizerImpl() {
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see 
kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig)
+        */
+       @Override
+       public void initialize(KafkaConfig kafkaConfig) {
+
+               if (rangerPlugin == null) {
+                       try {
+                               Subject subject = LoginManager.subject();
+                               UserGroupInformation ugi = MiscUtil
+                                               .createUGIFromSubject(subject);
+                               if (ugi != null) {
+                                       MiscUtil.setUGILoginUser(ugi, subject);
+                               }
+                               logger.info("LoginUser=" + 
MiscUtil.getUGILoginUser());
+                       } catch (Throwable t) {
+                               logger.error("Error getting principal.", t);
+                       }
+
+                       rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+                       logger.info("Calling plugin.init()");
+                       rangerPlugin.init();
+
+                       RangerDefaultAuditHandler auditHandler = new 
RangerDefaultAuditHandler();
+                       rangerPlugin.setResultProcessor(auditHandler);
+               }
+       }
+
+       @Override
+       public boolean authorize(Session session, Operation operation, Resource 
resource) {
+
+               if (rangerPlugin == null) {
+                       MiscUtil.logErrorMessageByInterval(logger,
+                                       "Authorizer is still not initialized");
+                       return false;
+               }
+
+               // TODO: If resource type if consumer group, then allow it by 
default
+               if 
(resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
+                       return true;
+               }
+
+               String userName = null;
+               if (session.principal() != null) {
+                       userName = session.principal().getName();
+                       userName = StringUtils.substringBefore(userName, "/");
+                       userName = StringUtils.substringBefore(userName, "@");
+               }
+               java.util.Set<String> userGroups = MiscUtil
+                               .getGroupsForRequestUser(userName);
+               String ip = session.host();
+
+               Date eventTime = StringUtil.getUTCDate();
+               String accessType = mapToRangerAccessType(operation);
+               boolean validationFailed = false;
+               String validationStr = "";
+
+               if (accessType == null) {
+                       if (MiscUtil.logErrorMessageByInterval(logger,
+                                       "Unsupported access type. operation=" + 
operation)) {
+                               logger.fatal("Unsupported access type. 
session=" + session
+                                               + ", operation=" + operation + 
", resource=" + resource);
+                       }
+                       validationFailed = true;
+                       validationStr += "Unsupported access type. operation=" 
+ operation;
+               }
+               String action = accessType;
+
+               RangerAccessRequestImpl rangerRequest = new 
RangerAccessRequestImpl();
+               rangerRequest.setUser(userName);
+               rangerRequest.setUserGroups(userGroups);
+               rangerRequest.setClientIPAddress(ip);
+               rangerRequest.setAccessTime(eventTime);
+
+               RangerAccessResourceImpl rangerResource = new 
RangerAccessResourceImpl();
+               rangerRequest.setResource(rangerResource);
+               rangerRequest.setAccessType(accessType);
+               rangerRequest.setAction(action);
+               rangerRequest.setRequestData(resource.name());
+
+               if (resource.resourceType().equals(ResourceType.TOPIC)) {
+                       rangerResource.setValue(KEY_TOPIC, resource.name());
+               } else if 
(resource.resourceType().equals(ResourceType.CLUSTER)) {
+                       // CLUSTER should go as null
+                       // rangerResource.setValue(KEY_CLUSTER, 
resource.name());
+               } else if 
(resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
+                       rangerResource.setValue(KEY_CONSUMER_GROUP, 
resource.name());
+               } else {
+                       logger.fatal("Unsupported resourceType=" + 
resource.resourceType());
+                       validationFailed = true;
+               }
+
+               boolean returnValue = true;
+               if (validationFailed) {
+                       MiscUtil.logErrorMessageByInterval(logger, validationStr
+                                       + ", request=" + rangerRequest);
+                       returnValue = false;
+               } else {
+
+                       try {
+                               RangerAccessResult result = rangerPlugin
+                                               .isAccessAllowed(rangerRequest);
+                               if (result == null) {
+                                       logger.error("Ranger Plugin returned 
null. Returning false");
+                                       returnValue = false;
+                               } else {
+                                       returnValue = result.getIsAllowed();
+                               }
+                       } catch (Throwable t) {
+                               logger.error("Error while calling 
isAccessAllowed(). request="
+                                               + rangerRequest, t);
+                       }
+               }
+               if (logger.isDebugEnabled()) {
+                       logger.debug("rangerRequest=" + rangerRequest + ", 
return="
+                                       + returnValue);
+               }
+               return returnValue;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
+        * kafka.security.auth.Resource)
+        */
+       @Override
+       public void addAcls(Set<Acl> acls, Resource resource) {
+               logger.error("addAcls() is not supported by Ranger for Kafka");
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
+        * kafka.security.auth.Resource)
+        */
+       @Override
+       public boolean removeAcls(Set<Acl> acls, Resource resource) {
+               logger.error("removeAcls() is not supported by Ranger for 
Kafka");
+               return false;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
+        */
+       @Override
+       public boolean removeAcls(Resource resource) {
+               logger.error("removeAcls() is not supported by Ranger for 
Kafka");
+               return false;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see 
kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
+        */
+       @Override
+       public Set<Acl> getAcls(Resource resource) {
+               Set<Acl> aclList = new HashSet<Acl>();
+               logger.error("getAcls() is not supported by Ranger for Kafka");
+
+               return aclList;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
+        * )
+        */
+       @Override
+       public Set<Acl> getAcls(KafkaPrincipal principal) {
+               Set<Acl> aclList = new HashSet<Acl>();
+               logger.error("getAcls() is not supported by Ranger for Kafka");
+               return aclList;
+       }
+
+       /**
+        * @param operation
+        * @return
+        */
+       private String mapToRangerAccessType(Operation operation) {
+               if (operation.equals(Operation.READ)) {
+                       return ACCESS_TYPE_READ;
+               } else if (operation.equals(Operation.WRITE)) {
+                       return ACCESS_TYPE_WRITE;
+               } else if (operation.equals(Operation.ALTER)) {
+                       return ACCESS_TYPE_CONFIGURE;
+               } else if (operation.equals(Operation.DESCRIBE)) {
+                       return ACCESS_TYPE_DESCRIBE;
+               } else if (operation.equals(Operation.CLUSTER_ACTION)) {
+                       return ACCESS_TYPE_KAFKA_ADMIN;
+               }
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-kms/conf/ranger-kms-security.xml
----------------------------------------------------------------------
diff --git a/plugin-kms/conf/ranger-kms-security.xml 
b/plugin-kms/conf/ranger-kms-security.xml
index a22e6cb..58f7076 100755
--- a/plugin-kms/conf/ranger-kms-security.xml
+++ b/plugin-kms/conf/ranger-kms-security.xml
@@ -72,12 +72,13 @@
                        RangerRestClient Connection Timeout in Milli Seconds
                </description>
        </property>
-       
+
        <property>
                <name>ranger.plugin.kms.policy.rest.client.read.timeoutMs</name>
                <value>30000</value>
                <description>
                        RangerRestClient read Timeout in Milli Seconds
                </description>
-       </property>     
+       </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java
 
b/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java
deleted file mode 100644
index ab9b7a9..0000000
--- 
a/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizer.java
+++ /dev/null
@@ -1,354 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.authorization.yarn.authorizer;
-
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.security.*;
-import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
-import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
-import org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants;
-import org.apache.ranger.authorization.utils.StringUtil;
-import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
-import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
-import org.apache.ranger.plugin.policyengine.RangerAccessResult;
-import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
-import org.apache.ranger.plugin.service.RangerBasePlugin;
-
-import com.google.common.collect.Sets;
-
-public class RangerYarnAuthorizer extends YarnAuthorizationProvider {
-       public static final String ACCESS_TYPE_ADMIN_QUEUE = "admin-queue";
-       public static final String ACCESS_TYPE_SUBMIT_APP  = "submit-app";
-       public static final String ACCESS_TYPE_ADMIN       = "admin";
-
-       private static boolean yarnAuthEnabled = 
RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_DEFAULT;
-
-       private static final Log LOG = 
LogFactory.getLog(RangerYarnAuthorizer.class);
-
-       private static volatile RangerYarnPlugin yarnPlugin = null;
-
-       private AccessControlList admins = null;
-       private Map<PrivilegedEntity, Map<AccessType, AccessControlList>> 
yarnAcl = new HashMap<PrivilegedEntity, Map<AccessType, AccessControlList>>();
-
-       @Override
-       public void init(Configuration conf) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerYarnAuthorizer.init()");
-               }
-
-               RangerYarnPlugin plugin = yarnPlugin;
-
-               if(plugin == null) {
-                       synchronized(RangerYarnAuthorizer.class) {
-                               plugin = yarnPlugin;
-
-                               if(plugin == null) {
-                                       plugin = new RangerYarnPlugin();
-                                       plugin.init();
-                                       
-                                       yarnPlugin = plugin;
-                               }
-                       }
-               }
-
-               RangerYarnAuthorizer.yarnAuthEnabled = 
RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_PROP,
 RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_DEFAULT);
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerYarnAuthorizer.init()");
-               }
-       }
-
-       @Override
-       public boolean checkPermission(AccessType accessType, PrivilegedEntity 
entity, UserGroupInformation ugi) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerYarnAuthorizer.checkPermission(" + 
accessType + ", " + toString(entity) + ", " + ugi + ")");
-               }
-
-               boolean                ret          = false;
-               RangerYarnPlugin       plugin       = yarnPlugin;
-               RangerYarnAuditHandler auditHandler = null;
-               RangerAccessResult     result       = null;
-
-               if(plugin != null) {
-                       RangerYarnAccessRequest request = new 
RangerYarnAccessRequest(entity, getRangerAccessType(accessType), 
accessType.name(), ugi);
-
-                       auditHandler = new RangerYarnAuditHandler();
-
-                       result = plugin.isAccessAllowed(request, auditHandler);
-               }
-
-               if(RangerYarnAuthorizer.yarnAuthEnabled && (result == null || 
!result.getIsAccessDetermined())) {
-                       ret = isAllowedByYarnAcl(accessType, entity, ugi, 
auditHandler);
-               } else {
-                       ret = result == null ? false : result.getIsAllowed();
-               }
-
-               if(auditHandler != null) {
-                       auditHandler.flushAudit();
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerYarnAuthorizer.checkPermission(" + 
accessType + ", " + toString(entity) + ", " + ugi + "): " + ret);
-               }
-
-               return ret;
-       }
-
-       @Override
-       public boolean isAdmin(UserGroupInformation ugi) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerYarnAuthorizer.isAdmin(" + ugi + 
")");
-               }
-
-               boolean ret = false;
-               
-               AccessControlList admins = this.admins;
-
-               if(admins != null) {
-                       ret = admins.isUserAllowed(ugi);
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerYarnAuthorizer.isAdmin(" + ugi + 
"): " + ret);
-               }
-
-               return ret;
-       }
-
-       @Override
-       public void setAdmins(AccessControlList acl, UserGroupInformation ugi) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerYarnAuthorizer.setAdmins(" + acl + 
", " + ugi + ")");
-               }
-
-               admins = acl;
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerYarnAuthorizer.setAdmins(" + acl + 
", " + ugi + ")");
-               }
-       }
-
-       @Override
-       public void setPermission(PrivilegedEntity entity, Map<AccessType, 
AccessControlList> permission, UserGroupInformation ugi) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerYarnAuthorizer.setPermission(" + 
toString(entity) + ", " + permission + ", " + ugi + ")");
-               }
-
-               yarnAcl.put(entity, permission);
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerYarnAuthorizer.setPermission(" + 
toString(entity) + ", " + permission + ", " + ugi + ")");
-               }
-       }
-
-       public boolean isAllowedByYarnAcl(AccessType accessType, 
PrivilegedEntity entity, UserGroupInformation ugi, RangerYarnAuditHandler 
auditHandler) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> 
RangerYarnAuthorizer.isAllowedByYarnAcl(" + accessType + ", " + 
toString(entity) + ", " + ugi + ")");
-               }
-
-               boolean ret = false;
-
-               for(Map.Entry<PrivilegedEntity, Map<AccessType, 
AccessControlList>> e : yarnAcl.entrySet()) {
-                       PrivilegedEntity                   aclEntity         = 
e.getKey();
-                       Map<AccessType, AccessControlList> entityPermissions = 
e.getValue();
-
-                       AccessControlList acl = entityPermissions == null ? 
null : entityPermissions.get(accessType);
-
-                       if(acl == null || !acl.isUserAllowed(ugi)) {
-                               continue;
-                       }
-
-                       if(! isSelfOrChildOf(entity, aclEntity)) {
-                               continue;
-                       }
-
-                       ret = true;
-
-                       break;
-               }
-
-               if(auditHandler != null) {
-                       auditHandler.logYarnAclEvent(ret);
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== 
RangerYarnAuthorizer.isAllowedByYarnAcl(" + accessType + ", " + 
toString(entity) + ", " + ugi + "): " + ret);
-               }
-
-               return ret;
-       }
-
-       private static String getRangerAccessType(AccessType accessType) {
-               String ret = null;
-
-               switch(accessType) {
-                       case ADMINISTER_QUEUE:
-                               ret = 
RangerYarnAuthorizer.ACCESS_TYPE_ADMIN_QUEUE;
-                       break;
-
-                       case SUBMIT_APP:
-                               ret = 
RangerYarnAuthorizer.ACCESS_TYPE_SUBMIT_APP;
-                       break;
-               }
-
-               return ret;
-       }
-
-       private boolean isSelfOrChildOf(PrivilegedEntity queue, 
PrivilegedEntity parentQueue) {
-               boolean ret = queue.equals(parentQueue);
-
-               if(!ret && queue.getType() == EntityType.QUEUE) {
-                       String queueName       = queue.getName();
-                       String parentQueueName = parentQueue.getName();
-
-                       if(queueName.contains(".") && 
!StringUtil.isEmpty(parentQueueName)) {
-                               
if(parentQueueName.charAt(parentQueueName.length() - 1) != '.') {
-                                       parentQueueName += ".";
-                               }
-
-                               ret = queueName.startsWith(parentQueueName);
-                       }
-               }
-
-               return ret;
-       }
-
-       private String toString(PrivilegedEntity entity) {
-               if(entity != null) {
-                       return "{name=" + entity.getName() + "; type=" + 
entity.getType() + "}";
-               }
-
-               return "null";
-       }
-}
-
-class RangerYarnPlugin extends RangerBasePlugin {
-       public RangerYarnPlugin() {
-               super("yarn", "yarn");
-       }
-
-       @Override
-       public void init() {
-               super.init();
-
-               RangerDefaultAuditHandler auditHandler = new 
RangerDefaultAuditHandler();
-
-               super.setResultProcessor(auditHandler);
-       }
-}
-
-class RangerYarnResource extends RangerAccessResourceImpl {
-       private static final String KEY_QUEUE = "queue";
-
-       public RangerYarnResource(PrivilegedEntity entity) {
-               setValue(KEY_QUEUE, entity != null ? entity.getName() : null);
-       }
-}
-
-class RangerYarnAccessRequest extends RangerAccessRequestImpl {
-       public RangerYarnAccessRequest(PrivilegedEntity entity, String 
accessType, String action, UserGroupInformation ugi) {
-               super.setResource(new RangerYarnResource(entity));
-               super.setAccessType(accessType);
-               super.setUser(ugi.getShortUserName());
-               super.setUserGroups(Sets.newHashSet(ugi.getGroupNames()));
-               super.setAccessTime(StringUtil.getUTCDate());
-               super.setClientIPAddress(getRemoteIp());
-               super.setAction(accessType);
-       }
-       
-       private static String getRemoteIp() {
-               String ret = null ;
-               InetAddress ip = Server.getRemoteIp() ;
-               if (ip != null) {
-                       ret = ip.getHostAddress();
-               }
-               return ret ;
-       }
-}
-
-class RangerYarnAuditHandler extends RangerDefaultAuditHandler {
-       private static final Log LOG = 
LogFactory.getLog(RangerYarnAuditHandler.class);
-
-       private static final String YarnModuleName = 
RangerConfiguration.getInstance().get(RangerHadoopConstants.AUDITLOG_YARN_MODULE_ACL_NAME_PROP
 , RangerHadoopConstants.DEFAULT_YARN_MODULE_ACL_NAME) ;
-
-       private boolean         isAuditEnabled = false;
-       private AuthzAuditEvent auditEvent     = null;
-
-       public RangerYarnAuditHandler() {
-       }
-
-       @Override
-       public void processResult(RangerAccessResult result) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerYarnAuditHandler.logAudit(" + 
result + ")");
-               }
-
-               if(! isAuditEnabled && result.getIsAudited()) {
-                       isAuditEnabled = true;
-               }
-
-               auditEvent = super.getAuthzEvents(result);
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerYarnAuditHandler.logAudit(" + 
result + "): " + auditEvent);
-               }
-       }
-
-       public void logYarnAclEvent(boolean accessGranted) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerYarnAuditHandler.logYarnAclEvent(" 
+ accessGranted + ")");
-               }
-
-               if(auditEvent != null) {
-                       auditEvent.setAccessResult((short) (accessGranted ? 1 : 
0));
-                       auditEvent.setAclEnforcer(YarnModuleName);
-                       auditEvent.setPolicyId(-1);
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerYarnAuditHandler.logYarnAclEvent(" 
+ accessGranted + "): " + auditEvent);
-               }
-       }
-
-       public void flushAudit() {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerYarnAuditHandler.flushAudit(" + 
isAuditEnabled + ", " + auditEvent + ")");
-               }
-
-               if(isAuditEnabled) {
-                       super.logAuthzAudit(auditEvent);
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerYarnAuditHandler.flushAudit(" + 
isAuditEnabled + ", " + auditEvent + ")");
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizerImpl.java
----------------------------------------------------------------------
diff --git 
a/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizerImpl.java
 
b/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizerImpl.java
new file mode 100644
index 0000000..d154539
--- /dev/null
+++ 
b/plugin-yarn/src/main/java/org/apache/ranger/authorization/yarn/authorizer/RangerYarnAuthorizerImpl.java
@@ -0,0 +1,354 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.authorization.yarn.authorizer;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.security.*;
+import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.authorization.hadoop.config.RangerConfiguration;
+import org.apache.ranger.authorization.hadoop.constants.RangerHadoopConstants;
+import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.service.RangerBasePlugin;
+
+import com.google.common.collect.Sets;
+
+public class RangerYarnAuthorizerImpl extends YarnAuthorizationProvider {
+       public static final String ACCESS_TYPE_ADMIN_QUEUE = "admin-queue";
+       public static final String ACCESS_TYPE_SUBMIT_APP  = "submit-app";
+       public static final String ACCESS_TYPE_ADMIN       = "admin";
+
+       private static boolean yarnAuthEnabled = 
RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_DEFAULT;
+
+       private static final Log LOG = 
LogFactory.getLog(RangerYarnAuthorizerImpl.class);
+
+       private static volatile RangerYarnPlugin yarnPlugin = null;
+
+       private AccessControlList admins = null;
+       private Map<PrivilegedEntity, Map<AccessType, AccessControlList>> 
yarnAcl = new HashMap<PrivilegedEntity, Map<AccessType, AccessControlList>>();
+
+       @Override
+       public void init(Configuration conf) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerYarnAuthorizer.init()");
+               }
+
+               RangerYarnPlugin plugin = yarnPlugin;
+
+               if(plugin == null) {
+                       synchronized(RangerYarnAuthorizerImpl.class) {
+                               plugin = yarnPlugin;
+
+                               if(plugin == null) {
+                                       plugin = new RangerYarnPlugin();
+                                       plugin.init();
+                                       
+                                       yarnPlugin = plugin;
+                               }
+                       }
+               }
+
+               RangerYarnAuthorizerImpl.yarnAuthEnabled = 
RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_PROP,
 RangerHadoopConstants.RANGER_ADD_YARN_PERMISSION_DEFAULT);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerYarnAuthorizer.init()");
+               }
+       }
+
+       @Override
+       public boolean checkPermission(AccessType accessType, PrivilegedEntity 
entity, UserGroupInformation ugi) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerYarnAuthorizer.checkPermission(" + 
accessType + ", " + toString(entity) + ", " + ugi + ")");
+               }
+
+               boolean                ret          = false;
+               RangerYarnPlugin       plugin       = yarnPlugin;
+               RangerYarnAuditHandler auditHandler = null;
+               RangerAccessResult     result       = null;
+
+               if(plugin != null) {
+                       RangerYarnAccessRequest request = new 
RangerYarnAccessRequest(entity, getRangerAccessType(accessType), 
accessType.name(), ugi);
+
+                       auditHandler = new RangerYarnAuditHandler();
+
+                       result = plugin.isAccessAllowed(request, auditHandler);
+               }
+
+               if(RangerYarnAuthorizerImpl.yarnAuthEnabled && (result == null 
|| !result.getIsAccessDetermined())) {
+                       ret = isAllowedByYarnAcl(accessType, entity, ugi, 
auditHandler);
+               } else {
+                       ret = result == null ? false : result.getIsAllowed();
+               }
+
+               if(auditHandler != null) {
+                       auditHandler.flushAudit();
+               }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerYarnAuthorizer.checkPermission(" + 
accessType + ", " + toString(entity) + ", " + ugi + "): " + ret);
+               }
+
+               return ret;
+       }
+
+       @Override
+       public boolean isAdmin(UserGroupInformation ugi) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerYarnAuthorizer.isAdmin(" + ugi + 
")");
+               }
+
+               boolean ret = false;
+               
+               AccessControlList admins = this.admins;
+
+               if(admins != null) {
+                       ret = admins.isUserAllowed(ugi);
+               }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerYarnAuthorizer.isAdmin(" + ugi + 
"): " + ret);
+               }
+
+               return ret;
+       }
+
+       @Override
+       public void setAdmins(AccessControlList acl, UserGroupInformation ugi) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerYarnAuthorizer.setAdmins(" + acl + 
", " + ugi + ")");
+               }
+
+               admins = acl;
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerYarnAuthorizer.setAdmins(" + acl + 
", " + ugi + ")");
+               }
+       }
+
+       @Override
+       public void setPermission(PrivilegedEntity entity, Map<AccessType, 
AccessControlList> permission, UserGroupInformation ugi) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerYarnAuthorizer.setPermission(" + 
toString(entity) + ", " + permission + ", " + ugi + ")");
+               }
+
+               yarnAcl.put(entity, permission);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerYarnAuthorizer.setPermission(" + 
toString(entity) + ", " + permission + ", " + ugi + ")");
+               }
+       }
+
+       public boolean isAllowedByYarnAcl(AccessType accessType, 
PrivilegedEntity entity, UserGroupInformation ugi, RangerYarnAuditHandler 
auditHandler) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> 
RangerYarnAuthorizer.isAllowedByYarnAcl(" + accessType + ", " + 
toString(entity) + ", " + ugi + ")");
+               }
+
+               boolean ret = false;
+
+               for(Map.Entry<PrivilegedEntity, Map<AccessType, 
AccessControlList>> e : yarnAcl.entrySet()) {
+                       PrivilegedEntity                   aclEntity         = 
e.getKey();
+                       Map<AccessType, AccessControlList> entityPermissions = 
e.getValue();
+
+                       AccessControlList acl = entityPermissions == null ? 
null : entityPermissions.get(accessType);
+
+                       if(acl == null || !acl.isUserAllowed(ugi)) {
+                               continue;
+                       }
+
+                       if(! isSelfOrChildOf(entity, aclEntity)) {
+                               continue;
+                       }
+
+                       ret = true;
+
+                       break;
+               }
+
+               if(auditHandler != null) {
+                       auditHandler.logYarnAclEvent(ret);
+               }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
RangerYarnAuthorizer.isAllowedByYarnAcl(" + accessType + ", " + 
toString(entity) + ", " + ugi + "): " + ret);
+               }
+
+               return ret;
+       }
+
+       private static String getRangerAccessType(AccessType accessType) {
+               String ret = null;
+
+               switch(accessType) {
+                       case ADMINISTER_QUEUE:
+                               ret = 
RangerYarnAuthorizerImpl.ACCESS_TYPE_ADMIN_QUEUE;
+                       break;
+
+                       case SUBMIT_APP:
+                               ret = 
RangerYarnAuthorizerImpl.ACCESS_TYPE_SUBMIT_APP;
+                       break;
+               }
+
+               return ret;
+       }
+
+       private boolean isSelfOrChildOf(PrivilegedEntity queue, 
PrivilegedEntity parentQueue) {
+               boolean ret = queue.equals(parentQueue);
+
+               if(!ret && queue.getType() == EntityType.QUEUE) {
+                       String queueName       = queue.getName();
+                       String parentQueueName = parentQueue.getName();
+
+                       if(queueName.contains(".") && 
!StringUtil.isEmpty(parentQueueName)) {
+                               
if(parentQueueName.charAt(parentQueueName.length() - 1) != '.') {
+                                       parentQueueName += ".";
+                               }
+
+                               ret = queueName.startsWith(parentQueueName);
+                       }
+               }
+
+               return ret;
+       }
+
+       private String toString(PrivilegedEntity entity) {
+               if(entity != null) {
+                       return "{name=" + entity.getName() + "; type=" + 
entity.getType() + "}";
+               }
+
+               return "null";
+       }
+}
+
+class RangerYarnPlugin extends RangerBasePlugin {
+       public RangerYarnPlugin() {
+               super("yarn", "yarn");
+       }
+
+       @Override
+       public void init() {
+               super.init();
+
+               RangerDefaultAuditHandler auditHandler = new 
RangerDefaultAuditHandler();
+
+               super.setResultProcessor(auditHandler);
+       }
+}
+
+class RangerYarnResource extends RangerAccessResourceImpl {
+       private static final String KEY_QUEUE = "queue";
+
+       public RangerYarnResource(PrivilegedEntity entity) {
+               setValue(KEY_QUEUE, entity != null ? entity.getName() : null);
+       }
+}
+
+class RangerYarnAccessRequest extends RangerAccessRequestImpl {
+       public RangerYarnAccessRequest(PrivilegedEntity entity, String 
accessType, String action, UserGroupInformation ugi) {
+               super.setResource(new RangerYarnResource(entity));
+               super.setAccessType(accessType);
+               super.setUser(ugi.getShortUserName());
+               super.setUserGroups(Sets.newHashSet(ugi.getGroupNames()));
+               super.setAccessTime(StringUtil.getUTCDate());
+               super.setClientIPAddress(getRemoteIp());
+               super.setAction(accessType);
+       }
+       
+       private static String getRemoteIp() {
+               String ret = null ;
+               InetAddress ip = Server.getRemoteIp() ;
+               if (ip != null) {
+                       ret = ip.getHostAddress();
+               }
+               return ret ;
+       }
+}
+
+class RangerYarnAuditHandler extends RangerDefaultAuditHandler {
+       private static final Log LOG = 
LogFactory.getLog(RangerYarnAuditHandler.class);
+
+       private static final String YarnModuleName = 
RangerConfiguration.getInstance().get(RangerHadoopConstants.AUDITLOG_YARN_MODULE_ACL_NAME_PROP
 , RangerHadoopConstants.DEFAULT_YARN_MODULE_ACL_NAME) ;
+
+       private boolean         isAuditEnabled = false;
+       private AuthzAuditEvent auditEvent     = null;
+
+       public RangerYarnAuditHandler() {
+       }
+
+       @Override
+       public void processResult(RangerAccessResult result) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerYarnAuditHandler.logAudit(" + 
result + ")");
+               }
+
+               if(! isAuditEnabled && result.getIsAudited()) {
+                       isAuditEnabled = true;
+               }
+
+               auditEvent = super.getAuthzEvents(result);
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerYarnAuditHandler.logAudit(" + 
result + "): " + auditEvent);
+               }
+       }
+
+       public void logYarnAclEvent(boolean accessGranted) {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerYarnAuditHandler.logYarnAclEvent(" 
+ accessGranted + ")");
+               }
+
+               if(auditEvent != null) {
+                       auditEvent.setAccessResult((short) (accessGranted ? 1 : 
0));
+                       auditEvent.setAclEnforcer(YarnModuleName);
+                       auditEvent.setPolicyId(-1);
+               }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerYarnAuditHandler.logYarnAclEvent(" 
+ accessGranted + "): " + auditEvent);
+               }
+       }
+
+       public void flushAudit() {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerYarnAuditHandler.flushAudit(" + 
isAuditEnabled + ", " + auditEvent + ")");
+               }
+
+               if(isAuditEnabled) {
+                       super.logAuthzAudit(auditEvent);
+               }
+
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerYarnAuditHandler.flushAudit(" + 
isAuditEnabled + ", " + auditEvent + ")");
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0ccf12e..2452785 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,8 +14,7 @@
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+--><project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
   <parent>
         <groupId>org.apache</groupId>
@@ -97,6 +96,15 @@
   <module>unixauthservice</module>
   <module>ranger-util</module>
   <module>plugin-kms</module>
+  <module>plugin-kafka</module>
+  <module>ranger-hdfs-plugin-shim</module>
+  <module>ranger-plugin-classloader</module>
+  <module>ranger-hive-plugin-shim</module>
+  <module>ranger-hbase-plugin-shim</module>
+  <module>ranger-knox-plugin-shim</module>
+  <module>ranger-yarn-plugin-shim</module>
+  <module>ranger-storm-plugin-shim</module>
+  <module>ranger-kafka-plugin-shim</module>
   </modules>
   <properties>
         <javac.source.version>1.7</javac.source.version>
@@ -150,8 +158,9 @@
                <jersey-bundle.version>1.17.1</jersey-bundle.version>
                <jersey-client.version>2.6</jersey-client.version>
                <junit.version>4.11</junit.version>
-               <kafka.version>0.8.2.0</kafka.version>
-               <!-- <kafka.version>0.8.2.2.3.0.0-2208</kafka.version> -->
+               <!--  <kafka.version>0.8.2.0</kafka.version> -->
+               <!--  <kafka.version>0.8.2.2.3.0.0-2320</kafka.version> -->
+               <kafka.version>0.8.2.2.3.2.0-2950</kafka.version>
                <mockito.version>1.8.4</mockito.version>
                <hamcrest-version>1.3</hamcrest-version>
                <knox.gateway.version>0.6.0</knox.gateway.version>
@@ -505,7 +514,7 @@
              <phase>process-resources</phase>
              <configuration>
                <target>
-                  <echo message="${project.version}" 
file="${project.build.directory}/version" />
+                  <echo message="${project.version}" 
file="${project.build.directory}/version"/>
                </target>
              </configuration>
              <goals>
@@ -524,4 +533,4 @@
       </plugin>
     </plugins>
   </build>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/ranger-hbase-plugin-shim/pom.xml
----------------------------------------------------------------------
diff --git a/ranger-hbase-plugin-shim/pom.xml b/ranger-hbase-plugin-shim/pom.xml
new file mode 100644
index 0000000..c95075b
--- /dev/null
+++ b/ranger-hbase-plugin-shim/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>security_plugins.ranger-hbase-plugin-shim</groupId>
+  <artifactId>ranger-hbase-plugin-shim</artifactId>
+  <name>HBase Security Plugin Shim</name>
+  <description>HBase Security Plugins Shim</description>
+  <packaging>jar</packaging>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+  <parent>
+     <groupId>org.apache.ranger</groupId>
+     <artifactId>ranger</artifactId>
+     <version>0.5.0</version>
+     <relativePath>..</relativePath>
+  </parent>
+  <dependencies>
+    <dependency>
+       <groupId>org.apache.hbase</groupId>
+       <artifactId>hbase-server</artifactId>
+       <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-hdfs</artifactId>
+       <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>security_plugins.ranger-plugins-common</groupId>
+      <artifactId>ranger-plugins-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>security_plugins.ranger-plugins-audit</groupId>
+      <artifactId>ranger-plugins-audit</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>security_plugins.ranger-plugin-classloader</groupId>
+        <artifactId>ranger-plugin-classloader</artifactId>
+        <version>${project.version}</version>
+    </dependency>
+        <dependency>
+        <groupId>security_plugins.ranger-hbase-plugin</groupId>
+        <artifactId>ranger-hbase-plugin</artifactId>
+        <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-integration</artifactId>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/ranger-hbase-plugin-shim/src/main/java/com/xasecure/authorization/hbase/XaSecureAuthorizationCoprocessor.java
----------------------------------------------------------------------
diff --git 
a/ranger-hbase-plugin-shim/src/main/java/com/xasecure/authorization/hbase/XaSecureAuthorizationCoprocessor.java
 
b/ranger-hbase-plugin-shim/src/main/java/com/xasecure/authorization/hbase/XaSecureAuthorizationCoprocessor.java
new file mode 100644
index 0000000..bc01e51
--- /dev/null
+++ 
b/ranger-hbase-plugin-shim/src/main/java/com/xasecure/authorization/hbase/XaSecureAuthorizationCoprocessor.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.xasecure.authorization.hbase;
+
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import 
org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
+import org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor;
+/**
+ * This class exists only to provide for seamless upgrade/downgrade 
capabilities.  Coprocessor name is in hbase config files in /etc/.../conf which
+ * is not only out of bounds for any upgrade script but also must be of a form 
to allow for downgrad!  Thus when class names were changed XaSecure* -> Ranger* 
+ * this shell class serves to allow for seamles upgrade as well as downgrade.
+ * 
+ * This class is final because if one needs to customize coprocessor it is 
expected that RangerAuthorizationCoprocessor would be modified/extended as that 
is
+ * the "real" coprocessor!  This class, hence, should NEVER be more than an 
EMPTY shell!
+ */
+public final class XaSecureAuthorizationCoprocessor extends 
RangerAuthorizationCoprocessor implements AccessControlService.Interface, 
CoprocessorService {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/ebe83454/ranger-hbase-plugin-shim/src/main/java/org/apache/hadoop/hbase/security/access/RangerAccessControlLists.java
----------------------------------------------------------------------
diff --git 
a/ranger-hbase-plugin-shim/src/main/java/org/apache/hadoop/hbase/security/access/RangerAccessControlLists.java
 
b/ranger-hbase-plugin-shim/src/main/java/org/apache/hadoop/hbase/security/access/RangerAccessControlLists.java
new file mode 100644
index 0000000..7f33b15
--- /dev/null
+++ 
b/ranger-hbase-plugin-shim/src/main/java/org/apache/hadoop/hbase/security/access/RangerAccessControlLists.java
@@ -0,0 +1,104 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.log4j.Logger;
+
+
+public class RangerAccessControlLists {
+       
+       private static final Logger LOG = 
Logger.getLogger(RangerAccessControlLists.class) ;
+       
+       public static void init(MasterServices master) throws IOException {
+
+               Class<AccessControlLists> accessControlListsClass = 
AccessControlLists.class ;
+               String cName = accessControlListsClass.getName() ;
+
+               Class<?>[] params = new Class[1] ;
+               params[0] = MasterServices.class ;
+               
+               for (String mname : new String[] { "init", "createACLTable" } ) 
{
+                       try {
+                               try {
+                                       Method m = 
accessControlListsClass.getDeclaredMethod(mname, params) ;
+                                       if (m != null) {
+                                               try {
+                                                       
+                                                       try {
+                                                               m.invoke(null, 
master) ;
+                                                               
logInfo("Execute method name [" + mname + "] in Class [" +  cName + "] is 
successful.");
+                                                       } catch 
(InvocationTargetException e) {
+                                                               Throwable cause 
= e ;
+                                                               boolean 
tableExistsExceptionFound = false ;
+                                                               if  (e != null) 
{       
+                                                                       
Throwable ecause = e.getTargetException() ;
+                                                                       if 
(ecause != null) {
+                                                                               
cause = ecause ;
+                                                                               
if (ecause instanceof TableExistsException) {
+                                                                               
        tableExistsExceptionFound = true ;
+                                                                               
}
+                                                                       }
+                                                               }
+                                                               if (! 
tableExistsExceptionFound) {
+                                                                       
logError("Unable to execute the method [" + mname + "] on [" + cName + "] due 
to exception", cause) ;
+                                                                       throw 
new IOException(cause) ;
+                                                               }
+                                                       }
+                                                       return ;
+                                               } catch 
(IllegalArgumentException e) {
+                                                       logError("Unable to 
execute method name [" + mname + "] in Class [" +  cName + "].", e);
+                                                       throw new 
IOException(e) ;
+                                               } catch (IllegalAccessException 
e) {
+                                                       logError("Unable to 
execute method name [" + mname + "] in Class [" +  cName + "].", e);
+                                                       throw new 
IOException(e) ;
+                                               }
+                                       }
+                               }
+                               catch(NoSuchMethodException nsme) {
+                                       logInfo("Unable to get method name [" + 
mname + "] in Class [" +  cName + "]. Ignoring the exception");
+                               }
+                       } catch (SecurityException e) {
+                               logError("Unable to get method name [" + mname 
+ "] in Class [" +  cName + "].", e);
+                               throw new IOException(e) ;
+                       }
+               }
+               throw new IOException("Unable to initialize() [" + cName + "]") 
;
+       }
+       
+       
+       private static void logInfo(String msg) {
+               // System.out.println(msg) ;
+               LOG.info(msg) ;
+       }
+
+       private static void logError(String msg, Throwable t) {
+//             System.err.println(msg) ;
+//             if (t != null) {
+//                     t.printStackTrace(System.err);
+//             }
+               LOG.error(msg, t);
+       }
+
+}


Reply via email to