Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign f0f77668b -> ed2752258
SENTRY-1676: FullUpdateInitializer#createInitialUpdate should not throw RuntimeException (Alex Kolbasov, reviewed by Sergio Pena) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/ed275225 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/ed275225 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/ed275225 Branch: refs/heads/sentry-ha-redesign Commit: ed27522584773075a484a4828fe9b7aeb0922961 Parents: f0f7766 Author: Alexander Kolbasov <[email protected]> Authored: Wed Apr 5 10:55:36 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Wed Apr 5 10:55:36 2017 -0700 ---------------------------------------------------------------------- .../sentry/hdfs/FullUpdateInitializer.java | 71 ++++++++++---------- .../sentry/service/thrift/HMSFollower.java | 6 +- 2 files changed, 37 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/ed275225/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java index 146cea2..7be63ea 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java @@ -24,32 +24,30 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; import org.apache.sentry.hdfs.service.thrift.TPathChanges; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.Map; import java.util.HashMap; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; /** - * FullUpdateInitializer is for fetching hive full update, - * the <hiveObj, paths> mapping. Multiple tasks will be - * running concurrently, and throw exception or inform - * sync incomplete paths update based on user configurable - * after retry configurable times. + * Fetch full snapshot of {@code <hiveObj, paths>} mappings from Hive. + * Mappings for different tables are fetched concurrently by multiple threads from a pool. */ -public class FullUpdateInitializer implements Closeable { +public final class FullUpdateInitializer implements AutoCloseable { private final ExecutorService threadPool; - private HiveMetaStoreClient client; + private final HiveMetaStoreClient client; private final int maxPartitionsPerCall; private final int maxTablesPerCall; private final List<Future<CallResult>> results = new ArrayList<Future<CallResult>>(); @@ -60,16 +58,16 @@ public class FullUpdateInitializer implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class); - final static class CallResult { - final private Exception failure; - final private boolean successStatus; + static final class CallResult { + private final Exception failure; + private final boolean successStatus; CallResult(Exception ex, boolean successStatus) { failure = ex; this.successStatus = successStatus; } - public boolean getSuccessStatus() { + boolean success() { return successStatus; } @@ -85,7 +83,7 @@ public class FullUpdateInitializer implements Closeable { */ private final class RetryStrategy { private int retryStrategyMaxRetries = 0; - private int retryStrategyWaitDurationMillis; + private final int retryStrategyWaitDurationMillis; private int retries; private Exception exception; @@ -134,13 +132,13 @@ public class FullUpdateInitializer implements Closeable { } // Task fails, return the failure flag. - LOGGER.error("Task did not complete successfully after " + retries - + " tries. Exception got: " + exception.toString()); + LOGGER.error("Task did not complete successfully after " + retries + 1 + + " tries", exception); return new CallResult(exception, false); } } - private RetryStrategy retryStrategy; + private final RetryStrategy retryStrategy; BaseTask() { taskCounter.incrementAndGet(); @@ -258,8 +256,8 @@ public class FullUpdateInitializer implements Closeable { Database db = client.getDatabase(dbName); List<String> dbPath = PathsUpdate.parsePath(db.getLocationUri()); if (dbPath != null) { + Preconditions.checkArgument(dbName.equalsIgnoreCase(db.getName())); synchronized (update) { - Preconditions.checkArgument(dbName.equalsIgnoreCase(db.getName())); update.newPathChange(dbName).addToAddPaths(dbPath); } } @@ -280,27 +278,26 @@ public class FullUpdateInitializer implements Closeable { public FullUpdateInitializer(HiveMetaStoreClient client, Configuration conf) { this.client = client; this.maxPartitionsPerCall = conf.getInt( - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT); + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT); this.maxTablesPerCall = conf.getInt( - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT); + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT); threadPool = Executors.newFixedThreadPool(conf.getInt( - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT)); + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT)); maxRetries = conf.getInt( - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM, - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT); + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT); waitDurationMillis = conf.getInt( - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS, - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT); + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT); failOnRetry = conf.getBoolean( - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE, - ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT); + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT); } - public Map<String, Set<String>> createInitialUpdate() throws ExecutionException, - InterruptedException, TException { + public Map<String, Set<String>> createInitialUpdate() throws Exception { PathsUpdate tempUpdate = new PathsUpdate(-1, false); List<String> allDbStr = client.getAllDatabases(); for (String dbName : allDbStr) { @@ -318,8 +315,8 @@ public class FullUpdateInitializer implements Closeable { // Fail the HMS startup if tasks are not all successful and // fail on partial updates flag is set in the config. - if (!callResult.getSuccessStatus() && failOnRetry) { - throw new RuntimeException(callResult.getFailure()); + if (!callResult.success() && failOnRetry) { + throw callResult.getFailure(); } } @@ -356,7 +353,7 @@ public class FullUpdateInitializer implements Closeable { } @Override - public void close() throws IOException { + public void close() { if (threadPool != null) { threadPool.shutdownNow(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/ed275225/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 6c14f5e..16676fb 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -303,20 +303,20 @@ public class HMSFollower implements Runnable { * @throws ExecutionException, InterruptedException, TException */ private Map<String, Set<String>> fetchFullUpdate() - throws ExecutionException, InterruptedException, TException { + throws Exception { FullUpdateInitializer updateInitializer = null; try { updateInitializer = new FullUpdateInitializer(client, authzConf); Map<String, Set<String>> pathsUpdate = updateInitializer.createInitialUpdate(); - LOGGER.info("#### Hive full update initialization complete !!"); + LOGGER.info("Obtained full snapshot from HMS"); return pathsUpdate; } finally { if (updateInitializer != null) { try { updateInitializer.close(); } catch (Exception e) { - LOGGER.info("#### Exception while closing updateInitializer !!", e); + LOGGER.error("Exception while closing updateInitializer", e); } } }
