http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d66ebb71/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java index 90192dc..476e9c4 100644 --- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java @@ -17,6 +17,7 @@ */ package org.apache.sentry.hdfs; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class UpdateableAuthzPermissions implements AuthzPermissions, Updateable<PermissionsUpdate> { + private static final int MAX_UPDATES_PER_LOCK_USE = 99; private volatile SentryPermissions perms = new SentryPermissions(); private final AtomicLong seqNum = new AtomicLong(0); @@ -67,7 +69,7 @@ public class UpdateableAuthzPermissions implements AuthzPermissions, Updateable< int counter = 0; for (PermissionsUpdate update : updates) { applyPartialUpdate(update); - if (++counter > 99) { + if (++counter > MAX_UPDATES_PER_LOCK_USE) { counter = 0; lock.writeLock().unlock(); lock.writeLock().lock(); @@ -119,6 +121,19 @@ public class UpdateableAuthzPermissions implements AuthzPermissions, Updateable< private void applyPrivilegeUpdates(PermissionsUpdate update) { for (TPrivilegeChanges pUpdate : update.getPrivilegeUpdates()) { + if (pUpdate.getAuthzObj().equals(PermissionsUpdate.RENAME_PRIVS)) { + String newAuthzObj = pUpdate.getAddPrivileges().keySet().iterator().next(); + String oldAuthzObj = pUpdate.getDelPrivileges().keySet().iterator().next(); + PrivilegeInfo privilegeInfo = perms.getPrivilegeInfo(oldAuthzObj); + Map<String, FsAction> allPermissions = privilegeInfo.getAllPermissions(); + perms.delPrivilegeInfo(oldAuthzObj); + PrivilegeInfo newPrivilegeInfo = new PrivilegeInfo(newAuthzObj); + for (Map.Entry<String, FsAction> e : allPermissions.entrySet()) { + newPrivilegeInfo.setPermission(e.getKey(), e.getValue()); + } + perms.addPrivilegeInfo(newPrivilegeInfo); + return; + } if (pUpdate.getAuthzObj().equals(PermissionsUpdate.ALL_PRIVS)) { // Request to remove role from all Privileges String roleToRemove = pUpdate.getDelPrivileges().keySet().iterator()
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d66ebb71/sentry-hdfs/sentry-hdfs-service/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/pom.xml b/sentry-hdfs/sentry-hdfs-service/pom.xml index 74c4f20..365380e 100644 --- a/sentry-hdfs/sentry-hdfs-service/pom.xml +++ b/sentry-hdfs/sentry-hdfs-service/pom.xml @@ -67,10 +67,6 @@ limitations under the License. <artifactId>sentry-provider-db</artifactId> </dependency> <dependency> - <groupId>org.apache.sentry</groupId> - <artifactId>sentry-service-client</artifactId> - </dependency> - <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d66ebb71/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java index c0358f4..e7677f2 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/ExtendedMetastoreClient.java @@ -29,11 +29,15 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Implementation of {@link MetastoreClient} + * + */ public class ExtendedMetastoreClient implements MetastoreClient { - + private static Logger LOG = LoggerFactory.getLogger(ExtendedMetastoreClient.class); - private HiveMetaStoreClient client; + private volatile HiveMetaStoreClient client; private final HiveConf hiveConf; public ExtendedMetastoreClient(HiveConf hiveConf) { this.hiveConf = hiveConf; http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d66ebb71/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java index 17f15f0..08e1319 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java @@ -18,68 +18,149 @@ package org.apache.sentry.hdfs; import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.MetaStorePreEventListener; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; +import org.apache.sentry.hdfs.service.thrift.TPathChanges; import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +/** + * Plugin implementation of {@link SentryMetastoreListenerPlugin} that hooks + * into the sites in the {@link MetaStorePreEventListener} that deal with + * creation/updation and deletion for paths. + */ public class MetastorePlugin extends SentryMetastoreListenerPlugin { - + private static final Logger LOGGER = LoggerFactory.getLogger(MetastorePlugin.class); - + private final Configuration conf; private SentryHDFSServiceClient sentryClient; + private UpdateableAuthzPaths authzPaths; //Initialized to some value > 1 so that the first update notification // will trigger a full Image fetch private final AtomicInteger seqNum = new AtomicInteger(5); + private final ExecutorService threadPool; public MetastorePlugin(Configuration conf) { - this.conf = conf; + this.conf = new HiveConf((HiveConf)conf); + this.conf.unset(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname); + this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname); + this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname); + this.conf.unset(HiveConf.ConfVars.METASTOREURIS.varname); + try { + this.authzPaths = createInitialUpdate(HiveMetaStore.newHMSHandler("sentry.hdfs", (HiveConf)this.conf)); + } catch (Exception e1) { + LOGGER.error("Could not create Initial AuthzPaths or HMSHandler !!", e1); + throw new RuntimeException(e1); + } try { sentryClient = new SentryHDFSServiceClient(conf); - } catch (IOException e) { + } catch (Exception e) { sentryClient = null; LOGGER.error("Could not connect to Sentry HDFS Service !!", e); } + ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1); + threadPool.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + long lastSeenHMSPathSeqNum = + MetastorePlugin.this.getClient().getLastSeenHMSPathSeqNum(); + if (lastSeenHMSPathSeqNum != seqNum.get()) { + LOGGER.warn("Sentry not in sync with HMS [" + lastSeenHMSPathSeqNum + ", " + seqNum.get() + "]"); + PathsUpdate fullImageUpdate = + MetastorePlugin.this.authzPaths.createFullImageUpdate( + seqNum.get()); + LOGGER.warn("Sentry not in sync with HMS !!"); + notifySentry(fullImageUpdate); + } + } catch (Exception e) { + sentryClient = null; + LOGGER.error("Error talking to Sentry HDFS Service !!", e); + } + } + }, this.conf.getLong(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS, + ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT), 1000, + TimeUnit.MILLISECONDS); + this.threadPool = threadPool; + } + + private UpdateableAuthzPaths createInitialUpdate(IHMSHandler hmsHandler) throws Exception { + UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new String[] {"/"}); + PathsUpdate tempUpdate = new PathsUpdate(-1, false); + List<String> allDbStr = hmsHandler.get_all_databases(); + for (String dbName : allDbStr) { + Database db = hmsHandler.get_database(dbName); + tempUpdate.newPathChange(db.getName()).addToAddPaths( + PathsUpdate.cleanPath(db.getLocationUri())); + List<String> allTblStr = hmsHandler.get_all_tables(db.getName()); + for (String tblName : allTblStr) { + Table tbl = hmsHandler.get_table(db.getName(), tblName); + TPathChanges tblPathChange = tempUpdate.newPathChange(tbl + .getDbName() + "." + tbl.getTableName()); + List<Partition> tblParts = + hmsHandler.get_partitions(db.getName(), tbl.getTableName(), (short) -1); + tblPathChange.addToAddPaths(PathsUpdate.cleanPath(tbl.getSd() + .getLocation() == null ? db.getLocationUri() : tbl + .getSd().getLocation())); + for (Partition part : tblParts) { + tblPathChange.addToAddPaths(PathsUpdate.cleanPath(part.getSd() + .getLocation())); + } + } + } + authzPaths.updatePartial(Lists.newArrayList(tempUpdate), + new ReentrantReadWriteLock()); + return authzPaths; } @Override public void addPath(String authzObj, String path) { PathsUpdate update = createHMSUpdate(); update.newPathChange(authzObj).addToAddPaths(PathsUpdate.cleanPath(path)); - try { - notifySentry(update); - } catch (MetaException e) { - LOGGER.error("Could not send update to Sentry HDFS Service !!", e); - } + notifySentry(update); } @Override public void removeAllPaths(String authzObj) { PathsUpdate update = createHMSUpdate(); update.newPathChange(authzObj).addToDelPaths(Lists.newArrayList(PathsUpdate.ALL_PATHS)); - try { - notifySentry(update); - } catch (MetaException e) { - LOGGER.error("Could not send update to Sentry HDFS Service !!", e); - } + notifySentry(update); } @Override public void removePath(String authzObj, String path) { PathsUpdate update = createHMSUpdate(); update.newPathChange(authzObj).addToDelPaths(PathsUpdate.cleanPath(path)); - try { - notifySentry(update); - } catch (MetaException e) { - LOGGER.error("Could not send update to Sentry HDFS Service !!", e); - } + notifySentry(update); + } + + @Override + public void renameAuthzObject(String oldName, String oldPath, String newName, + String newPath) { + PathsUpdate update = createHMSUpdate(); + update.newPathChange(newName).addToAddPaths(PathsUpdate.cleanPath(newPath)); + update.newPathChange(oldName).addToDelPaths(PathsUpdate.cleanPath(oldPath)); + notifySentry(update); } private SentryHDFSServiceClient getClient() { @@ -99,11 +180,12 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin { return update; } - private void notifySentry(PathsUpdate update) throws MetaException { + private void notifySentry(PathsUpdate update) { + authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock()); try { getClient().notifyHMSUpdate(update); - } catch (IOException e) { - throw new MetaException("Error sending update to Sentry [" + e.getMessage() + "]"); + } catch (Exception e) { + LOGGER.error("Could not send update to Sentry HDFS Service !!", e); } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d66ebb71/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java index ab07494..fd33d29 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java @@ -47,16 +47,16 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface { for (PathsUpdate update : pathUpdates) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("### Sending PATH preUpdate seq [" + update.getSeqNum() + "] ###"); - LOGGER.debug("### Sending PATH preUpdate [" + update.getThriftObject() + "] ###"); + LOGGER.debug("### Sending PATH preUpdate [" + update.toThrift() + "] ###"); } - retVal.getAuthzPathUpdate().add(update.getThriftObject()); + retVal.getAuthzPathUpdate().add(update.toThrift()); } for (PermissionsUpdate update : permUpdates) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("### Sending PERM preUpdate seq [" + update.getSeqNum() + "] ###"); - LOGGER.debug("### Sending PERM preUpdate [" + update.getThriftObject() + "] ###"); + LOGGER.debug("### Sending PERM preUpdate [" + update.toThrift() + "] ###"); } - retVal.getAuthzPermUpdate().add(update.getThriftObject()); + retVal.getAuthzPermUpdate().add(update.toThrift()); } } catch (Exception e) { LOGGER.error("Error Sending updates to downstream Cache", e); @@ -85,6 +85,11 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface { } } + @Override + public long check_hms_seq_num(long pathSeqNum) throws TException { + return SentryPlugin.instance.getLastSeenHMSPathSeqNum(); + } + /** * Not implemented for the time being.. */ http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d66ebb71/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java index 5bb6d45..40e952f 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java @@ -23,15 +23,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever; -import org.apache.sentry.hdfs.service.thrift.TPathChanges; import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate; import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; import org.apache.sentry.hdfs.service.thrift.TRoleChanges; @@ -42,14 +37,13 @@ import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleDeleteGroups import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleGrantPrivilegeRequest; import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleRevokePrivilegeRequest; import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest; +import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest; +import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; import org.apache.sentry.provider.db.service.thrift.TSentryGroup; import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege; -import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - public class SentryPlugin implements SentryPolicyStorePlugin { private static final Logger LOGGER = LoggerFactory.getLogger(SentryPlugin.class); @@ -94,18 +88,24 @@ public class SentryPlugin implements SentryPolicyStorePlugin { private UpdateForwarder<PermissionsUpdate> permsUpdater; private final AtomicLong permSeqNum = new AtomicLong(5); + long getLastSeenHMSPathSeqNum() { + return pathsUpdater.getLastSeen(); + } + @Override public void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException { - HiveConf hiveConf = new HiveConf(conf, Configuration.class); - final MetastoreClient hmsClient = new ExtendedMetastoreClient(hiveConf); final String[] pathPrefixes = conf .getStrings(ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES, ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT); + final int initUpdateRetryDelayMs = + conf.getInt(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS, + ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT); pathsUpdater = new UpdateForwarder<PathsUpdate>(new UpdateableAuthzPaths( - pathPrefixes), createHMSImageRetriever(pathPrefixes, hmsClient), 100); + pathPrefixes), null, 100, initUpdateRetryDelayMs); PermImageRetriever permImageRetriever = new PermImageRetriever(sentryStore); permsUpdater = new UpdateForwarder<PermissionsUpdate>( - new UpdateablePermissions(permImageRetriever), permImageRetriever, 100); + new UpdateablePermissions(permImageRetriever), permImageRetriever, + 100, initUpdateRetryDelayMs); instance = this; } @@ -122,42 +122,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin { LOGGER.info("Recieved Authz Path update [" + update.getSeqNum() + "].."); } - private ExternalImageRetriever<PathsUpdate> createHMSImageRetriever( - final String[] pathPrefixes, final MetastoreClient hmsClient) { - return new ExternalImageRetriever<PathsUpdate>() { - @Override - public PathsUpdate retrieveFullImage(long currSeqNum) { - PathsUpdate tempUpdate = new PathsUpdate(currSeqNum, false); - List<Database> allDatabases = hmsClient.getAllDatabases(); - for (Database db : allDatabases) { - tempUpdate.newPathChange(db.getName()).addToAddPaths( - PathsUpdate.cleanPath(db.getLocationUri())); - List<Table> allTables = hmsClient.getAllTablesOfDatabase(db); - for (Table tbl : allTables) { - TPathChanges tblPathChange = tempUpdate.newPathChange(tbl - .getDbName() + "." + tbl.getTableName()); - List<Partition> tblParts = hmsClient.listAllPartitions(db, tbl); - tblPathChange.addToAddPaths(PathsUpdate.cleanPath(tbl.getSd() - .getLocation() == null ? db.getLocationUri() : tbl - .getSd().getLocation())); - for (Partition part : tblParts) { - tblPathChange.addToAddPaths(PathsUpdate.cleanPath(part.getSd() - .getLocation())); - } - } - } - UpdateableAuthzPaths tmpAuthzPaths = new UpdateableAuthzPaths( - pathPrefixes); - tmpAuthzPaths.updatePartial(Lists.newArrayList(tempUpdate), - new ReentrantReadWriteLock()); - PathsUpdate retUpdate = new PathsUpdate(currSeqNum, true); - retUpdate.getThriftObject().setPathsDump( - tmpAuthzPaths.getPathsDump().createPathsDump()); - return retUpdate; - } - }; - } - @Override public void onAlterSentryRoleAddGroups( TAlterSentryRoleAddGroupsRequest request) throws SentryPluginException { @@ -198,6 +162,19 @@ public class SentryPlugin implements SentryPolicyStorePlugin { } @Override + public void onRenameSentryPrivilege(TRenamePrivilegesRequest request) + throws SentryPluginException { + String oldAuthz = getAuthzObj(request.getOldAuthorizable()); + String newAuthz = getAuthzObj(request.getNewAuthorizable()); + PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS); + privUpdate.putToAddPrivileges(newAuthz, newAuthz); + privUpdate.putToDelPrivileges(oldAuthz, oldAuthz); + permsUpdater.handleUpdateNotification(update); + LOGGER.info("Authz Perm preUpdate [" + update.getSeqNum() + ", " + newAuthz + ", " + oldAuthz + "].."); + } + + @Override public void onAlterSentryRoleRevokePrivilege( TAlterSentryRoleRevokePrivilegeRequest request) throws SentryPluginException { @@ -236,4 +213,17 @@ public class SentryPlugin implements SentryPolicyStorePlugin { return authzObj; } + private String getAuthzObj(TSentryAuthorizable authzble) { + String authzObj = null; + if (!SentryStore.isNULL(authzble.getDb())) { + String dbName = authzble.getDb(); + String tblName = authzble.getTable(); + if (tblName == null) { + authzObj = dbName; + } else { + authzObj = dbName + "." + tblName; + } + } + return authzObj; + } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d66ebb71/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java index b0fc5ed..2815880 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java @@ -26,7 +26,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.sentry.hdfs.Updateable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; @@ -53,7 +54,7 @@ public class UpdateForwarder<K extends Updateable.Update> implements // downstream cache sees) will be a full image. All subsequent entries are // partial edits private final LinkedList<K> updateLog = new LinkedList<K>(); - // UpdateLog is dissabled when updateLogSize = 0; + // UpdateLog is disabled when updateLogSize = 0; private final int updateLogSize; private final ExternalImageRetriever<K> imageRetreiver; @@ -63,15 +64,64 @@ public class UpdateForwarder<K extends Updateable.Update> implements private final ReadWriteLock lock = new ReentrantReadWriteLock(); private static final long INIT_SEQ_NUM = -2; + private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarder.class); + public UpdateForwarder(Updateable<K> updateable, ExternalImageRetriever<K> imageRetreiver, int updateLogSize) { + this(updateable, imageRetreiver, updateLogSize, 5000); + } + public UpdateForwarder(Updateable<K> updateable, + ExternalImageRetriever<K> imageRetreiver, int updateLogSize, + int initUpdateRetryDelay) { this.updateLogSize = updateLogSize; this.imageRetreiver = imageRetreiver; - K fullImage = imageRetreiver.retrieveFullImage(INIT_SEQ_NUM); - appendToUpdateLog(fullImage); - this.updateable = updateable.updateFull(fullImage); + if (imageRetreiver != null) { + spawnInitialUpdater(updateable, initUpdateRetryDelay); + } else { + this.updateable = updateable; + } } + private void spawnInitialUpdater(final Updateable<K> updateable, + final int initUpdateRetryDelay) { + K firstFullImage = null; + try { + firstFullImage = imageRetreiver.retrieveFullImage(INIT_SEQ_NUM); + } catch (Exception e) { + LOGGER.warn("InitialUpdater encountered exception !! ", e); + firstFullImage = null; + Thread initUpdater = new Thread() { + @Override + public void run() { + while (UpdateForwarder.this.updateable == null) { + try { + Thread.sleep(initUpdateRetryDelay); + } catch (InterruptedException e) { + LOGGER.warn("Thread interrupted !! ", e); + break; + } + K fullImage = null; + try { + fullImage = + UpdateForwarder.this.imageRetreiver + .retrieveFullImage(INIT_SEQ_NUM); + appendToUpdateLog(fullImage); + } catch (Exception e) { + LOGGER.warn("InitialUpdater encountered exception !! ", e); + } + if (fullImage != null) { + UpdateForwarder.this.updateable = updateable.updateFull(fullImage); + } + } + } + }; + initUpdater.start(); + } + if (firstFullImage != null) { + appendToUpdateLog(firstFullImage); + this.updateable = updateable.updateFull(firstFullImage); + } + } /** * Handle notifications from HMS plug-in or upstream Cache * @param update @@ -80,8 +130,10 @@ public class UpdateForwarder<K extends Updateable.Update> implements // Correct the seqNums on the first update if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) { K firstUpdate = updateLog.peek(); - long firstSeqNum = update.getSeqNum() - 1; - firstUpdate.setSeqNum(firstSeqNum); + long firstSeqNum = update.getSeqNum() - 1; + if (firstUpdate != null) { + firstUpdate.setSeqNum(firstSeqNum); + } lastCommittedSeqNum.set(firstSeqNum); lastSeenSeqNum.set(firstSeqNum); } @@ -101,10 +153,12 @@ public class UpdateForwarder<K extends Updateable.Update> implements // apply partial preUpdate updateable.updatePartial(Lists.newArrayList(update), lock); } else { - // Retrieve full update from External Source and - toUpdate = imageRetreiver - .retrieveFullImage(update.getSeqNum()); - updateable = updateable.updateFull(toUpdate); + // Retrieve full update from External Source and + if (imageRetreiver != null) { + toUpdate = imageRetreiver + .retrieveFullImage(update.getSeqNum()); + updateable = updateable.updateFull(toUpdate); + } } } appendToUpdateLog(toUpdate); @@ -143,6 +197,9 @@ public class UpdateForwarder<K extends Updateable.Update> implements return retVal; } K head = updateLog.peek(); + if (head == null) { + return retVal; + } if (seqNum > currSeqNum + 1) { // This process has probably restarted since downstream // recieved last update @@ -206,22 +263,24 @@ public class UpdateForwarder<K extends Updateable.Update> implements @Override public Updateable<K> updateFull(K update) { - return updateable.updateFull(update); + return (updateable != null) ? updateable.updateFull(update) : null; } @Override public void updatePartial(Iterable<K> updates, ReadWriteLock lock) { - updateable.updatePartial(updates, lock); + if (updateable != null) { + updateable.updatePartial(updates, lock); + } } @Override public long getLastUpdatedSeqNum() { - return updateable.getLastUpdatedSeqNum(); + return (updateable != null) ? updateable.getLastUpdatedSeqNum() : INIT_SEQ_NUM; } @Override public K createFullImageUpdate(long currSeqNum) { - return updateable.createFullImageUpdate(currSeqNum); + return (updateable != null) ? updateable.createFullImageUpdate(currSeqNum) : null; } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/d66ebb71/sentry-provider/sentry-provider-db/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/pom.xml b/sentry-provider/sentry-provider-db/pom.xml index 29b1f13..2f8f66a 100644 --- a/sentry-provider/sentry-provider-db/pom.xml +++ b/sentry-provider/sentry-provider-db/pom.xml @@ -85,10 +85,6 @@ limitations under the License. </dependency> <dependency> <groupId>org.apache.sentry</groupId> - <artifactId>sentry-service-client</artifactId> - </dependency> - <dependency> - <groupId>org.apache.sentry</groupId> <artifactId>sentry-provider-common</artifactId> </dependency> <dependency>