http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryPermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryPermissions.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryPermissions.java
new file mode 100644
index 0000000..4b27e7b
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryPermissions.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
+
+import com.google.common.collect.Lists;
+
+public class SentryPermissions implements AuthzPermissions {
+  
+  public static class PrivilegeInfo {
+    private final String authzObj;
+    private final Map<String, FsAction> roleToPermission = new HashMap<String, FsAction>();
+    public PrivilegeInfo(String authzObj) {
+      this.authzObj = authzObj;
+    }
+    public PrivilegeInfo setPermission(String role, FsAction perm) {
+      roleToPermission.put(role, perm);
+      return this;
+    }
+    public PrivilegeInfo removePermission(String role) {
+      roleToPermission.remove(role);
+      return this;
+    }
+    public FsAction getPermission(String role) {
+      return roleToPermission.get(role);
+    }
+    public Map<String, FsAction> getAllPermissions() {
+      return roleToPermission;
+    }
+    public String getAuthzObj() {
+      return authzObj;
+    }
+  }
+
+  public static class RoleInfo {
+    private final String role;
+    private final Set<String> groups = new HashSet<String>();
+    public RoleInfo(String role) {
+      this.role = role;
+    }
+    public RoleInfo addGroup(String group) {
+      groups.add(group);
+      return this;
+    }
+    public RoleInfo delGroup(String group) {
+      groups.remove(group);
+      return this;
+    }
+    public String getRole() {
+      return role;
+    }
+    public Set<String> getAllGroups() {
+      return groups;
+    }
+  }
+
+  private final Map<String, PrivilegeInfo> privileges = new HashMap<String, PrivilegeInfo>();
+  private final Map<String, RoleInfo> roles = new HashMap<String, RoleInfo>();
+  private Map<String, Set<String>> authzObjChildren = new HashMap<String, Set<String>>();
+
+  String getParentAuthzObject(String authzObject) {
+    int dot = authzObject.indexOf('.');
+    if (dot > 0) {
+      return authzObject.substring(0, dot);
+    } else {
+      return authzObject;
+    }
+  }
+
+  void addParentChildMappings(String authzObject) {
+    String parent = getParentAuthzObject(authzObject);
+    if (parent != null) {
+      Set<String> children = authzObjChildren.get(parent);
+      if (children == null) {
+        children = new HashSet<String>();
+        authzObjChildren.put(parent, children);
+      }
+      children.add(authzObject);
+    }
+  }
+
+  void removeParentChildMappings(String authzObject) {
+    String parent = getParentAuthzObject(authzObject);
+    if (parent != null) {
+      Set<String> children = authzObjChildren.get(parent);
+      if (children != null) {
+        children.remove(authzObject);
+      }
+    } else {
+      // is parent
+      authzObjChildren.remove(authzObject);
+    }
+  }
+
+  private Map<String, FsAction> getGroupPerms(String authzObj) {
+    Map<String, FsAction> groupPerms = new HashMap<String, FsAction>();
+    if (authzObj == null) {
+      return groupPerms;
+    }
+    PrivilegeInfo privilegeInfo = privileges.get(authzObj);
+    if (privilegeInfo != null) {
+      for (Map.Entry<String, FsAction> privs : privilegeInfo
+          .getAllPermissions().entrySet()) {
+        constructAclEntry(privs.getKey(), privs.getValue(), groupPerms);
+      }
+    }
+    return groupPerms;
+  }
+
+  @Override
+  public List<AclEntry> getAcls(String authzObj) {
+    Map<String, FsAction> groupPerms = getGroupPerms(authzObj);
+    String parent = getParentAuthzObject(authzObj);
+    Map<String, FsAction> pGroupPerms = null;
+    if (parent == null) {
+      pGroupPerms = new HashMap<String, FsAction>();
+    } else {
+      pGroupPerms = getGroupPerms(getParentAuthzObject(authzObj));
+      if ((groupPerms == null)||(groupPerms.size() == 0)) {
+        groupPerms = pGroupPerms;
+      }
+    }
+    List<AclEntry> retList = new LinkedList<AclEntry>();
+    for (Map.Entry<String, FsAction> groupPerm : groupPerms.entrySet()) {
+      AclEntry.Builder builder = new AclEntry.Builder();
+      builder.setName(groupPerm.getKey());
+      builder.setType(AclEntryType.GROUP);
+      builder.setScope(AclEntryScope.ACCESS);
+      FsAction action = groupPerm.getValue();
+      FsAction pAction = pGroupPerms.get(groupPerm.getKey());
+      if (pAction != null) {
+        action = action.or(pAction);
+      }
+      if ((action == FsAction.READ) || (action == FsAction.WRITE)
+          || (action == FsAction.READ_WRITE)) {
+        action = action.or(FsAction.EXECUTE);
+      }
+      builder.setPermission(action);
+      retList.add(builder.build());
+    }
+    return retList;
+  }
+
+  private void constructAclEntry(String role, FsAction permission,
+      Map<String, FsAction> groupPerms) {
+    RoleInfo roleInfo = roles.get(role);
+    if (roleInfo != null) {
+      for (String group : roleInfo.groups) {
+        FsAction fsAction = groupPerms.get(group);
+        if (fsAction == null) {
+          fsAction = FsAction.NONE;
+        }
+        groupPerms.put(group, fsAction.or(permission));
+      }
+    }
+  }
+
+  public PrivilegeInfo getPrivilegeInfo(String authzObj) {
+    return privileges.get(authzObj);
+  }
+
+  Collection<PrivilegeInfo> getAllPrivileges() {
+    return privileges.values();
+  }
+
+  Collection<RoleInfo> getAllRoles() {
+    return roles.values();
+  }
+
+  public void delPrivilegeInfo(String authzObj) {
+    privileges.remove(authzObj);
+  }
+
+  public void addPrivilegeInfo(PrivilegeInfo privilegeInfo) {
+    privileges.put(privilegeInfo.authzObj, privilegeInfo);
+  }
+
+  public Set<String> getChildren(String authzObj) {
+    return authzObjChildren.get(authzObj);
+  }
+
+  public RoleInfo getRoleInfo(String role) {
+    return roles.get(role);
+  }
+
+  public void delRoleInfo(String role) {
+    roles.remove(role);
+  }
+
+  public void addRoleInfo(RoleInfo roleInfo) {
+    roles.put(roleInfo.role, roleInfo);
+  }
+}
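
A minimal usage sketch of the class above (illustrative, not part of this
commit): RoleInfo maps a Sentry role to Unix groups, PrivilegeInfo maps roles
to FsActions per authz object, and getAcls() resolves both into HDFS group ACL
entries. The sketch assumes it lives in the same org.apache.sentry.hdfs
package, since addParentChildMappings() is package-private; the role, group,
and object names are made up.

    package org.apache.sentry.hdfs;

    import java.util.List;

    import org.apache.hadoop.fs.permission.AclEntry;
    import org.apache.hadoop.fs.permission.FsAction;

    public class SentryPermissionsSketch {
      public static void main(String[] args) {
        SentryPermissions perms = new SentryPermissions();
        // Role "analyst" resolves to the "analysts" Unix group.
        perms.addRoleInfo(new SentryPermissions.RoleInfo("analyst")
            .addGroup("analysts"));
        // Grant role "analyst" r-x on table db1.tbl1.
        perms.addPrivilegeInfo(new SentryPermissions.PrivilegeInfo("db1.tbl1")
            .setPermission("analyst", FsAction.READ_EXECUTE));
        perms.addParentChildMappings("db1.tbl1");
        // One GROUP/ACCESS entry for "analysts"; READ or WRITE permissions
        // get EXECUTE or'ed in so directories remain traversable.
        List<AclEntry> acls = perms.getAcls("db1.tbl1");
        System.out.println(acls);
      }
    }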

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
new file mode 100644
index 0000000..9540397
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.hdfs.SentryHDFSServiceClient.SentryAuthzUpdate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SentryUpdater {
+
+  private SentryHDFSServiceClient sentryClient;
+  private final Configuration conf;
+  private final SentryAuthorizationInfo authzInfo;
+
+  private static Logger LOG = LoggerFactory.getLogger(SentryUpdater.class);
+
+  public SentryUpdater(Configuration conf, SentryAuthorizationInfo authzInfo) throws Exception {
+    this.conf = conf;
+    this.authzInfo = authzInfo;
+  }
+
+  public SentryAuthzUpdate getUpdates() {
+    if (sentryClient == null) {
+      try {
+        sentryClient = new SentryHDFSServiceClient(conf);
+      } catch (Exception e) {
+        LOG.error("Error connecting to Sentry ['{}'] !!",
+            e.getMessage());
+        sentryClient = null;
+        return null;
+      }
+    }
+    try {
+      SentryAuthzUpdate sentryUpdates = sentryClient.getAllUpdatesFrom(
+          authzInfo.getAuthzPermissions().getLastUpdatedSeqNum() + 1,
+          authzInfo.getAuthzPaths().getLastUpdatedSeqNum() + 1);
+      return sentryUpdates;
+    } catch (Exception e)  {
+      sentryClient = null;
+      LOG.error("Error receiving updates from Sentry !!", e);
+      return null;
+    }
+  }
+
+}
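
A note on the contract above (sketch, not part of this commit): getUpdates()
swallows connection failures and returns null, so the periodic caller must
null-check and simply retry on its next cycle; the client is re-created lazily
on the following call. Assuming an already constructed SentryUpdater named
updater:

    SentryHDFSServiceClient.SentryAuthzUpdate delta = updater.getUpdates();
    if (delta != null) {
      // apply the permission and path deltas to the cached authz state
    }
    // null means a connection problem; nothing to do until the next poll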

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java
new file mode 100644
index 0000000..e5af802
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.sentry.hdfs.SentryPermissions.PrivilegeInfo;
+import org.apache.sentry.hdfs.SentryPermissions.RoleInfo;
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
+import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateableAuthzPermissions implements AuthzPermissions, Updateable<PermissionsUpdate> {
+  private static final int MAX_UPDATES_PER_LOCK_USE = 99;
+  private volatile SentryPermissions perms = new SentryPermissions();
+  private final AtomicLong seqNum = new AtomicLong(0);
+  
+  private static Logger LOG = LoggerFactory.getLogger(UpdateableAuthzPermissions.class);
+
+  public static Map<String, FsAction> ACTION_MAPPING = new HashMap<String, FsAction>();
+  static {
+    ACTION_MAPPING.put("ALL", FsAction.ALL);
+    ACTION_MAPPING.put("*", FsAction.ALL);
+    ACTION_MAPPING.put("SELECT", FsAction.READ_EXECUTE);
+    ACTION_MAPPING.put("select", FsAction.READ_EXECUTE);
+    ACTION_MAPPING.put("INSERT", FsAction.WRITE_EXECUTE);
+    ACTION_MAPPING.put("insert", FsAction.WRITE_EXECUTE);
+  }
+
+  @Override
+  public List<AclEntry> getAcls(String authzObj) {
+    return perms.getAcls(authzObj);
+  }
+
+  @Override
+  public UpdateableAuthzPermissions updateFull(PermissionsUpdate update) {
+    UpdateableAuthzPermissions other = new UpdateableAuthzPermissions();
+    other.applyPartialUpdate(update);
+    other.seqNum.set(update.getSeqNum());
+    return other;
+  }
+
+  @Override
+  public void updatePartial(Iterable<PermissionsUpdate> updates, ReadWriteLock lock) {
+    lock.writeLock().lock();
+    try {
+      int counter = 0;
+      for (PermissionsUpdate update : updates) {
+        applyPartialUpdate(update);
+        if (++counter > MAX_UPDATES_PER_LOCK_USE) {
+          counter = 0;
+          lock.writeLock().unlock();
+          lock.writeLock().lock();
+        }
+        seqNum.set(update.getSeqNum());
+        LOG.debug("##### Updated perms seq Num [" + seqNum.get() + "]");
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+  
+
+  private void applyPartialUpdate(PermissionsUpdate update) {
+    applyPrivilegeUpdates(update);
+    applyRoleUpdates(update);
+  }
+
+  private void applyRoleUpdates(PermissionsUpdate update) {
+    for (TRoleChanges rUpdate : update.getRoleUpdates()) {
+      if (rUpdate.getRole().equals(PermissionsUpdate.ALL_ROLES)) {
+        // Request to remove group from all roles
+        String groupToRemove = rUpdate.getDelGroups().iterator().next();
+        for (RoleInfo rInfo : perms.getAllRoles()) {
+          rInfo.delGroup(groupToRemove);
+        }
+      }
+      RoleInfo rInfo = perms.getRoleInfo(rUpdate.getRole());
+      for (String group : rUpdate.getAddGroups()) {
+        if (rInfo == null) {
+          rInfo = new RoleInfo(rUpdate.getRole());
+        }
+        rInfo.addGroup(group);
+      }
+      if (rInfo != null) {
+        perms.addRoleInfo(rInfo);
+        for (String group : rUpdate.getDelGroups()) {
+          if (group.equals(PermissionsUpdate.ALL_GROUPS)) {
+            perms.delRoleInfo(rInfo.getRole());
+            break;
+          }
+          // If there are no groups to remove, rUpdate.getDelGroups() will
+          // return an empty list and this code will not be reached
+          rInfo.delGroup(group);
+        }
+      }
+    }
+  }
+
+  private void applyPrivilegeUpdates(PermissionsUpdate update) {
+    for (TPrivilegeChanges pUpdate : update.getPrivilegeUpdates()) {
+      if (pUpdate.getAuthzObj().equals(PermissionsUpdate.RENAME_PRIVS)) {
+        String newAuthzObj = pUpdate.getAddPrivileges().keySet().iterator().next();
+        String oldAuthzObj = pUpdate.getDelPrivileges().keySet().iterator().next();
+        PrivilegeInfo privilegeInfo = perms.getPrivilegeInfo(oldAuthzObj);
+        Map<String, FsAction> allPermissions = privilegeInfo.getAllPermissions();
+        perms.delPrivilegeInfo(oldAuthzObj);
+        perms.removeParentChildMappings(oldAuthzObj);
+        PrivilegeInfo newPrivilegeInfo = new PrivilegeInfo(newAuthzObj);
+        for (Map.Entry<String, FsAction> e : allPermissions.entrySet()) {
+          newPrivilegeInfo.setPermission(e.getKey(), e.getValue());
+        }
+        perms.addPrivilegeInfo(newPrivilegeInfo);
+        perms.addParentChildMappings(newAuthzObj);
+        return;
+      }
+      if (pUpdate.getAuthzObj().equals(PermissionsUpdate.ALL_AUTHZ_OBJ)) {
+        // Request to remove role from all Privileges
+        String roleToRemove = pUpdate.getDelPrivileges().keySet().iterator()
+            .next();
+        for (PrivilegeInfo pInfo : perms.getAllPrivileges()) {
+          pInfo.removePermission(roleToRemove);
+        }
+      }
+      PrivilegeInfo pInfo = perms.getPrivilegeInfo(pUpdate.getAuthzObj());
+      for (Map.Entry<String, String> aMap : pUpdate.getAddPrivileges().entrySet()) {
+        if (pInfo == null) {
+          pInfo = new PrivilegeInfo(pUpdate.getAuthzObj());
+        }
+        FsAction fsAction = pInfo.getPermission(aMap.getKey());
+        if (fsAction == null) {
+          fsAction = getFAction(aMap.getValue());
+        } else {
+          fsAction = fsAction.or(getFAction(aMap.getValue()));
+        }
+        pInfo.setPermission(aMap.getKey(), fsAction);
+      }
+      if (pInfo != null) {
+        perms.addPrivilegeInfo(pInfo);
+        perms.addParentChildMappings(pUpdate.getAuthzObj());
+        for (Map.Entry<String, String> dMap : pUpdate.getDelPrivileges().entrySet()) {
+          if (dMap.getKey().equals(PermissionsUpdate.ALL_ROLES)) {
+            // Remove all privileges
+            perms.delPrivilegeInfo(pUpdate.getAuthzObj());
+            perms.removeParentChildMappings(pUpdate.getAuthzObj());
+            break;
+          }
+          List<PrivilegeInfo> parentAndChild = new LinkedList<PrivilegeInfo>();
+          parentAndChild.add(pInfo);
+          Set<String> children = perms.getChildren(pInfo.getAuthzObj());
+          if (children != null) {
+            for (String child : children) {
+              parentAndChild.add(perms.getPrivilegeInfo(child));
+            }
+          }
+          // recursive revoke
+          for (PrivilegeInfo pInfo2 : parentAndChild) {
+            FsAction fsAction = pInfo2.getPermission(dMap.getKey());
+            if (fsAction != null) {
+              fsAction = fsAction.and(getFAction(dMap.getValue()).not());
+              if (FsAction.NONE == fsAction) {
+                pInfo2.removePermission(dMap.getKey());
+              } else {
+                pInfo2.setPermission(dMap.getKey(), fsAction);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  static FsAction getFAction(String sentryPriv) {
+    String[] strPrivs = sentryPriv.trim().split(",");
+    FsAction retVal = FsAction.NONE;
+    for (String strPriv : strPrivs) {
+      retVal = retVal.or(ACTION_MAPPING.get(strPriv.toUpperCase()));
+    }
+    return retVal;
+  }
+
+  @Override
+  public long getLastUpdatedSeqNum() {
+    return seqNum.get();
+  }
+
+  @Override
+  public PermissionsUpdate createFullImageUpdate(long currSeqNum) {
+    PermissionsUpdate retVal = new PermissionsUpdate(currSeqNum, true);
+    for (PrivilegeInfo pInfo : perms.getAllPrivileges()) {
+      TPrivilegeChanges pUpdate = retVal.addPrivilegeUpdate(pInfo.getAuthzObj());
+      for (Map.Entry<String, FsAction> ent : pInfo.getAllPermissions().entrySet()) {
+        pUpdate.putToAddPrivileges(ent.getKey(), ent.getValue().SYMBOL);
+      }
+    }
+    for (RoleInfo rInfo : perms.getAllRoles()) {
+      TRoleChanges rUpdate = retVal.addRoleUpdate(rInfo.getRole());
+      for (String group : rInfo.getAllGroups()) {
+        rUpdate.addToAddGroups(group);
+      }
+    }
+    return retVal;
+  }
+
+  
+}
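
A worked example for getFAction() above (illustrative, not part of this
commit): the method folds a comma-separated Sentry privilege string into one
FsAction by or'ing the mapped actions, so granting both SELECT and INSERT
widens to ALL (r-x | -wx = rwx). Same-package code, since the method is
package-private; note that an unmapped privilege string would NPE here,
because ACTION_MAPPING.get() returns null for it.

    // READ_EXECUTE.or(WRITE_EXECUTE) == FsAction.ALL
    FsAction combined = UpdateableAuthzPermissions.getFAction("select,insert");
    System.out.println(combined);  // prints ALL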

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java
new file mode 100644
index 0000000..2085b52
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/MockSentryAuthorizationProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+public class MockSentryAuthorizationProvider extends
+    SentryAuthorizationProvider {
+
+  public MockSentryAuthorizationProvider() {
+    super(new SentryAuthorizationInfoX());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/SentryAuthorizationInfoX.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/SentryAuthorizationInfoX.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/SentryAuthorizationInfoX.java
new file mode 100644
index 0000000..7a1539b
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/SentryAuthorizationInfoX.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
+
+public class SentryAuthorizationInfoX extends SentryAuthorizationInfo {
+
+  public SentryAuthorizationInfoX() {
+    super();
+  }
+
+  @Override
+  public void run() {
+    
+  }
+
+  @Override
+  public void start() {
+
+  }
+
+  @Override
+  public void stop() {
+
+  }
+
+  @Override
+  public boolean isStale() {
+    return false;
+  }
+
+  private static final String[] MANAGED = {"user", "authz"};
+  private static final String[] AUTHZ_OBJ = {"user", "authz", "obj"};
+
+  private boolean hasPrefix(String[] prefix, String[] pathElement) {
+    int i = 0;
+    for (; i < prefix.length && i < pathElement.length; i ++) {
+      if (!prefix[i].equals(pathElement[i])) {
+        return false;
+      }
+    }    
+    return (i == prefix.length);
+  }
+  
+  @Override
+  public boolean isManaged(String[] pathElements) {
+    return hasPrefix(MANAGED, pathElements);
+  }
+
+  @Override
+  public boolean doesBelongToAuthzObject(String[] pathElements) {
+    return hasPrefix(AUTHZ_OBJ, pathElements);
+  }
+
+  @Override
+  public List<AclEntry> getAclEntries(String[] pathElements) {
+    AclEntry acl = new AclEntry.Builder().setType(AclEntryType.USER).
+        setPermission(FsAction.ALL).setName("user-authz").
+        setScope(AclEntryScope.ACCESS).build();
+    return Arrays.asList(acl);
+  }
+}
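
The truth table implied by the test double above (illustrative, not part of
this commit): hasPrefix() matches element-wise up to the prefix length, so any
path at or below /user/authz is managed, and anything at or below
/user/authz/obj belongs to an authz object.

    SentryAuthorizationInfoX x = new SentryAuthorizationInfoX();
    x.isManaged(new String[] {"user", "authz", "xxx"});        // true
    x.isManaged(new String[] {"user"});                        // false (shorter than prefix)
    x.doesBelongToAuthzObject(new String[] {"user", "xxx"});   // false
    x.doesBelongToAuthzObject(
        new String[] {"user", "authz", "obj", "p"});           // true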

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java
new file mode 100644
index 0000000..b766a8f
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/java/org/apache/sentry/hdfs/TestSentryAuthorizationProvider.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestSentryAuthorizationProvider {
+  private MiniDFSCluster miniDFS;
+  private UserGroupInformation admin;
+  
+  @Before
+  public void setUp() throws Exception {
+    admin = UserGroupInformation.createUserForTesting(
+        System.getProperty("user.name"), new String[] { "supergroup" });
+    admin.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data");
+        Configuration conf = new HdfsConfiguration();
+        conf.setBoolean("sentry.authorization-provider.include-hdfs-authz-as-acl", true);
+        conf.set(DFSConfigKeys.DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY,
+            MockSentryAuthorizationProvider.class.getName());
+        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+        EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+        miniDFS = new MiniDFSCluster.Builder(conf).build();
+        return null;
+      }
+    });
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    if (miniDFS != null) {
+      miniDFS.shutdown();
+    }
+  }
+
+  @Test
+  public void testProvider() throws Exception {
+    admin.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        String sysUser = UserGroupInformation.getCurrentUser().getShortUserName();
+        FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
+
+        List<AclEntry> baseAclList = new ArrayList<AclEntry>();
+        AclEntry.Builder builder = new AclEntry.Builder();
+        baseAclList.add(builder.setType(AclEntryType.USER)
+            .setScope(AclEntryScope.ACCESS).build());
+        baseAclList.add(builder.setType(AclEntryType.GROUP)
+            .setScope(AclEntryScope.ACCESS).build());
+        baseAclList.add(builder.setType(AclEntryType.OTHER)
+            .setScope(AclEntryScope.ACCESS).build());
+        Path path1 = new Path("/user/authz/obj/xxx");
+        fs.mkdirs(path1);
+        fs.setAcl(path1, baseAclList);
+
+        fs.mkdirs(new Path("/user/authz/xxx"));
+        fs.mkdirs(new Path("/user/xxx"));
+
+        // root
+        Path path = new Path("/");
+        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
+        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
+
+        // dir before prefixes
+        path = new Path("/user");
+        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
+        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
+
+        // prefix dir
+        path = new Path("/user/authz");
+        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
+        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
+
+        // dir inside of prefix, no obj
+        path = new Path("/user/authz/xxx");
+        FileStatus status = fs.getFileStatus(path);
+        Assert.assertEquals(sysUser, status.getOwner());
+        Assert.assertEquals("supergroup", status.getGroup());
+        Assert.assertEquals(new FsPermission((short) 0755), status.getPermission());
+        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
+
+        // dir inside of prefix, obj
+        path = new Path("/user/authz/obj");
+        Assert.assertEquals("hive", fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("hive", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0770), fs.getFileStatus(path).getPermission());
+        Assert.assertFalse(fs.getAclStatus(path).getEntries().isEmpty());
+
+        List<AclEntry> acls = new ArrayList<AclEntry>();
+        acls.add(new AclEntry.Builder().setName(sysUser).setType(AclEntryType.USER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.ALL).build());
+        acls.add(new AclEntry.Builder().setName("supergroup").setType(AclEntryType.GROUP).setScope(AclEntryScope.ACCESS).setPermission(FsAction.READ_EXECUTE).build());
+        acls.add(new AclEntry.Builder().setName(null).setType(AclEntryType.OTHER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.READ_EXECUTE).build());
+        acls.add(new AclEntry.Builder().setName("user-authz").setType(AclEntryType.USER).setScope(AclEntryScope.ACCESS).setPermission(FsAction.ALL).build());
+        Assert.assertEquals(new LinkedHashSet<AclEntry>(acls), new LinkedHashSet<AclEntry>(fs.getAclStatus(path).getEntries()));
+
+        // dir inside of prefix, inside of obj
+        path = new Path("/user/authz/obj/xxx");
+        Assert.assertEquals("hive", fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("hive", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0770), fs.getFileStatus(path).getPermission());
+        Assert.assertFalse(fs.getAclStatus(path).getEntries().isEmpty());
+        
+        Path path2 = new Path("/user/authz/obj/path2");
+        fs.mkdirs(path2);
+        fs.setAcl(path2, baseAclList);
+
+        // dir outside of prefix
+        path = new Path("/user/xxx");
+        Assert.assertEquals(sysUser, fs.getFileStatus(path).getOwner());
+        Assert.assertEquals("supergroup", fs.getFileStatus(path).getGroup());
+        Assert.assertEquals(new FsPermission((short) 0755), fs.getFileStatus(path).getPermission());
+        Assert.assertTrue(fs.getAclStatus(path).getEntries().isEmpty());
+        return null;
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/resources/hdfs-sentry.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/resources/hdfs-sentry.xml b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/resources/hdfs-sentry.xml
new file mode 100644
index 0000000..511bfdd
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/test/resources/hdfs-sentry.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+  <property>
+    <name>sentry.hdfs-plugin.path-prefixes</name>
+    <value>/user/hive/dw</value>
+  </property>
+  <property>
+    <name>sentry.hdfs-plugin.sentry-uri</name>
+    <value>thrift://localhost:1234</value>
+  </property>
+  <property>
+    <name>sentry.hdfs-plugin.stale-threshold.ms</name>
+    <value>-1</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/.gitignore
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/.gitignore b/sentry-hdfs/sentry-hdfs-service/.gitignore
new file mode 100644
index 0000000..91ad75b
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/.gitignore
@@ -0,0 +1,18 @@
+*.class
+target/
+.classpath
+.project
+.settings
+.metadata
+.idea/
+*.iml
+derby.log
+datanucleus.log
+sentry-core/sentry-core-common/src/gen
+**/TempStatsStore/
+# Package Files #
+*.jar
+*.war
+*.ear
+test-output/
+maven-repo/

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/pom.xml b/sentry-hdfs/sentry-hdfs-service/pom.xml
new file mode 100644
index 0000000..365380e
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.sentry</groupId>
+    <artifactId>sentry-hdfs</artifactId>
+    <version>1.5.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>sentry-hdfs-service</artifactId>
+  <name>Sentry HDFS service</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.shiro</groupId>
+      <artifactId>shiro-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-hdfs-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-provider-db</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-shims</artifactId>
+      <version>${hive.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libfb303</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>ant-contrib</groupId>
+      <artifactId>ant-contrib</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${hive.version}</version>
+    </dependency>
+  </dependencies>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java
new file mode 100644
index 0000000..e7677f2
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link MetastoreClient}
+ *
+ */
+public class ExtendedMetastoreClient implements MetastoreClient {
+
+  private static Logger LOG = LoggerFactory.getLogger(ExtendedMetastoreClient.class);
+
+  private volatile HiveMetaStoreClient client;
+  private final HiveConf hiveConf;
+  public ExtendedMetastoreClient(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+  }
+
+  @Override
+  public List<Database> getAllDatabases() {
+    List<Database> retList = new ArrayList<Database>();
+    HiveMetaStoreClient client = getClient();
+    if (client != null) {
+      try {
+        for (String dbName : client.getAllDatabases()) {
+          retList.add(client.getDatabase(dbName));
+        }
+      } catch (Exception e) {
+        LOG.error("Could not get All Databases !!", e);
+      }
+    }
+    return retList;
+  }
+
+  @Override
+  public List<Table> getAllTablesOfDatabase(Database db) {
+    List<Table> retList = new ArrayList<Table>();
+    HiveMetaStoreClient client = getClient();
+    if (client != null) {
+      try {
+        for (String tblName : client.getAllTables(db.getName())) {
+          retList.add(client.getTable(db.getName(), tblName));
+        }
+      } catch (Exception e) {
+        LOG.error(String.format(
+            "Could not get Tables for '%s' !!", db.getName()), e);
+      }
+    }
+    return retList;
+  }
+
+  @Override
+  public List<Partition> listAllPartitions(Database db, Table tbl) {
+    HiveMetaStoreClient client = getClient();
+    if (client != null) {
+      try {
+        return client.listPartitions(db.getName(), tbl.getTableName(), Short.MAX_VALUE);
+      } catch (Exception e) {
+        LOG.error(String.format(
+            "Could not get partitions for '%s'.'%s' !!", db.getName(),
+            tbl.getTableName()), e);
+      }
+    }
+    return new LinkedList<Partition>();
+  }
+
+  private HiveMetaStoreClient getClient() {
+    if (client == null) {
+      try {
+        client = new HiveMetaStoreClient(hiveConf);
+        return client;
+      } catch (MetaException e) {
+        client = null;
+        LOG.error("Could not create metastore client !!", e);
+        return null;
+      }
+    } else {
+      return client;
+    }
+  }
+}
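
A sketch of a full metastore walk with the client above (illustrative, not
part of this commit; hiveConf is an assumed, already configured HiveConf).
Every accessor degrades to an empty list on error, so the walk itself never
throws:

    MetastoreClient client = new ExtendedMetastoreClient(hiveConf);
    for (Database db : client.getAllDatabases()) {
      for (Table tbl : client.getAllTablesOfDatabase(db)) {
        List<Partition> parts = client.listAllPartitions(db, tbl);
        // db, table, and partition locations would feed a PathsUpdate here
      }
    }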

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
new file mode 100644
index 0000000..9a81e3a
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStorePreEventListener;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.sentry.hdfs.service.thrift.TPathChanges;
+import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Plugin implementation of {@link SentryMetastoreListenerPlugin} that hooks
+ * into the sites in the {@link MetaStorePreEventListener} that deal with
+ * creation, update, and deletion of paths.
+ */
+public class MetastorePlugin extends SentryMetastoreListenerPlugin {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MetastorePlugin.class);
+
+  private final Configuration conf;
+  private SentryHDFSServiceClient sentryClient;
+  private UpdateableAuthzPaths authzPaths;
+  private Lock notificiationLock;
+
+  // Initialized to some value > 1 so that the first update notification
+  // will trigger a full image fetch
+  private final AtomicLong seqNum = new AtomicLong(5);
+  private volatile long lastSentSeqNum = -1;
+  private final ExecutorService threadPool;
+
+  static class ProxyHMSHandler extends HMSHandler {
+    public ProxyHMSHandler(String name, HiveConf conf) throws MetaException {
+      super(name, conf);
+    }
+    @Override
+    public String startFunction(String function, String extraLogInfo) {
+      return function;
+    }
+  }
+
+  public MetastorePlugin(Configuration conf) {
+    this.notificiationLock = new ReentrantLock();
+    this.conf = new HiveConf((HiveConf)conf);
+    this.conf.unset(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname);
+    this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname);
+    this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname);
+    this.conf.unset(HiveConf.ConfVars.METASTOREURIS.varname);
+    try {
+      this.authzPaths = createInitialUpdate(new ProxyHMSHandler("sentry.hdfs", (HiveConf)this.conf));
+    } catch (Exception e1) {
+      LOGGER.error("Could not create Initial AuthzPaths or HMSHandler !!", e1);
+      throw new RuntimeException(e1);
+    }
+    try {
+      sentryClient = new SentryHDFSServiceClient(conf);
+    } catch (Exception e) {
+      sentryClient = null;
+      LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
+    }
+    ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
+    threadPool.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        notificiationLock.lock();
+        try {
+          long lastSeenHMSPathSeqNum =
+              MetastorePlugin.this.getClient().getLastSeenHMSPathSeqNum();
+          if (lastSeenHMSPathSeqNum != lastSentSeqNum) {
+            LOGGER.warn("Sentry not in sync with HMS [" + 
lastSeenHMSPathSeqNum + ", " + lastSentSeqNum + "]");
+            PathsUpdate fullImageUpdate =
+                MetastorePlugin.this.authzPaths.createFullImageUpdate(
+                    lastSentSeqNum);
+            LOGGER.warn("Sentry not in sync with HMS !!");
+            notifySentryNoLock(fullImageUpdate, false);
+          }
+        } catch (Exception e) {
+          sentryClient = null;
+          LOGGER.error("Error talking to Sentry HDFS Service !!", e);
+        } finally {
+          notificiationLock.unlock();
+        }
+      }
+    }, this.conf.getLong(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
+        ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT), 1000,
+        TimeUnit.MILLISECONDS);
+    this.threadPool = threadPool;
+  }
+
+  private UpdateableAuthzPaths createInitialUpdate(IHMSHandler hmsHandler) throws Exception {
+    UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new String[] {"/"});
+    PathsUpdate tempUpdate = new PathsUpdate(-1, false);
+    List<String> allDbStr = hmsHandler.get_all_databases();
+    for (String dbName : allDbStr) {
+      Database db = hmsHandler.get_database(dbName);
+      tempUpdate.newPathChange(db.getName()).addToAddPaths(
+          PathsUpdate.cleanPath(db.getLocationUri()));
+      List<String> allTblStr = hmsHandler.get_all_tables(db.getName());
+      for (String tblName : allTblStr) {
+        Table tbl = hmsHandler.get_table(db.getName(), tblName);
+        TPathChanges tblPathChange = tempUpdate.newPathChange(tbl
+            .getDbName() + "." + tbl.getTableName());
+        List<Partition> tblParts =
+            hmsHandler.get_partitions(db.getName(), tbl.getTableName(), (short) -1);
+        tblPathChange.addToAddPaths(PathsUpdate.cleanPath(tbl.getSd()
+            .getLocation() == null ? db.getLocationUri() : tbl
+            .getSd().getLocation()));
+        for (Partition part : tblParts) {
+          tblPathChange.addToAddPaths(PathsUpdate.cleanPath(part.getSd()
+              .getLocation()));
+        }
+      }
+    }
+    authzPaths.updatePartial(Lists.newArrayList(tempUpdate),
+        new ReentrantReadWriteLock());
+    return authzPaths;
+  }
+
+  @Override
+  public void addPath(String authzObj, String path) {
+    LOGGER.debug("#### HMS Path Update ["
+        + "OP : addPath, "
+        + "authzObj : " + authzObj + ", "
+        + "path : " + path + "]");
+    PathsUpdate update = createHMSUpdate();
+    update.newPathChange(authzObj).addToAddPaths(PathsUpdate.cleanPath(path));
+    notifySentry(update, true);
+  }
+
+  @Override
+  public void removeAllPaths(String authzObj, List<String> childObjects) {
+    LOGGER.debug("#### HMS Path Update ["
+        + "OP : removeAllPaths, "
+        + "authzObj : " + authzObj + ", "
+        + "childObjs : " + (childObjects == null ? "[]" : childObjects) + "]");
+    PathsUpdate update = createHMSUpdate();
+    if (childObjects != null) {
+      for (String childObj : childObjects) {
+        update.newPathChange(authzObj + "." + childObj).addToDelPaths(
+            Lists.newArrayList(PathsUpdate.ALL_PATHS));
+      }
+    }
+    update.newPathChange(authzObj).addToDelPaths(
+        Lists.newArrayList(PathsUpdate.ALL_PATHS));
+    notifySentry(update, true);
+  }
+
+  @Override
+  public void removePath(String authzObj, String path) {
+    if ("*".equals(path)) {
+      removeAllPaths(authzObj, null);
+    } else {
+      LOGGER.debug("#### HMS Path Update ["
+          + "OP : removePath, "
+          + "authzObj : " + authzObj + ", "
+          + "path : " + path + "]");
+      PathsUpdate update = createHMSUpdate();
+      update.newPathChange(authzObj).addToDelPaths(PathsUpdate.cleanPath(path));
+      notifySentry(update, true);
+    }
+  }
+
+  @Override
+  public void renameAuthzObject(String oldName, String oldPath, String newName,
+      String newPath) {
+    PathsUpdate update = createHMSUpdate();
+    LOGGER.debug("#### HMS Path Update ["
+        + "OP : renameAuthzObject, "
+        + "oldName : " + oldName + ","
+        + "newPath : " + oldPath + ","
+        + "newName : " + newName + ","
+        + "newPath : " + newPath + "]");
+    update.newPathChange(newName).addToAddPaths(PathsUpdate.cleanPath(newPath));
+    update.newPathChange(oldName).addToDelPaths(PathsUpdate.cleanPath(oldPath));
+    notifySentry(update, true);
+  }
+
+  private SentryHDFSServiceClient getClient() {
+    if (sentryClient == null) {
+      try {
+        sentryClient = new SentryHDFSServiceClient(conf);
+      } catch (IOException e) {
+        sentryClient = null;
+        LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
+      }
+    }
+    return sentryClient;
+  }
+
+  private PathsUpdate createHMSUpdate() {
+    PathsUpdate update = new PathsUpdate(seqNum.incrementAndGet(), false);
+    LOGGER.debug("#### HMS Path Update SeqNum : [" + seqNum.get() + "]");
+    return update;
+  }
+
+  private void notifySentryNoLock(PathsUpdate update, boolean applyLocal) {
+    if (applyLocal) {
+      authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock());
+    }
+    try {
+      getClient().notifyHMSUpdate(update);
+    } catch (Exception e) {
+      LOGGER.error("Could not send update to Sentry HDFS Service !!", e);
+    } finally {
+      lastSentSeqNum = update.getSeqNum();
+      LOGGER.debug("#### HMS Path Last update sent : [" + lastSentSeqNum + 
"]");
+    }
+  }
+
+  private void notifySentry(PathsUpdate update, boolean applyLocal) {
+    notificiationLock.lock();
+    try {
+      notifySentryNoLock(update, applyLocal);
+    } finally {
+      notificiationLock.unlock();
+    }
+  }
+
+}
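
Putting the plugin together (illustrative sketch, not part of this commit;
hiveConf and the paths are assumed): each HMS callback becomes one PathsUpdate
delta stamped with an increasing seqNum, applied locally and pushed to Sentry,
while the scheduled thread resends a full image whenever Sentry's last-seen
seqNum drifts from lastSentSeqNum.

    MetastorePlugin plugin = new MetastorePlugin(hiveConf);
    // New table directory registered under its authz object:
    plugin.addPath("db1.tbl1", "hdfs://nn:8020/user/hive/dw/db1/tbl1");
    // Rename ships the add of the new path and the delete of the old
    // one in a single delta:
    plugin.renameAuthzObject(
        "db1.tbl1", "hdfs://nn:8020/user/hive/dw/db1/tbl1",
        "db1.tbl2", "hdfs://nn:8020/user/hive/dw/db1/tbl2");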

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
new file mode 100644
index 0000000..cc849b9
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
+import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
+import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
+import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceProcessor.class);
+
+  @Override
+  public TAuthzUpdateResponse get_all_authz_updates_from(long permSeqNum, long pathSeqNum)
+      throws TException {
+    TAuthzUpdateResponse retVal = new TAuthzUpdateResponse();
+    retVal.setAuthzPathUpdate(new LinkedList<TPathsUpdate>());
+    retVal.setAuthzPermUpdate(new LinkedList<TPermissionsUpdate>());
+    if (SentryPlugin.instance != null) {
+      List<PermissionsUpdate> permUpdates = SentryPlugin.instance.getAllPermsUpdatesFrom(permSeqNum);
+      List<PathsUpdate> pathUpdates = SentryPlugin.instance.getAllPathsUpdatesFrom(pathSeqNum);
+      try {
+        for (PathsUpdate update : pathUpdates) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("### Sending PATH preUpdate seq [" + 
update.getSeqNum() + "] ###");
+            LOGGER.debug("### Sending PATH preUpdate [" + update.toThrift() + 
"] ###");
+          }
+          retVal.getAuthzPathUpdate().add(update.toThrift());
+        }
+        for (PermissionsUpdate update : permUpdates) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("### Sending PERM preUpdate seq [" + 
update.getSeqNum() + "] ###");
+            LOGGER.debug("### Sending PERM preUpdate [" + update.toThrift() + 
"] ###");
+          }
+          retVal.getAuthzPermUpdate().add(update.toThrift());
+        }
+        if (LOGGER.isDebugEnabled()) {
+          StringBuilder permSeq = new StringBuilder("<");
+          for (PermissionsUpdate permUpdate : permUpdates) {
+            permSeq.append(permUpdate.getSeqNum()).append(",");
+          }
+          permSeq.append(">");
+          StringBuilder pathSeq = new StringBuilder("<");
+          for (PathsUpdate pathUpdate : pathUpdates) {
+            pathSeq.append(pathUpdate.getSeqNum()).append(",");
+          }
+          pathSeq.append(">");
+          LOGGER.debug("#### Updates requested from HDFS ["
+              + "permReq=" + permSeqNum + ", permResp=" + permSeq + "] "
+              + "[pathReq=" + pathSeqNum + ", pathResp=" + pathSeq + "]");
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error Sending updates to downstream Cache", e);
+        throw new TException(e);
+      }
+    } else {
+      LOGGER.error("SentryPlugin not initialized yet !!");
+    }
+    
+    return retVal;
+  }
+
+  @Override
+  public void handle_hms_notification(TPathsUpdate update) throws TException {
+    try {
+      PathsUpdate hmsUpdate = new PathsUpdate(update);
+      if (SentryPlugin.instance != null) {
+        SentryPlugin.instance.handlePathUpdateNotification(hmsUpdate);
+        LOGGER.debug("Authz Paths update [" + hmsUpdate.getSeqNum() + "]..");
+      } else {
+        LOGGER.error("SentryPlugin not initialized yet !!");
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error handling notification from HMS", e);
+      throw new TException(e);
+    }
+  }
+
+  @Override
+  public long check_hms_seq_num(long pathSeqNum) throws TException {
+    if (SentryPlugin.instance == null) {
+      // Guard against the NPE the other handlers already avoid
+      throw new TException("SentryPlugin not initialized yet !!");
+    }
+    return SentryPlugin.instance.getLastSeenHMSPathSeqNum();
+  }
+
+  /**
+   * Not implemented for the time being..
+   */
+  @Override
+  public Map<String, List<String>> get_all_related_paths(String arg0,
+      boolean arg1) throws TException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+}
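
For a sense of the pull protocol above, a downstream caller hands in the last sequence numbers it applied and receives every newer path and permission update (a full image first if it has diverged). A hypothetical driver against the Iface, with illustrative seqNums (get_all_authz_updates_from throws TException on failure):

    SentryHDFSService.Iface handler = new SentryHDFSServiceProcessor();
    // Request everything after perm seq 17 and path seq 9.
    TAuthzUpdateResponse delta = handler.get_all_authz_updates_from(17L, 9L);
    for (TPathsUpdate p : delta.getAuthzPathUpdate()) {
      // apply the path delta (or full image) to the local cache
    }
    for (TPermissionsUpdate p : delta.getAuthzPermUpdate()) {
      // apply the permission delta to the local cache
    }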

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
new file mode 100644
index 0000000..d35de75
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+import java.net.Socket;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
+import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Iface;
+import org.apache.sentry.provider.db.log.util.CommandUtil;
+import org.apache.sentry.service.thrift.ProcessorFactory;
+import org.apache.thrift.TException;
+import org.apache.thrift.TMultiplexedProcessor;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SentryHDFSServiceProcessorFactory extends ProcessorFactory{
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceProcessorFactory.class);
+
+  static class ProcessorWrapper extends SentryHDFSService.Processor<SentryHDFSService.Iface> {
+
+    public ProcessorWrapper(Iface iface) {
+      super(iface);
+    }
+    @Override
+    public boolean process(TProtocol in, TProtocol out) throws TException {
+      setIpAddress(in);
+      setImpersonator(in);
+      return super.process(in, out);
+    }
+
+    private void setImpersonator(final TProtocol in) {
+      TTransport transport = in.getTransport();
+      if (transport instanceof TSaslServerTransport) {
+        String impersonator = ((TSaslServerTransport) transport).getSaslServer().getAuthorizationID();
+        CommandUtil.setImpersonator(impersonator);
+      }
+    }
+
+    private void setIpAddress(final TProtocol in) {
+      TTransport transport = in.getTransport();
+      TSocket tSocket = getUnderlyingSocketFromTransport(transport);
+      if (tSocket != null) {
+        setIpAddress(tSocket.getSocket());
+      } else {
+        LOGGER.warn("Unknown Transport, cannot determine ipAddress");
+      }
+    }
+
+    private void setIpAddress(Socket socket) {
+      CommandUtil.setIpAddress(socket.getInetAddress().toString());
+    }
+
+    private TSocket getUnderlyingSocketFromTransport(TTransport transport) {
+      if (transport != null) {
+        if (transport instanceof TSaslServerTransport) {
+          transport = ((TSaslServerTransport) transport).getUnderlyingTransport();
+        } else if (transport instanceof TSaslClientTransport) {
+          transport = ((TSaslClientTransport) transport).getUnderlyingTransport();
+        } else {
+          if (!(transport instanceof TSocket)) {
+            LOGGER.warn("Transport class [" + transport.getClass().getName() + 
"] is not of type TSocket");
+            return null;
+          }
+        }
+        return (TSocket) transport;
+      }
+      return null;
+    }
+  }
+
+  public SentryHDFSServiceProcessorFactory(Configuration conf) {
+    super(conf);
+  }
+
+
+  public boolean register(TMultiplexedProcessor multiplexedProcessor) throws Exception {
+    SentryHDFSServiceProcessor sentryServiceHandler =
+        new SentryHDFSServiceProcessor();
+    TProcessor processor = new ProcessorWrapper(sentryServiceHandler);
+    multiplexedProcessor.registerProcessor(
+        SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME, processor);
+    return true;
+  }
+}
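
The factory plugs into the Sentry service as one endpoint on a shared TMultiplexedProcessor; the wrapper records the caller's IP address and SASL impersonator before delegating each call. A minimal sketch of the registration (the Configuration contents are immaterial here):

    Configuration conf = new Configuration();
    TMultiplexedProcessor multiplexed = new TMultiplexedProcessor();
    // Routes calls addressed to SENTRY_HDFS_SERVICE_NAME to the wrapped processor.
    new SentryHDFSServiceProcessorFactory(conf).register(multiplexed);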

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
new file mode 100644
index 0000000..55b7697
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever;
+import org.apache.sentry.hdfs.service.thrift.TPathChanges;
+import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate;
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
+import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleDeleteGroupsRequest;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleGrantPrivilegeRequest;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleRevokePrivilegeRequest;
+import org.apache.sentry.provider.db.service.thrift.TDropPrivilegesRequest;
+import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest;
+import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest;
+import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
+import org.apache.sentry.provider.db.service.thrift.TSentryGroup;
+import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+public class SentryPlugin implements SentryPolicyStorePlugin {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryPlugin.class);
+
+  public static volatile SentryPlugin instance;
+
+  static class PermImageRetriever implements ExternalImageRetriever<PermissionsUpdate> {
+
+    private final SentryStore sentryStore;
+
+    public PermImageRetriever(SentryStore sentryStore) {
+      this.sentryStore = sentryStore;
+    }
+
+    @Override
+    public PermissionsUpdate retrieveFullImage(long currSeqNum) {
+      Map<String, HashMap<String, String>> privilegeImage = sentryStore.retrieveFullPrivilegeImage();
+      Map<String, LinkedList<String>> roleImage = sentryStore.retrieveFullRoleImage();
+      
+      TPermissionsUpdate tPermUpdate = new TPermissionsUpdate(true, currSeqNum,
+          new HashMap<String, TPrivilegeChanges>(),
+          new HashMap<String, TRoleChanges>());
+      for (Map.Entry<String, HashMap<String, String>> privEnt : privilegeImage.entrySet()) {
+        String authzObj = privEnt.getKey();
+        HashMap<String,String> privs = privEnt.getValue();
+        tPermUpdate.putToPrivilegeChanges(authzObj, new TPrivilegeChanges(
+            authzObj, privs, new HashMap<String, String>()));
+      }
+      for (Map.Entry<String, LinkedList<String>> privEnt : roleImage.entrySet()) {
+        String role = privEnt.getKey();
+        LinkedList<String> groups = privEnt.getValue();
+        tPermUpdate.putToRoleChanges(role, new TRoleChanges(role, groups, new LinkedList<String>()));
+      }
+      PermissionsUpdate permissionsUpdate = new PermissionsUpdate(tPermUpdate);
+      permissionsUpdate.setSeqNum(currSeqNum);
+      return permissionsUpdate;
+    }
+    
+  }
+
+  private UpdateForwarder<PathsUpdate> pathsUpdater;
+  private UpdateForwarder<PermissionsUpdate> permsUpdater;
+  private final AtomicLong permSeqNum = new AtomicLong(5);
+  private PermImageRetriever permImageRetriever;
+
+  long getLastSeenHMSPathSeqNum() {
+    return pathsUpdater.getLastSeen();
+  }
+
+  @Override
+  public void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException {
+    final String[] pathPrefixes = conf
+        .getStrings(ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES,
+            ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT);
+    final int initUpdateRetryDelayMs =
+        conf.getInt(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
+            ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT);
+    pathsUpdater = new UpdateForwarder<PathsUpdate>(new UpdateableAuthzPaths(
+        pathPrefixes), null, 100, initUpdateRetryDelayMs);
+    permImageRetriever = new PermImageRetriever(sentryStore);
+    permsUpdater = new UpdateForwarder<PermissionsUpdate>(
+        new UpdateablePermissions(permImageRetriever), permImageRetriever,
+        100, initUpdateRetryDelayMs);
+    LOGGER.info("Sentry HDFS plugin initialized !!");
+    instance = this;
+  }
+
+  public List<PathsUpdate> getAllPathsUpdatesFrom(long pathSeqNum) {
+    return pathsUpdater.getAllUpdatesFrom(pathSeqNum);
+  }
+
+  public List<PermissionsUpdate> getAllPermsUpdatesFrom(long permSeqNum) {
+    return permsUpdater.getAllUpdatesFrom(permSeqNum);
+  }
+
+  public void handlePathUpdateNotification(PathsUpdate update) {
+    pathsUpdater.handleUpdateNotification(update);
+    LOGGER.debug("Recieved Authz Path update [" + update.getSeqNum() + "]..");
+  }
+
+  @Override
+  public void onAlterSentryRoleAddGroups(
+      TAlterSentryRoleAddGroupsRequest request) throws SentryPluginException {
+    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName());
+    for (TSentryGroup group : request.getGroups()) {
+      rUpdate.addToAddGroups(group.getGroupName());
+    }
+    permsUpdater.handleUpdateNotification(update);
+    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + 
request.getRoleName() + "]..");
+  }
+
+  @Override
+  public void onAlterSentryRoleDeleteGroups(
+      TAlterSentryRoleDeleteGroupsRequest request)
+      throws SentryPluginException {
+    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName());
+    for (TSentryGroup group : request.getGroups()) {
+      rUpdate.addToDelGroups(group.getGroupName());
+    }
+    permsUpdater.handleUpdateNotification(update);
+    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + 
request.getRoleName() + "]..");
+  }
+
+  @Override
+  public void onAlterSentryRoleGrantPrivilege(
+      TAlterSentryRoleGrantPrivilegeRequest request)
+      throws SentryPluginException {
+    String authzObj = getAuthzObj(request.getPrivilege());
+    if (authzObj != null) {
+      PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+      update.addPrivilegeUpdate(authzObj).putToAddPrivileges(
+          request.getRoleName(), request.getPrivilege().getAction().toUpperCase());
+      permsUpdater.handleUpdateNotification(update);
+      LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + "]..");
+    }
+  }
+
+  @Override
+  public void onRenameSentryPrivilege(TRenamePrivilegesRequest request)
+      throws SentryPluginException {
+    String oldAuthz = getAuthzObj(request.getOldAuthorizable());
+    String newAuthz = getAuthzObj(request.getNewAuthorizable());
+    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
+    privUpdate.putToAddPrivileges(newAuthz, newAuthz);
+    privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
+    permsUpdater.handleUpdateNotification(update);
+    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + 
newAuthz + ", " + oldAuthz + "]..");
+  }
+
+  @Override
+  public void onAlterSentryRoleRevokePrivilege(
+      TAlterSentryRoleRevokePrivilegeRequest request)
+      throws SentryPluginException {
+    String authzObj = getAuthzObj(request.getPrivilege());
+    if (authzObj != null) {
+      PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+      update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
+          request.getRoleName(), request.getPrivilege().getAction().toUpperCase());
+      permsUpdater.handleUpdateNotification(update);
+      LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + 
authzObj + "]..");
+    }
+  }
+
+  @Override
+  public void onDropSentryRole(TDropSentryRoleRequest request)
+      throws SentryPluginException {
+    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    update.addPrivilegeUpdate(PermissionsUpdate.ALL_AUTHZ_OBJ).putToDelPrivileges(
+        request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ);
+    update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS);
+    permsUpdater.handleUpdateNotification(update);
+    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + 
request.getRoleName() + "]..");
+  }
+
+  @Override
+  public void onDropSentryPrivilege(TDropPrivilegesRequest request)
+      throws SentryPluginException {
+    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    String authzObj = getAuthzObj(request.getAuthorizable());
+    update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
+        PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
+    permsUpdater.handleUpdateNotification(update);
+    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + 
authzObj + "]..");
+  }
+
+  private String getAuthzObj(TSentryPrivilege privilege) {
+    String authzObj = null;
+    if (!SentryStore.isNULL(privilege.getDbName())) {
+      String dbName = privilege.getDbName();
+      String tblName = privilege.getTableName();
+      if (SentryStore.isNULL(tblName)) {
+        authzObj = dbName;
+      } else {
+        authzObj = dbName + "." + tblName;
+      }
+    }
+    return authzObj;
+  }
+
+  private String getAuthzObj(TSentryAuthorizable authzble) {
+    String authzObj = null;
+    if (!SentryStore.isNULL(authzble.getDb())) {
+      String dbName = authzble.getDb();
+      String tblName = authzble.getTable();
+      if (SentryStore.isNULL(tblName)) {
+        authzObj = dbName;
+      } else {
+        authzObj = dbName + "." + tblName;
+      }
+    }
+    return authzObj;
+  }
+}
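
Both getAuthzObj overloads reduce a privilege or authorizable to the "db" or "db.table" string that keys the permission cache; privileges with no database (server or URI scope) map to null and are skipped by the callers above. A standalone rendering of the mapping, with SentryStore.isNULL approximated by a null-or-empty check (an assumption for this sketch):

    // Hypothetical stand-in; the real check is SentryStore.isNULL().
    static String toAuthzObj(String dbName, String tblName) {
      if (dbName == null || dbName.isEmpty()) {
        return null; // server- or URI-scoped: nothing to sync to HDFS
      }
      return (tblName == null || tblName.isEmpty()) ? dbName : dbName + "." + tblName;
    }
    // toAuthzObj("sales", null) -> "sales"
    // toAuthzObj("sales", "q1") -> "sales.q1"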

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/2e509e4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
new file mode 100644
index 0000000..f321d3d
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class UpdateForwarder<K extends Updateable.Update> implements
+    Updateable<K> {
+
+  public static interface ExternalImageRetriever<K> {
+
+    public K retrieveFullImage(long currSeqNum);
+
+  }
+
+  private final AtomicLong lastSeenSeqNum = new AtomicLong(0);
+  private final AtomicLong lastCommittedSeqNum = new AtomicLong(0);
+  // Updates should be handled in order
+  private final Executor updateHandler = Executors.newSingleThreadExecutor();
+
+  // The update log is used to propagate updates to a downstream cache.
+  // The preUpdate log stores all commits that were applied to this cache.
+  // When the update log is filled to capacity (updateLogSize), all
+  // entries are cleared and a compact image of the state of the cache is
+  // appended to the log.
+  // The first entry in an update log (consequently the first preUpdate a
+  // downstream cache sees) will be a full image. All subsequent entries
+  // are partial edits.
+  private final LinkedList<K> updateLog = new LinkedList<K>();
+  // UpdateLog is disabled when updateLogSize = 0;
+  private final int updateLogSize;
+
+  private final ExternalImageRetriever<K> imageRetreiver;
+
+  private volatile Updateable<K> updateable;
+
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private static final long INIT_SEQ_NUM = -2;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarder.class);
+
+  public UpdateForwarder(Updateable<K> updateable,
+      ExternalImageRetriever<K> imageRetreiver, int updateLogSize) {
+    this(updateable, imageRetreiver, updateLogSize, 5000);
+  }
+  public UpdateForwarder(Updateable<K> updateable,
+      ExternalImageRetriever<K> imageRetreiver, int updateLogSize,
+      int initUpdateRetryDelay) {
+    this.updateLogSize = updateLogSize;
+    this.imageRetreiver = imageRetreiver;
+    if (imageRetreiver != null) {
+      spawnInitialUpdater(updateable, initUpdateRetryDelay);
+    } else {
+      this.updateable = updateable;
+    }
+  }
+
+  private void spawnInitialUpdater(final Updateable<K> updateable,
+      final int initUpdateRetryDelay) {
+    K firstFullImage = null;
+    try {
+      firstFullImage = imageRetreiver.retrieveFullImage(INIT_SEQ_NUM);
+    } catch (Exception e) {
+      LOGGER.warn("InitialUpdater encountered exception !! ", e);
+      firstFullImage = null;
+      Thread initUpdater = new Thread() {
+        @Override
+        public void run() {
+          while (UpdateForwarder.this.updateable == null) {
+            try {
+              Thread.sleep(initUpdateRetryDelay);
+            } catch (InterruptedException e) {
+              LOGGER.warn("Thread interrupted !! ", e);
+              break;
+            }
+            K fullImage = null;
+            try {
+              fullImage =
+                  UpdateForwarder.this.imageRetreiver
+                  .retrieveFullImage(INIT_SEQ_NUM);
+              appendToUpdateLog(fullImage);
+            } catch (Exception e) {
+              LOGGER.warn("InitialUpdater encountered exception !! ", e);
+            }
+            if (fullImage != null) {
+              UpdateForwarder.this.updateable = updateable.updateFull(fullImage);
+            }
+          }
+        }
+      };
+      initUpdater.start();
+    }
+    if (firstFullImage != null) {
+      appendToUpdateLog(firstFullImage);
+      this.updateable = updateable.updateFull(firstFullImage);
+    }
+  }
+  /**
+   * Handle notifications from HMS plug-in or upstream Cache
+   * @param update
+   */
+  public void handleUpdateNotification(final K update) {
+    // Correct the seqNums on the first update
+    if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) {
+      K firstUpdate = updateLog.peek();
+      long firstSeqNum = update.getSeqNum() - 1; 
+      if (firstUpdate != null) {
+        firstUpdate.setSeqNum(firstSeqNum);
+      }
+      lastCommittedSeqNum.set(firstSeqNum);
+      lastSeenSeqNum.set(firstSeqNum);
+    }
+    final boolean editNotMissed = lastSeenSeqNum.incrementAndGet() == update.getSeqNum();
+    if (!editNotMissed) {
+      lastSeenSeqNum.set(update.getSeqNum());
+    }
+    Runnable task = new Runnable() {
+      @Override
+      public void run() {
+        K toUpdate = update;
+        if (update.hasFullImage()) {
+          updateable = updateable.updateFull(update);
+        } else {
+          if (editNotMissed) {
+            // apply partial preUpdate
+            updateable.updatePartial(Lists.newArrayList(update), lock);
+          } else {
+            // Retrieve a full image from the external source instead
+            if (imageRetreiver != null) {
+              toUpdate = imageRetreiver
+                  .retrieveFullImage(update.getSeqNum());
+              updateable = updateable.updateFull(toUpdate);
+            }
+          }
+        }
+        appendToUpdateLog(toUpdate);
+      }
+    };
+    updateHandler.execute(task);
+  }
+
+  private void appendToUpdateLog(K update) {
+    synchronized (updateLog) {
+      boolean logCompacted = false;
+      if (updateLogSize > 0) {
+        if (update.hasFullImage() || (updateLog.size() == updateLogSize)) {
+          // Essentially a log compaction
+          updateLog.clear();
+          updateLog.add(update.hasFullImage() ? update
+              : createFullImageUpdate(update.getSeqNum()));
+          logCompacted = true;
+        } else {
+          updateLog.add(update);
+        }
+      }
+      lastCommittedSeqNum.set(update.getSeqNum());
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("#### Appending to Update Log ["
+            + "type=" + update.getClass() + ", "
+            + "lastCommit=" + lastCommittedSeqNum.get() + ", "
+            + "lastSeen=" + lastSeenSeqNum.get() + ", "
+            + "logCompacted=" + logCompacted + "]");
+      }
+    }
+  }
+
+  /**
+   * Return all updates from requested seqNum (inclusive)
+   * @param seqNum
+   * @return
+   */
+  public List<K> getAllUpdatesFrom(long seqNum) {
+    List<K> retVal = new LinkedList<K>();
+    synchronized (updateLog) {
+      long currSeqNum = lastCommittedSeqNum.get();
+      if (LOGGER.isDebugEnabled() && (updateable != null)) {
+        LOGGER.debug("#### GetAllUpdatesFrom ["
+            + "type=" + updateable.getClass() + ", "
+            + "reqSeqNum=" + seqNum + ", "
+            + "lastCommit=" + lastCommittedSeqNum.get() + ", "
+            + "lastSeen=" + lastSeenSeqNum.get() + ", "
+            + "updateLogSize=" + updateLog.size() + "]");
+      }
+      if (updateLogSize == 0) {
+        // no updatelog configured..
+        return retVal;
+      }
+      K head = updateLog.peek();
+      if (head == null) {
+        return retVal;
+      }
+      if (seqNum > currSeqNum + 1) {
+        // This process has probably restarted since the downstream
+        // cache received its last update
+        retVal.addAll(updateLog);
+        return retVal;
+      }
+      if (head.getSeqNum() > seqNum) {
+        // Caller has diverged greatly..
+        if (head.hasFullImage()) {
+          // head is a refresh(full) image
+          // Send full image along with partial updates
+          for (K u : updateLog) {
+            retVal.add(u);
+          }
+        } else {
+          // Create a full image
+          // clear updateLog
+          // add fullImage to head of Log
+          // NOTE : This should ideally never happen
+          K fullImage = createFullImageUpdate(currSeqNum);
+          updateLog.clear();
+          updateLog.add(fullImage);
+          retVal.add(fullImage);
+        }
+      } else {
+        // advance the iterator to the requested seqNum
+        Iterator<K> iter = updateLog.iterator();
+        while (iter.hasNext()) {
+          K elem = iter.next();
+          if (elem.getSeqNum() >= seqNum) {
+            retVal.add(elem);
+          }
+        }
+      }
+    }
+    return retVal;
+  }
+ 
+  public boolean areAllUpdatesCommited() {
+    return lastCommittedSeqNum.get() == lastSeenSeqNum.get();
+  }
+
+  public long getLastCommitted() {
+    return lastCommittedSeqNum.get();
+  }
+
+  public long getLastSeen() {
+    return lastSeenSeqNum.get();
+  }
+
+  @Override
+  public Updateable<K> updateFull(K update) {
+    return (updateable != null) ? updateable.updateFull(update) : null;
+  }
+
+  @Override
+  public void updatePartial(Iterable<K> updates, ReadWriteLock lock) {
+    if (updateable != null) {
+      updateable.updatePartial(updates, lock);
+    }
+  }
+  
+  @Override
+  public long getLastUpdatedSeqNum() {
+    return (updateable != null) ? updateable.getLastUpdatedSeqNum() : INIT_SEQ_NUM;
+  }
+
+  @Override
+  public K createFullImageUpdate(long currSeqNum) {
+    return (updateable != null) ? updateable.createFullImageUpdate(currSeqNum) : null;
+  }
+
+}
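
The compaction contract in appendToUpdateLog can be exercised with a small driver: when a partial edit arrives while the log already holds updateLogSize entries, the log collapses to a single full image at that seqNum, and any caller that has fallen behind receives that image followed by later partial edits. A sketch assuming an Updateable<PathsUpdate> such as UpdateableAuthzPaths, no external image retriever, and illustrative seqNums (updates are applied on a single background thread, so a real test would wait for the executor to drain first):

    UpdateForwarder<PathsUpdate> fwd = new UpdateForwarder<PathsUpdate>(
        new UpdateableAuthzPaths(new String[]{"/warehouse"}), null, 3);
    for (long seq = 1; seq <= 4; seq++) {
      fwd.handleUpdateNotification(new PathsUpdate(seq, false)); // four partial edits
    }
    // Appending seq 4 finds the log full (size 3), clears it, and re-seeds it
    // with createFullImageUpdate(4); a lagging caller now gets just that image.
    List<PathsUpdate> delta = fwd.getAllUpdatesFrom(1L);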
