Repository: sentry Updated Branches: refs/heads/master ff623a944 -> b479df4ba
SENTRY-1508: MetastorePlugin.java does not handle properly initialization failure (Vadim Spector, Reviewed by: Sravya Tirukkovalur, Alexander Kolbasov and Hao Hao) Change-Id: I95c00a92257553da56ee1cae4ae5c8f8d04a2409 Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/b479df4b Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/b479df4b Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/b479df4b Branch: refs/heads/master Commit: b479df4ba383a8920661b1b20d086e40a8ac2e1c Parents: ff623a9 Author: hahao <[email protected]> Authored: Mon Dec 12 16:05:11 2016 -0800 Committer: hahao <[email protected]> Committed: Mon Dec 12 16:05:11 2016 -0800 ---------------------------------------------------------------------- .../apache/sentry/hdfs/ServiceConstants.java | 2 - .../org/apache/sentry/hdfs/MetastorePlugin.java | 634 +++++++++++++------ .../sentry/hdfs/MetastorePluginWithHA.java | 2 +- 3 files changed, 442 insertions(+), 196 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/b479df4b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java index 23552c2..cf94785 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java @@ -59,8 +59,6 @@ public class ServiceConstants { public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT = 1000; public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE = "sentry.hdfs.sync.metastore.cache.fail.on.partial.update"; public static final boolean SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT = true; - public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE = "sentry.hdfs.sync.metastore.cache.async-init.enable"; - public static final boolean SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT = false; public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-partitions-per-rpc"; public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT = 100; http://git-wip-us.apache.org/repos/asf/sentry/blob/b479df4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java index 085971b..f6661fd 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java @@ -17,17 +17,11 @@ */ package org.apache.sentry.hdfs; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -42,87 +36,211 @@ import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; /** - * Plugin implementation of {@link SentryMetastoreListenerPlugin} that hooks - * into the sites in the {@link MetaStorePreEventListener} that deal with - * creation/updation and deletion for paths. + * Plugin for the components that need to send path creation, update, and deletion + * notifications to the Sentry daemon. + * + * <p> + * Implements {@link SentryMetastoreListenerPlugin} that hooks + * into the sites in the {@link MetaStorePreEventListener}. + * + * <p> + * Implementation Notes: + * + * <ol> + * <li>MetastorePlugin performs the following functions: + * + * <ul> + * <li> At the construction time: + * <ul> + * <li> Initializes local HMS cache with HMS paths information. + * <li> Sends initial HMS paths information to the Sentry daemon. + * </ul> + * </li> + * <li> Upon receiving path update notification from the hosting client code, via addPath(), + * removePath(), removeAllPaths(), and renameAuthzObject() callback methods: + * <ul> + * <li> Updates local HMS cache accordingly. + * <li> Sends partial update with the assigned sequence number to the Sentry daemon. + * <li> Maintains the latest Sentry partial update sequence number, incrementing it by 1 on each update. + * </ul> + * </li> + * <li> Periodically, from the housekeeping thread: + * <ul> + * <li> Contacts the Sentry daemon to ask for the sequence number of the latest received update. + * <li> If the sequence number returned by the Sentry daemon does not match the sequence number of the + * latest update sent from MetastorePlugin, send the full HMS paths image to the Sentry daemon. + * </ul> + * </li> + * </ul> + * + * <p> + * <li>MetastorePlugin must be a singleton.<br> + * Only a single instance of MetastorePlugin can be used. MetastorePlugin has HMS cache + * that is updated via calling addPath(), removePath(), removeAllPaths(), renameAuthzObject(). + * This cache must represent full HMS state at any point, so that full updates, when they are + * needed, would be correct. Channelling different update requests through different MetastorePlugin + * instances would make those caches partial and mutually inconsistent. + * + * <p> + * <li>MetastorePlugin is always created, even though ininitialization may fail.<br> + * MetastorePlugin initialization (object construction) may fail for two reasons: + * <ul> + * <li> HMS cache cannot be initialized, usually due to some invalid HMS path entries. + * <li> Initial cache cannot be sent to Sentry, e.g. due to the communication problems. + * </ul> + * + * <p> + * In either case, MetastorePlugin is still constructed, in consideration with the design of + * the existing client code. However, such an instance is marked as invalid; all update APIs + * throw IllegalStateException with the appropriate error message and root cause exception. + * <br>TODO: failing to construct MetastorePlugin on initialization failure would be much cleaner, + * but it has to be done in coordination with the HMS client code. + * + * <p> + * <li>MetastorePlugin guarantees delivery of HMS paths updates to Sentry daemon in the right order.<br> + * Each invocation of addPath(), removePath(), removeAllPaths(), renameAuthzObject() + * triggers two actions: + * <ul> + * <li> increment update sequence number and update the local cache and + * <li> send partial update to the Sentry daemon. + * </ul> + * + * <p> + * Update sequence number is created at first step, and then it travels as part of the update information, + * to the Sentry daemon on the second step. Therefore, the sequence of both steps must be + * atomic, to guarantee that updates arrive to the Sentry daemon in the right order, + * with sequential update number. This is achieved by using notificationLock. The same lock is used + * inside the SyncTask during full Sentry update, when the local and Sentry-side update sequence + * numbers are out of sync. + * + * <p> + * <li>MetastorePlugin validates input paths.<br> + * Parsing malformed input paths generates SentryMalformedPathException. Since this is a checked + * exception, it is re-thrown wrapped into (un-checked) IllegalArgumentException, to preserve + * public APIs' signatures. + * + * </ol> */ + public class MetastorePlugin extends SentryMetastoreListenerPlugin { private static final Logger LOGGER = LoggerFactory.getLogger(MetastorePlugin.class); - private static final String initializationFailureMsg = "Cache failed to initialize, cannot send path updates to Sentry." + - " Please review HMS error logs during startup for additional information. If the initialization failure is due" + - " to SentryMalformedPathException, you will need to rectify the malformed path in HMS db and restart HMS"; + /* MetastorePlugin initialization may fail for two different reasons: + * a) Failure to initialize HMS paths cache. + * b) Failure to send the initial HMS paths to the Sentry daemon. + * Each of the two messages below conveys the reason. + */ + private static final String CACHE_INIT_FAILURE_MSG = + "Cache failed to initialize, cannot send path updates to Sentry." + + " Please review HMS error logs during startup for additional information. If the initialization failure is due" + + " to SentryMalformedPathException, you will need to rectify the malformed path in HMS db and restart HMS"; + private static final String SENTRY_INIT_UPDATE_FAILURE_MSG = + "Metastore Plugin failed to initialize - cannot send initial HMS updates to Sentry"; + + private static final String SENTRY_COMM_FAILURE_MSG = "Cannot Communicate with Sentry"; + private final Configuration conf; + private final Configuration sentryConf; + + // guard for all local+Sentry notifications + private final ReentrantLock notificationLock = new ReentrantLock(); + // sentryClient may be re-instantiated in case of suspected communication failure + // This code ensures that access to sentryClient is protected by notificationLock + private SentryHDFSServiceClient sentryClient; + // Has to match the value of seqNum + // This code ensures that access to lastSentSeqNum is protected by notificationLock + protected long lastSentSeqNum; + + // pathUpdateLock guards access to UpdateableAuthzPaths which is not thread-safe + private final ReentrantReadWriteLock pathUpdateLock = new ReentrantReadWriteLock(); + // access to authzPaths must be protected by pathUpdateLock + private final UpdateableAuthzPaths authzPaths; + + // Initialized to some value > 1. + protected final AtomicLong seqNum = new AtomicLong(5); + private final Throwable initError; + private final String initErrorMsg; + private final ScheduledExecutorService threadPool; //NOPMD + + private static volatile ScheduledExecutorService lastThreadPool = null; + + /* + * This task is scheduled to run periodically, to make sure Sentry has all updates + * -- only if MetastorePlugin has been successfully initialized. + */ class SyncTask implements Runnable { @Override public void run() { - if (!notificiationLock.tryLock()) { + if (!notificationLock.tryLock()) { // No need to sync.. as metastore is in the process of pushing an update.. return; } - if (MetastorePlugin.this.authzPaths == null) { - LOGGER.warn(initializationFailureMsg); - return; - } try { - long lastSeenBySentry = - MetastorePlugin.this.getClient().getLastSeenHMSPathSeqNum(); + long lastSeenBySentry = getLastSeenHMSPathSeqNum(); long lastSent = lastSentSeqNum; if (lastSeenBySentry != lastSent) { LOGGER.warn("#### Sentry not in sync with HMS [" + lastSeenBySentry + ", " + lastSent + "]"); - PathsUpdate fullImageUpdate = - MetastorePlugin.this.authzPaths.createFullImageUpdate(lastSent); - notifySentryNoLock(fullImageUpdate); - LOGGER.warn("#### Synced Sentry with update [" + lastSent + "]"); + notifySentryFullUpdate(lastSent); } - } catch (Exception e) { - sentryClient = null; - LOGGER.error("Error talking to Sentry HDFS Service !!", e); + } catch (Exception ignore) { + // all methods inside try {} log errors anyway } finally { - syncSent = true; - notificiationLock.unlock(); + notificationLock.unlock(); } } } - private final Configuration conf; - private SentryHDFSServiceClient sentryClient; - private volatile UpdateableAuthzPaths authzPaths; - private Lock notificiationLock; - - // Initialized to some value > 1. - protected static final AtomicLong seqNum = new AtomicLong(5); - - // Has to match the value of seqNum - protected static volatile long lastSentSeqNum = seqNum.get(); - private volatile boolean syncSent = false; - private volatile boolean initComplete = false; - private volatile boolean queueFlushComplete = false; - private volatile Throwable initError = null; - private final Queue<PathsUpdate> updateQueue = new LinkedList<PathsUpdate>(); - - private final ExecutorService threadPool; //NOPMD - private final Configuration sentryConf; - + /* + * Proxy class for RPC calls to the Sentry daemon + */ static class ProxyHMSHandler extends HMSHandler { public ProxyHMSHandler(String name, HiveConf conf) throws MetaException { super(name, conf); } } - public MetastorePlugin(Configuration conf, Configuration sentryConf) { - this.notificiationLock = new ReentrantLock(); + /* + * Test-only logic. Testing framework may create multiple MetastorePlugin + * instances in sequence, without explicitly shutting down the previous + * instance, which does not even have any shutdown API (obvious oversight). + * This results in multiple housekeeping thread pools, completely messing + * up HMS state on Sentry daemon. + * Previous thread pool must be shut down. + * In real deployments this code does nothing, because there is only one + * instance of MetastorePlugin. + */ + private static synchronized void shutdownPreviousHousekeepingThreadPool() { + if (lastThreadPool != null) { + LOGGER.info("#### Metastore Plugin: shutting down previous housekeeping thread"); + try { + lastThreadPool.shutdownNow(); + } catch (Throwable t) { + LOGGER.error("#### Metastore Plugin: failure shutting down previous housekeeping thread", t); + } + lastThreadPool = null; + } + } + public MetastorePlugin(Configuration conf, Configuration sentryConf) { + Preconditions.checkNotNull(conf, "NULL Hive Configuration"); + Preconditions.checkNotNull(sentryConf, "NULL Sentry Configuration"); if (!(conf instanceof HiveConf)) { - String error = "Configuration is not an instanceof HiveConf"; + String error = "Hive Configuration is not an instanceof HiveConf: " + conf.getClass().getName(); LOGGER.error(error); - throw new RuntimeException(error); + throw new IllegalArgumentException(error); } + + /* + * Test-only logic. See javadoc for this method. + */ + shutdownPreviousHousekeepingThreadPool(); + this.conf = new HiveConf((HiveConf)conf); this.sentryConf = new Configuration(sentryConf); @@ -130,113 +248,156 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin { this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname); this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname); this.conf.unset(HiveConf.ConfVars.METASTOREURIS.varname); - Thread initUpdater = new Thread() { - @Override - public void run() { - MetastoreCacheInitializer cacheInitializer = null; - try { - cacheInitializer = - new MetastoreCacheInitializer(new ProxyHMSHandler("sentry.hdfs", - (HiveConf) MetastorePlugin.this.conf), - MetastorePlugin.this.conf); - MetastorePlugin.this.authzPaths = - cacheInitializer.createInitialUpdate(); - LOGGER.info("#### Metastore Plugin initialization complete !!"); - synchronized (updateQueue) { - while (!updateQueue.isEmpty()) { - PathsUpdate update = updateQueue.poll(); - if (update != null) { - processUpdate(update); - } - } - queueFlushComplete = true; - } - LOGGER.info("#### Finished flushing queued updates to Sentry !!"); - } catch (Exception e) { - LOGGER.error("#### Could not create Initial AuthzPaths or HMSHandler !!", e); - initError = e; - } finally { - if (cacheInitializer != null) { - try { - cacheInitializer.close(); - } catch (Exception e) { - LOGGER.info("#### Exception while closing cacheInitializer !!", e); - } - } - initComplete = true; - } + + Throwable tmpInitError = null; + String tmpInitErrorMsg = null; + + /* Initialization Step #1: initialize local HMS state cache. + * To preserve the contract with the existing Hive client code, + * MetastorePlugin shall be constructed even if initialization fails, + * though it will be completely unoperable. + */ + UpdateableAuthzPaths tmpAuthzPaths; + try (MetastoreCacheInitializer cacheInitializer = new MetastoreCacheInitializer( + new ProxyHMSHandler("sentry.hdfs", (HiveConf) this.conf), + this.conf)) + { + // initialize HMS cache. + tmpAuthzPaths = cacheInitializer.createInitialUpdate(); + LOGGER.info("#### Metastore Plugin HMS cache initialization complete"); + } catch (Throwable e) { + tmpInitError = e; + tmpInitErrorMsg = CACHE_INIT_FAILURE_MSG; + tmpAuthzPaths = null; + LOGGER.error("#### " + tmpInitErrorMsg, e); + for (Throwable thr : e.getSuppressed()) { + LOGGER.warn("#### Exception while closing cacheInitializer", thr); } - }; - if (this.conf.getBoolean( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE, - ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT)) { - LOGGER.warn("#### Metastore Cache initialization is set to aync..." + - "HDFS ACL synchronization will not happen until metastore" + - "cache initialization is completed !!"); - initUpdater.start(); - } else { - initUpdater.run(); //NOPMD } + this.authzPaths = tmpAuthzPaths; + + /* If HMS cache initialization failed, further initialization shall be skipped. + * MetastorePlugin is considered non-operational, and all of its public APIs + * shall be throwing an exception. + */ + if (tmpInitError != null) { + this.threadPool = null; + this.initError = tmpInitError; + this.initErrorMsg = tmpInitErrorMsg; + return; + } + + /* Initialization Step #2: push initial HMS state to Sentry. + * Synchronization by notificationLock is for visibility of changes to sentryClient. + */ + notificationLock.lock(); try { - sentryClient = SentryHDFSServiceClientFactory.create(sentryConf); - } catch (Exception e) { - sentryClient = null; - LOGGER.error("Could not connect to Sentry HDFS Service !!", e); + this.lastSentSeqNum = seqNum.get(); + notifySentryFullUpdate(lastSentSeqNum); + LOGGER.info("#### Metastore Plugin Sentry full initial update complete"); + } catch (Throwable e) { + tmpInitError = e; + tmpInitErrorMsg = SENTRY_INIT_UPDATE_FAILURE_MSG; + LOGGER.error("#### " + tmpInitErrorMsg, e); + } finally { + notificationLock.unlock(); + } + + this.initError = tmpInitError; + this.initErrorMsg = tmpInitErrorMsg; + + /* If sending HMS state to Sentry failed, further initialization shall be skipped. + * MetastorePlugin is considered non-operational, and all of its public APIs + * shall be throwing an exception. + */ + if (this.initError != null) { + this.threadPool = null; + return; } - ScheduledExecutorService newThreadPool = Executors.newScheduledThreadPool(1); - newThreadPool.scheduleWithFixedDelay(new SyncTask(), - this.conf.getLong(ServerConfig - .SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS, - ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT), - this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS, - ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT), - TimeUnit.MILLISECONDS); - this.threadPool = newThreadPool; + + /* Initialization Step #3: schedulle SyncTask to run periodically, to make + * sure Sentry has the current HMS state. + */ + this.threadPool = Executors.newScheduledThreadPool(1); + this.threadPool.scheduleWithFixedDelay(new SyncTask(), + this.conf.getLong(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS, + ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT), + this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS, + ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT), + TimeUnit.MILLISECONDS); + MetastorePlugin.lastThreadPool = this.threadPool; + LOGGER.info("#### Metastore Plugin Sentry initialization complete"); } @Override public void addPath(String authzObj, String path) { + assertInit(); + + // validate / parse inputs List<String> pathTree = null; try { pathTree = PathsUpdate.parsePath(path); } catch (SentryMalformedPathException e) { - LOGGER.error("Unexpected path in addPath: authzObj = " + authzObj + " , path = " + path); - e.printStackTrace(); - return; + String err = "Unexpected path in addPath: authzObj = " + authzObj + " , path = " + path; + LOGGER.error(err, e); + throw new IllegalArgumentException(err, e); } if(pathTree == null) { + LOGGER.debug("#### HMS Path Update [" + + "OP : addPath, " + + "authzObj : " + authzObj.toLowerCase() + ", " + + "path : " + path + "] - nothing to add"); return; } LOGGER.debug("#### HMS Path Update [" + "OP : addPath, " + "authzObj : " + authzObj.toLowerCase() + ", " + "path : " + path + "]"); - PathsUpdate update = createHMSUpdate(); - update.newPathChange(authzObj.toLowerCase()).addToAddPaths(pathTree); - notifySentryAndApplyLocal(update); + + // do local and remote updates + notificationLock.lock(); + try { + PathsUpdate update = createHMSUpdate(); + update.newPathChange(authzObj.toLowerCase()).addToAddPaths(pathTree); + updateLocalCacheAndNotifySentry(update); + } finally { + notificationLock.unlock(); + } } @Override public void removeAllPaths(String authzObj, List<String> childObjects) { + assertInit(); + + // validate / parse inputs LOGGER.debug("#### HMS Path Update [" + "OP : removeAllPaths, " + "authzObj : " + authzObj.toLowerCase() + ", " + "childObjs : " + (childObjects == null ? "[]" : childObjects) + "]"); - PathsUpdate update = createHMSUpdate(); - if (childObjects != null) { - for (String childObj : childObjects) { - update.newPathChange(authzObj.toLowerCase() + "." + childObj).addToDelPaths( + + // do local and remote updates + notificationLock.lock(); + try { + PathsUpdate update = createHMSUpdate(); + if (childObjects != null) { + for (String childObj : childObjects) { + update.newPathChange(authzObj.toLowerCase() + "." + childObj).addToDelPaths( Lists.newArrayList(PathsUpdate.ALL_PATHS)); + } } - } - update.newPathChange(authzObj.toLowerCase()).addToDelPaths( + update.newPathChange(authzObj.toLowerCase()).addToDelPaths( Lists.newArrayList(PathsUpdate.ALL_PATHS)); - notifySentryAndApplyLocal(update); + updateLocalCacheAndNotifySentry(update); + } finally { + notificationLock.unlock(); + } } @Override public void removePath(String authzObj, String path) { + assertInit(); + + // validate / parse inputs if ("*".equals(path)) { removeAllPaths(authzObj.toLowerCase(), null); } else { @@ -244,154 +405,241 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin { try { pathTree = PathsUpdate.parsePath(path); } catch (SentryMalformedPathException e) { - LOGGER.error("Unexpected path in removePath: authzObj = " + authzObj + " , path = " + path); - e.printStackTrace(); - return; + String err = "Unexpected path in removePath: authzObj = " + authzObj + " , path = " + path; + LOGGER.error(err, e); + throw new IllegalArgumentException(err, e); } if(pathTree == null) { + LOGGER.debug("#### HMS Path Update [" + + "OP : removePath, " + + "authzObj : " + authzObj.toLowerCase() + ", " + + "path : " + path + "] - nothing to remove"); return; } LOGGER.debug("#### HMS Path Update [" + "OP : removePath, " + "authzObj : " + authzObj.toLowerCase() + ", " + "path : " + path + "]"); - PathsUpdate update = createHMSUpdate(); - update.newPathChange(authzObj.toLowerCase()).addToDelPaths(pathTree); - notifySentryAndApplyLocal(update); + + // do local and remote updates + notificationLock.lock(); + try { + PathsUpdate update = createHMSUpdate(); + update.newPathChange(authzObj.toLowerCase()).addToDelPaths(pathTree); + updateLocalCacheAndNotifySentry(update); + } finally { + notificationLock.unlock(); + } } } @Override public void renameAuthzObject(String oldName, String oldPath, String newName, String newPath) { + assertInit(); + + // validate / parse inputs String oldNameLC = oldName != null ? oldName.toLowerCase() : null; String newNameLC = newName != null ? newName.toLowerCase() : null; - PathsUpdate update = createHMSUpdate(); LOGGER.debug("#### HMS Path Update [" + "OP : renameAuthzObject, " - + "oldName : " + oldNameLC + "," - + "oldPath : " + oldPath + "," - + "newName : " + newNameLC + "," - + "newPath : " + newPath + "]"); + + "oldName : " + oldNameLC + ", " + + "oldPath : " + oldPath + ", " + + "newName : " + newNameLC + ", " + + "newPath : " + newPath + "]"); List<String> newPathTree = null; try { newPathTree = PathsUpdate.parsePath(newPath); } catch (SentryMalformedPathException e) { - LOGGER.error("Unexpected path in renameAuthzObject while parsing newPath: oldName=" + oldName + ", oldPath=" + oldPath + - ", newName=" + newName + ", newPath=" + newPath); - e.printStackTrace(); - return; - } - - if( newPathTree != null ) { - update.newPathChange(newNameLC).addToAddPaths(newPathTree); + String err = "Unexpected path in renameAuthzObject while parsing newPath: oldName=" + oldName + ", oldPath=" + oldPath + + ", newName=" + newName + ", newPath=" + newPath; + LOGGER.error(err, e); + throw new IllegalArgumentException(err, e); } List<String> oldPathTree = null; try { oldPathTree = PathsUpdate.parsePath(oldPath); } catch (SentryMalformedPathException e) { - LOGGER.error("Unexpected path in renameAuthzObject while parsing oldPath: oldName=" + oldName + ", oldPath=" + oldPath + - ", newName=" + newName + ", newPath=" + newPath); - e.printStackTrace(); - return; + String err = "Unexpected path in renameAuthzObject while parsing oldPath: oldName=" + oldName + ", oldPath=" + oldPath + + ", newName=" + newName + ", newPath=" + newPath; + LOGGER.error(err, e); + throw new IllegalArgumentException(err, e); } - if( oldPathTree != null ) { - update.newPathChange(oldNameLC).addToDelPaths(oldPathTree); + // do local and remote updates + notificationLock.lock(); + try { + PathsUpdate update = createHMSUpdate(); + if( newPathTree != null ) { + update.newPathChange(newNameLC).addToAddPaths(newPathTree); + } + if( oldPathTree != null ) { + update.newPathChange(oldNameLC).addToDelPaths(oldPathTree); + } + updateLocalCacheAndNotifySentry(update); + } finally { + notificationLock.unlock(); } - notifySentryAndApplyLocal(update); } - private SentryHDFSServiceClient getClient() { + /* + * Instantiate client (unless it's already instantiated) to talk to Sentry service. + * Call must be protected by notificationLock. + */ + private SentryHDFSServiceClient getClient() throws Exception { + assert notificationLock.isHeldByCurrentThread() : "Internal Faulure: access to Sentry client is nt protected by notificationLock"; if (sentryClient == null) { try { sentryClient = SentryHDFSServiceClientFactory.create(sentryConf); } catch (Exception e) { sentryClient = null; - LOGGER.error("#### Could not connect to Sentry HDFS Service !!", e); + final String err = SENTRY_COMM_FAILURE_MSG; + LOGGER.error(err, e); + throw new Exception(err, e); } } return sentryClient; } + /* + * Initialize HMS update object and assign its sequence number. + * Call must be protected by notificationLock. + */ private PathsUpdate createHMSUpdate() { PathsUpdate update = new PathsUpdate(seqNum.incrementAndGet(), false); LOGGER.debug("#### Creating HMS Path Update SeqNum : [" + seqNum.get() + "]"); return update; } - protected void notifySentryNoLock(PathsUpdate update) { + /* + * Get the last seen HMS path update sequence number from Sentry service. + * Call must be protected by notificationLock. + */ + private long getLastSeenHMSPathSeqNum() throws Exception { + try { + return getClient().getLastSeenHMSPathSeqNum(); + } catch (Exception e) { + final String err = "Could not fetch the last seen HMS Path Sequence number from Sentry HDFS Service"; + LOGGER.error(err, e); + resetClient(); + throw e; + } + } + + /* + * Send update to Sentry service. + * This method, when called from notifySentry(), is followed by updating lastSentSeqNumber. + * When called directly, to send full updates (i.e. during initialization and from SyncTask), + * the update sequence number does not change. + * Call must be protected by notificationLock. + */ + private void notifySentry_NoSeqNumIncr(PathsUpdate update) { final Timer.Context timerContext = SentryHdfsMetricsUtil.getNotifyHMSUpdateTimer.time(); try { getClient().notifyHMSUpdate(update); } catch (Exception e) { - LOGGER.error("Could not send update to Sentry HDFS Service !!", e); + final String err = "Could not send update to Sentry HDFS Service"; + LOGGER.error(err, e); + resetClient(); SentryHdfsMetricsUtil.getFailedNotifyHMSUpdateCounter.inc(); + throw new RuntimeException(err, e); } finally { timerContext.stop(); } } + /** + * Send update to Sentry service and update last sent sequence number. + * Called only if MetastorePlugin has been successfully initialized. + * Call must be protected by notificationLock. + */ protected void notifySentry(PathsUpdate update) { - notificiationLock.lock(); try { - if (!syncSent) { - new SyncTask().run(); - } - - notifySentryNoLock(update); + notifySentry_NoSeqNumIncr(update); } finally { lastSentSeqNum = update.getSeqNum(); - notificiationLock.unlock(); LOGGER.debug("#### HMS Path Last update sent : ["+ lastSentSeqNum + "]"); } } + /* + * Send full update to Sentry service. + * Called only if MetastorePlugin has been successfully initialized. + * Call must be protected by notificationLock. + */ + private void notifySentryFullUpdate(long lastSent) { + PathsUpdate fullImageUpdate = null; + // access to authzPaths should be consistently protected by pathUpdateLock + pathUpdateLock.readLock().lock(); + try { + fullImageUpdate = authzPaths.createFullImageUpdate(lastSent); + } finally { + pathUpdateLock.readLock().unlock(); + } + notifySentry_NoSeqNumIncr(fullImageUpdate); + LOGGER.warn("#### Synced Sentry with update [" + lastSent + "]"); + } + + /* + * When suspecting sentryClient comm error - reset the client + * Call must be protected by notificationLock. + */ + private void resetClient() { + if (sentryClient != null) { + try { + sentryClient.close(); + } catch (Exception ignore) { + } + sentryClient = null; + } + } + + /** + * Apply paths update to local cache. + * Called only if MetastorePlugin has been successfully initialized. + * Call must be protected by notificationLock. + */ protected void applyLocal(PathsUpdate update) { final Timer.Context timerContext = SentryHdfsMetricsUtil.getApplyLocalUpdateTimer.time(); - if(authzPaths == null) { - LOGGER.error(initializationFailureMsg); - return; + try { + authzPaths.updatePartial(Lists.newArrayList(update), pathUpdateLock); + } finally { + timerContext.stop(); } - authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock()); - timerContext.stop(); SentryHdfsMetricsUtil.getApplyLocalUpdateHistogram.update( update.getPathChanges().size()); } - private void notifySentryAndApplyLocal(PathsUpdate update) { - if(authzPaths == null) { - LOGGER.error(initializationFailureMsg); - return; - } - if (initComplete) { - processUpdate(update); - } else { - if (initError == null) { - synchronized (updateQueue) { - if (!queueFlushComplete) { - updateQueue.add(update); - } else { - processUpdate(update); - } - } - } else { - StringWriter sw = new StringWriter(); - initError.printStackTrace(new PrintWriter(sw)); - LOGGER.error("#### Error initializing Metastore Plugin" + - "[" + sw.toString() + "] !!"); - throw new RuntimeException(initError); - } - LOGGER.warn("#### Path update [" + update.getSeqNum() + "] not sent to Sentry.." + - "Metastore hasn't been initialized yet !!"); - } + /* + * Apply paths update to local cache. + * Send partial update to Sentry service. + * Called only if MetastorePlugin has been successfully initialized. + * Call must be protected by notificationLock. + */ + private void updateLocalCacheAndNotifySentry(PathsUpdate update) { + applyLocal(update); + notifySentry(update); } + /** + * Apply paths update to local cache and send partial update to Sentry. + * Called only if MetastorePlugin has been successfully initialized. + * Call must be protected by notificationLock. + */ protected void processUpdate(PathsUpdate update) { - applyLocal(update); - notifySentry(update); + updateLocalCacheAndNotifySentry(update); + } + + /* + * Check successfull initialization first, in each update callback method. + * Null initError guarantees successful initialization. + */ + private void assertInit() { + if (initError != null) { + throw new IllegalStateException(initErrorMsg, initError); + } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/b479df4b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java index 6476a01..32b635f 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java @@ -75,7 +75,7 @@ public class MetastorePluginWithHA extends MetastorePlugin { new SentryMetastoreHACacheListener(this)); // start seq# from the last global seq seqNum.set(pluginCacheSync.getUpdateCounter()); - MetastorePlugin.lastSentSeqNum = seqNum.get(); + this.lastSentSeqNum = seqNum.get(); } @Override
