Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign a13f2fb83 -> bfb0456fc
SENTRY-1687: FullUpdateInitializer can be more efficient (Alex Kolbasov, reviewed by: Hao Hao, Vamsee Yarlagadda and Na Li) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/bfb0456f Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/bfb0456f Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/bfb0456f Branch: refs/heads/sentry-ha-redesign Commit: bfb0456fc3575a97e81886b4a9e2c5cb2f86b7e4 Parents: a13f2fb Author: Alexander Kolbasov <[email protected]> Authored: Wed Apr 26 11:12:52 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Wed Apr 26 11:13:38 2017 -0700 ---------------------------------------------------------------------- .../sentry/hdfs/FullUpdateInitializer.java | 383 +++++++++++-------- .../org/apache/sentry/hdfs/PathsUpdate.java | 128 +++---- .../apache/sentry/hdfs/ServiceConstants.java | 2 - .../sentry/hdfs/TestFullUpdateInitializer.java | 357 ++++++++++++----- .../org/apache/sentry/hdfs/TestPathsUpdate.java | 33 +- .../sentry/hdfs/TestUpdateableAuthzPaths.java | 16 +- .../apache/sentry/hdfs/PathImageRetriever.java | 7 +- .../db/service/persistent/SentryStore.java | 8 +- .../sentry/service/thrift/HMSFollower.java | 30 +- .../service/thrift/NotificationProcessor.java | 100 ++--- .../e2e/hdfs/TestHDFSIntegrationAdvanced.java | 2 +- 11 files changed, 650 insertions(+), 416 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java index d876d23..2fe2bb5 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java @@ -17,68 +17,166 @@ */ package org.apache.sentry.hdfs; -import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; -import org.apache.sentry.hdfs.service.thrift.TPathChanges; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.Map; import java.util.HashMap; -import java.util.Vector; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; /** - * Fetch full snapshot of {@code <hiveObj, paths>} mappings from Hive. 
- * Mappings for different tables are fetched concurrently by multiple threads from a pool.
+ * Manage fetching a full snapshot from HMS.
+ * The snapshot is represented as a map from the hive object name to
+ * the set of paths for this object.
+ * The hive object name is either the Hive database name or
+ * the Hive database name joined with the Hive table name as {@code dbName.tableName}.
+ * All table partitions are stored under the table object.
+ * <p>
+ * Once a {@link FullUpdateInitializer} is created, the
+ * {@link FullUpdateInitializer#getFullHMSSnapshot()} method should be called to get the
+ * initial update.
+ * <p>
+ * It is important to close the {@link FullUpdateInitializer} object to prevent resource
+ * leaks.
+ * <p>
+ * The usual way of using {@link FullUpdateInitializer} is
+ * <pre>
+ * {@code
+ * try (FullUpdateInitializer updateInitializer =
+ *          new FullUpdateInitializer(client, authzConf)) {
+ *   Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
+ *   return pathsUpdate;
+ * }
+ * }
+ * </pre>
 */
public final class FullUpdateInitializer implements AutoCloseable {

+  /*
+   * Implementation note.
+   *
+   * The snapshot is obtained using an executor. We follow the map/reduce model.
+   * Each executor thread (mapper) obtains and returns a partial snapshot, which is then
+   * reduced to a single combined snapshot by getFullHMSSnapshot().
+   *
+   * Synchronization between getFullHMSSnapshot() and the executors is done using the
+   * 'results' queue. The queue holds the futures for each scheduled task.
+   * It is initially populated by getFullHMSSnapshot() and each task may add new future
+   * results to it. Only getFullHMSSnapshot() removes entries from the results queue.
+   * This guarantees that once the results queue is empty there are no pending jobs.
+   *
+   * Since there is no other shared data, the implementation is safe without
+   * any other synchronization. It is not thread-safe for concurrent calls
+   * to getFullHMSSnapshot().
+   */
+
  private final ExecutorService threadPool;
  private final HiveMetaStoreClient client;
  private final int maxPartitionsPerCall;
  private final int maxTablesPerCall;
-  private final Collection<Future<CallResult>> results = new Vector<>();
-  private final AtomicInteger taskCounter = new AtomicInteger(0);
+  private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>();
  private final int maxRetries;
  private final int waitDurationMillis;
-  private final boolean failOnRetry;

  private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class);

-  static final class CallResult {
-    private final Exception failure;
+  private static final ObjectMapping emptyObjectMapping =
+      new ObjectMapping(Collections.<String, Set<String>>emptyMap());
+
+  /**
+   * Extract the path (not starting with "/") from the full URI.
+   * @param uri resource URI (usually with scheme)
+   * @return the path if the uri is valid, or null
+   */
+  private static String pathFromURI(String uri) {
+    try {
+      return PathsUpdate.parsePath(uri);
+    } catch (SentryMalformedPathException e) {
+      LOGGER.warn(String.format("Ignoring invalid uri %s: %s",
+          uri, e.getReason()));
+      return null;
+    }
+  }
+
+  /**
+   * Mapping of object to set of paths.
+   * Used to represent partial results from executor threads. Multiple
+   * ObjectMapping objects are combined in a single mapping
+   * to get the final result.
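The scheduling scheme from the implementation note above is easier to see in isolation. Below is a minimal, self-contained sketch of the same pattern: worker tasks push the futures of any child tasks they spawn onto a shared deque before returning, and a single consumer drains it. All class, field and method names are invented for the illustration, and Java 8 lambda syntax is used for brevity:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedDeque;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class FanOutSketch {
      private final ExecutorService pool = Executors.newFixedThreadPool(4);
      private final Deque<Future<List<String>>> results = new ConcurrentLinkedDeque<>();

      List<String> collectAll() throws Exception {
        // Seed the queue, as getFullHMSSnapshot() does with one DbTask per database.
        results.add(pool.submit(() -> {
          // A task schedules child work *before* it returns, so by the time its
          // own future completes the child future is already on the deque.
          results.add(pool.submit(() -> Arrays.asList("child-result")));
          return Arrays.asList("parent-result");
        }));
        List<String> merged = new ArrayList<>();
        // Single consumer: only this thread removes entries, so an empty deque
        // really does mean that no work is pending.
        while (!results.isEmpty()) {
          merged.addAll(results.pop().get()); // get() blocks until the task is done
        }
        pool.shutdownNow();
        return merged;
      }
    }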
+ */ + private static final class ObjectMapping { + private final Map<String, Set<String>> objects; + + ObjectMapping(Map<String, Set<String>> objects) { + this.objects = objects; + } + + ObjectMapping(String authObject, String path) { + Set<String> values = Collections.singleton(path); + objects = ImmutableMap.of(authObject, values); + } + + ObjectMapping(String authObject, Collection<String> paths) { + Set<String> values = new HashSet<>(paths); + objects = ImmutableMap.of(authObject, values); + } + + Map<String, Set<String>> getObjects() { + return objects; + } + } + + private static final class CallResult { + private final TException failure; private final boolean successStatus; + private final ObjectMapping objectMapping; - CallResult(Exception ex, boolean successStatus) { + CallResult(TException ex) { failure = ex; - this.successStatus = successStatus; + successStatus = false; + objectMapping = emptyObjectMapping; + } + + CallResult(ObjectMapping objectMapping) { + failure = null; + successStatus = true; + this.objectMapping = objectMapping; } boolean success() { return successStatus; } - public Exception getFailure() { + ObjectMapping getObjectMapping() { + return objectMapping; + } + + public TException getFailure() { return failure; } } - abstract class BaseTask implements Callable<CallResult> { + private abstract class BaseTask implements Callable<CallResult> { /** * Class represents retry strategy for BaseTask. @@ -87,7 +185,6 @@ public final class FullUpdateInitializer implements AutoCloseable { private int retryStrategyMaxRetries = 0; private final int retryStrategyWaitDurationMillis; private int retries; - private Exception exception; private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) { this.retryStrategyMaxRetries = retryStrategyMaxRetries; @@ -107,24 +204,22 @@ public final class FullUpdateInitializer implements AutoCloseable { // synchronous waiting on getting the result. // Retry the failure task until reach the max retry number. // Wait configurable duration for next retry. + TException exception = null; for (int i = 0; i < retryStrategyMaxRetries; i++) { try { - doTask(); - - // Task succeeds, reset the exception and return - // the successful flag. - exception = null; - return new CallResult(exception, true); - } catch (Exception ex) { + return new CallResult(doTask()); + } catch (TException ex) { LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." + - " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " + ex.toString(), ex); + " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " + + ex.toString(), ex); exception = ex; try { Thread.sleep(retryStrategyWaitDurationMillis); - } catch (InterruptedException exception) { + } catch (InterruptedException ignored) { // Skip the rest retries if get InterruptedException. // And set the corresponding retries number. + LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1)); retries = i; i = retryStrategyMaxRetries; } @@ -134,227 +229,215 @@ public final class FullUpdateInitializer implements AutoCloseable { } // Task fails, return the failure flag. 
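The retry loop in exec() above is the heart of the fault handling; a stripped-down equivalent of the same strategy is shown here: bounded attempts, a fixed wait between them, interruption stopping the retries, and the most recent failure rethrown to the caller. The helper name and generic signature are invented for this sketch, which assumes maxRetries >= 1:

    import java.util.concurrent.Callable;

    final class Retry {
      // Run 'task' up to maxRetries times, sleeping waitMillis between failures.
      static <T> T withRetries(Callable<T> task, int maxRetries, long waitMillis)
          throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
          try {
            return task.call();              // success: return immediately
          } catch (Exception e) {
            last = e;                        // remember the most recent failure
            try {
              Thread.sleep(waitMillis);      // fixed wait before the next attempt
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt(); // preserve the interrupt status
              break;                         // stop retrying if interrupted
            }
          }
        }
        throw last;                          // all attempts failed
      }
    }

A caller would use it as, e.g., Retry.withRetries(() -> client.getDatabase(dbName), 3, 1000).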
- LOGGER.error("Task did not complete successfully after " + retries + 1 + LOGGER.error("Task did not complete successfully after " + (retries + 1) + " tries", exception); - return new CallResult(exception, false); + return new CallResult(exception); } } private final RetryStrategy retryStrategy; BaseTask() { - taskCounter.incrementAndGet(); retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis); } @Override public CallResult call() throws Exception { - CallResult callResult = retryStrategy.exec(); - taskCounter.decrementAndGet(); - return callResult; + return retryStrategy.exec(); } - abstract void doTask() throws Exception; + abstract ObjectMapping doTask() throws TException; } - class PartitionTask extends BaseTask { + private class PartitionTask extends BaseTask { private final String dbName; private final String tblName; + private final String authName; private final List<String> partNames; - private final TPathChanges tblPathChange; - PartitionTask(String dbName, String tblName, List<String> partNames, - TPathChanges tblPathChange) { - super(); + PartitionTask(String dbName, String tblName, String authName, + List<String> partNames) { this.dbName = dbName; this.tblName = tblName; + this.authName = authName; this.partNames = partNames; - this.tblPathChange = tblPathChange; } @Override - public void doTask() throws Exception { + ObjectMapping doTask() throws TException { List<Partition> tblParts = client.getPartitionsByNames(dbName, tblName, partNames); if (LOGGER.isDebugEnabled()) { LOGGER.debug("#### Fetching partitions " + - "[" + dbName + "." + tblName + "]" + "[" + partNames + "]"); + "[" + dbName + "." + tblName + "]" + "[" + partNames + "]"); } + Collection<String> partitionNames = new ArrayList<>(tblParts.size()); for (Partition part : tblParts) { - List<String> partPath = PathsUpdate.parsePath(part.getSd() - .getLocation()); + String partPath = pathFromURI(part.getSd().getLocation()); if (partPath != null) { - synchronized (tblPathChange) { - tblPathChange.addToAddPaths(partPath); - } + partitionNames.add(partPath); } } + return new ObjectMapping(authName, partitionNames); } } - class TableTask extends BaseTask { - private final Database db; + private class TableTask extends BaseTask { + private final String dbName; private final List<String> tableNames; - private final PathsUpdate update; - TableTask(Database db, List<String> tableNames, PathsUpdate update) { - super(); - this.db = db; + TableTask(Database db, List<String> tableNames) { + dbName = db.getName(); this.tableNames = tableNames; - this.update = update; } @Override - public void doTask() throws Exception { - List<Table> tables = client.getTableObjectsByName(db.getName(), tableNames); + ObjectMapping doTask() throws TException { + List<Table> tables = client.getTableObjectsByName(dbName, tableNames); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("#### Fetching tables [" + db.getName() + "][" + + LOGGER.debug("#### Fetching tables [" + dbName + "][" + tableNames + "]"); } + Map<String, Set<String>> objectMapping = new HashMap<>(tables.size()); for (Table tbl : tables) { - TPathChanges tblPathChange; // Table names are case insensitive + if (!tbl.getDbName().equalsIgnoreCase(dbName)) { + // Inconsistency in HMS data + LOGGER.warn(String.format("DB name %s for table %s does not match %s", + tbl.getDbName(), tbl.getTableName(), dbName)); + continue; + } + String tableName = tbl.getTableName().toLowerCase(); - Preconditions.checkArgument(tbl.getDbName().equalsIgnoreCase(db.getName())); - synchronized (update) 
{ - tblPathChange = update.newPathChange(db.getName() + "." + tableName); + String authzObject = dbName + "." + tableName; + List<String> tblPartNames = client.listPartitionNames(dbName, tableName, (short) -1); + for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) { + List<String> partsToFetch = tblPartNames.subList(i, + Math.min(i + maxPartitionsPerCall, tblPartNames.size())); + Callable<CallResult> partTask = new PartitionTask(dbName, + tableName, authzObject, partsToFetch); + results.add(threadPool.submit(partTask)); } - if (tbl.getSd().getLocation() != null) { - List<String> tblPath = - PathsUpdate.parsePath(tbl.getSd().getLocation()); - if (tblPath != null) { - tblPathChange.addToAddPaths(tblPath); - } - List<String> tblPartNames = client.listPartitionNames(db.getName(), tableName, (short) -1); - for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) { - List<String> partsToFetch = - tblPartNames.subList(i, Math.min( - i + maxPartitionsPerCall, tblPartNames.size())); - Callable<CallResult> partTask = - new PartitionTask(db.getName(), tableName, - partsToFetch, tblPathChange); - results.add(threadPool.submit(partTask)); - } + String tblPath = pathFromURI(tbl.getSd().getLocation()); + if (tblPath == null) { + continue; } + Set<String> paths = objectMapping.get(authzObject); + if (paths == null) { + paths = new HashSet<>(1); + objectMapping.put(authzObject, paths); + } + paths.add(tblPath); } + return new ObjectMapping(Collections.unmodifiableMap(objectMapping)); } } - class DbTask extends BaseTask { + private class DbTask extends BaseTask { - private final PathsUpdate update; private final String dbName; - DbTask(PathsUpdate update, String dbName) { - super(); - this.update = update; + DbTask(String dbName) { //Database names are case insensitive this.dbName = dbName.toLowerCase(); } @Override - public void doTask() throws Exception { + ObjectMapping doTask() throws TException { Database db = client.getDatabase(dbName); - List<String> dbPath = PathsUpdate.parsePath(db.getLocationUri()); - if (dbPath != null) { - Preconditions.checkArgument(dbName.equalsIgnoreCase(db.getName())); - synchronized (update) { - update.newPathChange(dbName).addToAddPaths(dbPath); - } + if (!dbName.equalsIgnoreCase(db.getName())) { + LOGGER.warn(String.format("Database name %s does not match %s", + db.getName(), dbName)); + return emptyObjectMapping; } List<String> allTblStr = client.getAllTables(dbName); for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) { - List<String> tablesToFetch = - allTblStr.subList(i, Math.min( - i + maxTablesPerCall, allTblStr.size())); - Callable<CallResult> tableTask = - new TableTask(db, tablesToFetch, update); + List<String> tablesToFetch = allTblStr.subList(i, + Math.min(i + maxTablesPerCall, allTblStr.size())); + Callable<CallResult> tableTask = new TableTask(db, tablesToFetch); results.add(threadPool.submit(tableTask)); } + String dbPath = pathFromURI(db.getLocationUri()); + return (dbPath != null) ? 
new ObjectMapping(dbName, dbPath) :
+        emptyObjectMapping;
   }
 }

 public FullUpdateInitializer(HiveMetaStoreClient client, Configuration conf) {
   this.client = client;
-  this.maxPartitionsPerCall = conf.getInt(
+  maxPartitionsPerCall = conf.getInt(
       ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
       ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
-  this.maxTablesPerCall = conf.getInt(
+  maxTablesPerCall = conf.getInt(
       ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
       ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
+  maxRetries = conf.getInt(
+      ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
+      ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
+  waitDurationMillis = conf.getInt(
+      ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
+      ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
   threadPool = Executors.newFixedThreadPool(conf.getInt(
       ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
       ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT));
-  maxRetries = conf.getInt(
-      ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
-      ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
-  waitDurationMillis = conf.getInt(
-      ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
-      ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
-  failOnRetry = conf.getBoolean(
-      ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE,
-      ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT);
 }

-  public Map<String, Set<String>> createInitialUpdate() throws Exception {
-    PathsUpdate tempUpdate = new PathsUpdate(-1, false);
+  /**
+   * Get the full HMS snapshot.
+   * @return Full snapshot of HMS objects.
+   * @throws TException if a Thrift error occurred
+   * @throws ExecutionException if there was a scheduling error
+   * @throws InterruptedException if processing was interrupted
+   */
+  public Map<String, Set<String>> getFullHMSSnapshot()
+      throws TException, ExecutionException, InterruptedException {
+    // Get the list of all HMS databases
     List<String> allDbStr = client.getAllDatabases();
+    // Schedule an async task for each database, responsible for fetching the
+    // per-database objects.
     for (String dbName : allDbStr) {
-      Callable<CallResult> dbTask = new DbTask(tempUpdate, dbName);
-      results.add(threadPool.submit(dbTask));
-    }
-
-    while (taskCounter.get() != 0) {
-      // Wait until no more tasks remain
-      Thread.sleep(250);
+      results.add(threadPool.submit(new DbTask(dbName)));
     }

-    for (Future<CallResult> result : results) {
+    // Resulting full snapshot
+    Map<String, Set<String>> fullSnapshot = new HashMap<>();
+
+    // As async tasks complete, merge their results into the full snapshot.
+    while (!results.isEmpty()) {
+      // This is the only thread that takes elements off the results list - all other
+      // threads only add to it. Once the list is empty it can't become non-empty again.
+      // This means that if results is non-empty we can safely call pop() and
+      // know that its result is not null.
+      Future<CallResult> result = results.pop();
+      // Wait for the task to complete
       CallResult callResult = result.get();
-
-      // Fail the HMS startup if tasks are not all successful and
-      // fail on partial updates flag is set in the config.
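The merge loop that replaces the old fail-on-partial logic below is the "reduce" step of the scheme: each partial ObjectMapping is folded into the accumulated snapshot, reusing the first set seen for a key and unioning later sets into it. A self-contained sketch of that union, with the method name invented for the illustration:

    import java.util.Map;
    import java.util.Set;

    // Fold one partial result into the accumulated snapshot; mirrors the
    // merge loop in getFullHMSSnapshot().
    static void mergePartial(Map<String, Set<String>> fullSnapshot,
        Map<String, Set<String>> partial) {
      for (Map.Entry<String, Set<String>> e : partial.entrySet()) {
        Set<String> existing = fullSnapshot.get(e.getKey());
        if (existing == null) {
          fullSnapshot.put(e.getKey(), e.getValue()); // first mapping is kept as-is
        } else {
          existing.addAll(e.getValue());              // later paths are unioned in
        }
      }
    }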
- if (!callResult.success() && failOnRetry) { + // Fail if we got Thrift errors + if (!callResult.success()) { throw callResult.getFailure(); } - } - - return getAuthzObjToPathMapping(tempUpdate); - } - - - /** - * Parsing a pathsUpdate to get the mapping of hiveObj -> [Paths]. - * It only processes {@link TPathChanges}.addPaths, since - * {@link FullUpdateInitializer} only add paths when fetching - * full HMS Paths snapshot. Each path represented as path tree - * concatenated by "/". e.g 'usr/hive/warehouse'. - * - * @return mapping of hiveObj -> [Paths]. - */ - private Map<String, Set<String>> getAuthzObjToPathMapping(PathsUpdate pathsUpdate) { - List<TPathChanges> tPathChanges = pathsUpdate.getPathChanges(); - if (tPathChanges.isEmpty()) { - return Collections.emptyMap(); - } - Map<String, Set<String>> authzObjToPath = new HashMap<>(tPathChanges.size()); - - for (TPathChanges pathChanges : tPathChanges) { - // Only processes TPathChanges.addPaths - List<List<String>> addPaths = pathChanges.getAddPaths(); - Set<String> paths = new HashSet<>(addPaths.size()); - for (List<String> addPath : addPaths) { - paths.add(PathsUpdate.concatenatePath(addPath)); + // Merge values into fullUpdate + Map<String, Set<String>> objectMapping = + callResult.getObjectMapping().getObjects(); + for (Map.Entry<String, Set<String>> entry: objectMapping.entrySet()) { + String key = entry.getKey(); + Set<String> val = entry.getValue(); + Set<String> existingSet = fullSnapshot.get(key); + if (existingSet == null) { + fullSnapshot.put(key, val); + continue; + } + existingSet.addAll(val); } - authzObjToPath.put(pathChanges.getAuthzObj(), paths); } - - return authzObjToPath; + return fullSnapshot; } @Override public void close() { - if (threadPool != null) { - threadPool.shutdownNow(); + threadPool.shutdownNow(); + try { + threadPool.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + LOGGER.warn("Interrupted shutdown"); } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java index e32d4a7..6b31f7a 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java @@ -24,8 +24,6 @@ import java.util.LinkedList; import java.util.List; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; import org.apache.sentry.hdfs.service.thrift.TPathChanges; import org.apache.sentry.hdfs.service.thrift.TPathsUpdate; import org.apache.commons.httpclient.util.URIUtil; @@ -34,10 +32,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.conf.Configuration; -import com.google.common.collect.Lists; import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A wrapper class over the TPathsUpdate thrift generated class. 
Please see @@ -45,10 +40,13 @@ import org.slf4j.LoggerFactory; */ public class PathsUpdate implements Updateable.Update { - private static final Logger LOGGER = LoggerFactory.getLogger(PathsUpdate.class); - public static final String ALL_PATHS = "__ALL_PATHS__"; + + private static final Configuration CONF = new Configuration(); + private static String DEFAULT_SCHEME = FileSystem.getDefaultUri(CONF).getScheme(); + private static final String SUPPORTED_SCHEME = "hdfs"; + private final TPathsUpdate tPathsUpdate; public PathsUpdate() { @@ -95,85 +93,65 @@ public class PathsUpdate implements Updateable.Update { return tPathsUpdate; } + /** + * Only used for testing. + * @param scheme new default scheme + */ @VisibleForTesting - public static Configuration getConfiguration() { - return CONF; + public static void setDefaultScheme(String scheme) { + DEFAULT_SCHEME = scheme; } /** - * - * @param path : Needs to be a HDFS location in the forms: - * - hdfs://hostname:port/path - * - hdfs:///path - * - /path, in which case, scheme will be constructed from FileSystem.getDefaultURI - * - URIs with non hdfs schemee will just be ignored - * @return Path in the form a list containing the path tree with scheme/ authority stripped off. - * Returns null if a non HDFS path or if path is null/empty + * Convert URI to path, trimming leading slash. + * @param path HDFS location in one of the forms: + * <ul> + * <li>hdfs://hostname:port/path + * <li>hdfs:///path + * <li>/path, in which case, scheme will be constructed from FileSystem.getDefaultURI + * <li>URIs with non hdfs schemee will just be ignored + * </ul> + * @return Path with scheme/ authority stripped off. + * Returns null if a non HDFS path or if path is null/empty. */ - public static List<String> parsePath(String path) throws SentryMalformedPathException { - try { - LOGGER.debug("Parsing path " + path); - URI uri = null; - if (StringUtils.isNotEmpty(path)) { - uri = new URI(URIUtil.encodePath(path)); - } else { - String msg = "Input is empty"; - throw new SentryMalformedPathException(msg); - } - - String scheme = uri.getScheme(); - if (scheme == null) { - // Use the default URI scheme only if the path has no scheme. - URI defaultUri = FileSystem.getDefaultUri(CONF); - scheme = defaultUri.getScheme(); - if(scheme == null) { - String msg = "Scheme is missing and could not be constructed from defaultURI=" + defaultUri; - throw new SentryMalformedPathException(msg); - } - } + public static String parsePath(String path) throws SentryMalformedPathException { + if (StringUtils.isEmpty(path)) { + return null; + } - // Non-HDFS paths will be skipped. - if(scheme.equalsIgnoreCase("hdfs")) { - String uriPath = uri.getPath(); - if(uriPath == null) { - throw new SentryMalformedPathException("Path is empty. 
uri=" + uri); - } - if(uriPath.split("^/").length < 2) { - throw new SentryMalformedPathException("Path part of uri does not seem right, was expecting a non empty path" + - ": path = " + uriPath + ", uri=" + uri); - } - return Lists.newArrayList(uriPath.split("^/")[1].split("/")); - } else { - LOGGER.warn("Invalid FS: " + scheme + "://; expected hdfs://"); - return null; - } + URI uri; + try { + uri = new URI(URIUtil.encodePath(path)); } catch (URISyntaxException e) { throw new SentryMalformedPathException("Incomprehensible path [" + path + "]", e); - } catch (URIException e){ - throw new SentryMalformedPathException("Unable to create URI: ", e); + } catch (URIException e) { + throw new SentryMalformedPathException("Unable to create URI from path[" + path + "]", e); } - } - /** - * Given a path tree in a list, return a string concatenated by "/". - * e.g < usr, hive, warehouse > -> 'usr/hive/warehouse'. - * - * @param paths - * @return a path string concatenated by "/". - */ - public static String concatenatePath(Iterable<String> paths) { - return Joiner.on("/").join(paths); - } + String scheme = uri.getScheme(); + if (scheme == null) { + scheme = DEFAULT_SCHEME; + if(scheme == null) { + throw new SentryMalformedPathException( + "Scheme is missing and could not be constructed from configuration"); + } + } - /** - * Split a path a path concatenated by "/" into a path tree represented - * as a list. - * - * @param path - * @return a path tree represented as a list. - */ - public static List<String> splitPath(String path) { - return Lists.newArrayList(Splitter.on("/").split(path)); + // Non-HDFS paths are skipped. + if(!scheme.equalsIgnoreCase(SUPPORTED_SCHEME)) { + return null; + } + + String uriPath = uri.getPath(); + if(uriPath == null) { + throw new SentryMalformedPathException("Path is empty. 
uri=" + uri); + } + if (!uriPath.startsWith("/")) { + throw new SentryMalformedPathException("Path part of uri does not seem right, was expecting a non empty path" + + ": path = " + uriPath + ", uri=" + uri); + } + // Remove leading slash + return uriPath.substring(1); } @Override http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java index 23552c2..48e2e49 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java @@ -57,8 +57,6 @@ public class ServiceConstants { public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT = 1; public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS = "sentry.hdfs.sync.metastore.cache.retry.wait.duration.millis"; public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT = 1000; - public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE = "sentry.hdfs.sync.metastore.cache.fail.on.partial.update"; - public static final boolean SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT = true; public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE = "sentry.hdfs.sync.metastore.cache.async-init.enable"; public static final boolean SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT = false; http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java index f338ce8..389e9b8 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java @@ -25,108 +25,220 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; public class TestFullUpdateInitializer { - @Test - public void testInitializer() throws Exception { - - Database db1 = Mockito.mock(Database.class); - Mockito.when(db1.getName()).thenReturn("db1"); - Mockito.when(db1.getLocationUri()).thenReturn("hdfs:///db1"); - Database db2 = Mockito.mock(Database.class); - Mockito.when(db2.getName()).thenReturn("db2"); - Mockito.when(db2.getLocationUri()).thenReturn("hdfs:///db2"); - Database db3 = Mockito.mock(Database.class); - Mockito.when(db3.getName()).thenReturn("db3"); - 
Mockito.when(db3.getLocationUri()).thenReturn("hdfs:///db3"); - - Table tab21 = Mockito.mock(Table.class); - Mockito.when(tab21.getDbName()).thenReturn("db2"); - Mockito.when(tab21.getTableName()).thenReturn("tab21"); - StorageDescriptor sd21 = Mockito.mock(StorageDescriptor.class); - Mockito.when(sd21.getLocation()).thenReturn("hdfs:///db2/tab21"); - Mockito.when(tab21.getSd()).thenReturn(sd21); - - Table tab31 = Mockito.mock(Table.class); - Mockito.when(tab31.getDbName()).thenReturn("db3"); - Mockito.when(tab31.getTableName()).thenReturn("tab31"); - StorageDescriptor sd31 = Mockito.mock(StorageDescriptor.class); - Mockito.when(sd31.getLocation()).thenReturn("hdfs:///db3/tab31"); - Mockito.when(tab31.getSd()).thenReturn(sd31); - - Partition part311 = Mockito.mock(Partition.class); - StorageDescriptor sd311 = Mockito.mock(StorageDescriptor.class); - Mockito.when(sd311.getLocation()).thenReturn("hdfs:///db3/tab31/part311"); - Mockito.when(part311.getSd()).thenReturn(sd311); - - Partition part312 = Mockito.mock(Partition.class); - StorageDescriptor sd312 = Mockito.mock(StorageDescriptor.class); - Mockito.when(sd312.getLocation()).thenReturn("hdfs:///db3/tab31/part312"); - Mockito.when(part312.getSd()).thenReturn(sd312); + private static Configuration conf = new Configuration(); - HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class); - Mockito.when(client.getAllDatabases()).thenReturn(Lists - .newArrayList("db1", "db2", "db3")); - Mockito.when(client.getDatabase("db1")).thenReturn(db1); - Mockito.when(client.getAllTables("db1")).thenReturn(new ArrayList<String>()); - - Mockito.when(client.getDatabase("db2")).thenReturn(db2); - Mockito.when(client.getAllTables("db2")).thenReturn(Lists.newArrayList("tab21")); - Mockito.when(client.getTableObjectsByName("db2", Lists.newArrayList("tab21"))) - .thenReturn(Lists.newArrayList(tab21)); - Mockito.when(client.listPartitionNames("db2", "tab21", (short) -1)) - .thenReturn(new ArrayList<String>()); - - Mockito.when(client.getDatabase("db3")).thenReturn(db3); - Mockito.when(client.getAllTables("db3")).thenReturn(Lists - .newArrayList("tab31")); - Mockito.when(client.getTableObjectsByName("db3", Lists.newArrayList("tab31"))) - .thenReturn(Lists.newArrayList(tab31)); - Mockito.when(client.listPartitionNames("db3", "tab31", (short) -1)) - .thenReturn(Lists.newArrayList("part311", "part312")); - - Mockito.when(client.getPartitionsByNames("db3", "tab31", Lists.newArrayList("part311"))) - .thenReturn(Lists.newArrayList(part311)); - Mockito.when(client.getPartitionsByNames("db3", "tab31", Lists.newArrayList("part312"))) - .thenReturn(Lists.newArrayList(part312)); - - Configuration conf = new Configuration(); + static { conf.setInt(ServiceConstants.ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1); + .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1); conf.setInt(ServiceConstants.ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1); + .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1); conf.setInt(ServiceConstants.ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 1); + .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 8); + } + + /** + * Representation of a Hive table. A table has a name and a list of partitions. 
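The fixture classes defined below (HiveTable, HiveDb, HiveSnapshot, MockClient) make the mock setup declarative. A condensed example of how a test wires them together; the enclosing test method and assertions are elided, 'conf' is the shared test Configuration defined above, and the usual test imports are assumed:

    // Describe a tiny catalog and fetch its snapshot through the initializer.
    HiveSnapshot snap = new HiveSnapshot()
        .add(new HiveDb("db1", Lists.newArrayList(
            new HiveTable("tab1").add("part1").add("part2"))));
    MockClient mock = new MockClient(snap);
    try (FullUpdateInitializer init = new FullUpdateInitializer(mock.client, conf)) {
      Map<String, Set<String>> update = init.getFullHMSSnapshot();
      // Expected: update.get("db1") == {"db1"} and update.get("db1.tab1") ==
      // {"db1/tab1", "db1/tab1/part1", "db1/tab1/part2"}.
    }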
+ */ + private static class HiveTable { + String name; + List<String> partitions; + + HiveTable(String name) { + this.name = name; + this.partitions = new ArrayList<>(); + } + + HiveTable(String name, List<String> partitions) { + this.name = name; + this.partitions = partitions; + if (this.partitions == null) { + this.partitions = new ArrayList<>(); + } + } + + HiveTable add(String partition) { + partitions.add(partition); + return this; + } + } + + /** + * Representation of a Hive database. A database has a name and a list of tables + */ + private static class HiveDb { + String name; + Collection<HiveTable> tables; + + HiveDb(String name) { + this.name = name; + tables = new ArrayList<>(); + } + + HiveDb(String name, Collection<HiveTable> tables) { + this.name = name; + this.tables = tables; + if (this.tables == null) { + this.tables = new ArrayList<>(); + } + } + + void add(HiveTable table) { + this.tables.add(table); + } + } + + /** + * Representation of a full Hive snapshot. A snapshot is collection of databases + */ + private static class HiveSnapshot { + List<HiveDb> databases = new ArrayList<>(); + + HiveSnapshot() { + } + + HiveSnapshot(Collection<HiveDb> dblist) { + if (dblist != null) { + databases.addAll(dblist); + } + } + + HiveSnapshot add(HiveDb db) { + this.databases.add(db); + return this; + } + } - FullUpdateInitializer cacheInitializer = new - FullUpdateInitializer(client, conf); - Map<String, Set<String>> update = cacheInitializer.createInitialUpdate(); + /** + * Convert Hive snapshot to mock client that will return proper values + * for the snapshot. + */ + private static class MockClient { + HiveMetaStoreClient client; - Assert.assertEquals(update.get("db1"), Sets.newHashSet("db1")); - Assert.assertEquals(update.get("db2"), Sets.newHashSet("db2")); - Assert.assertEquals(update.get("db2.tab21"), Sets.newHashSet("db2/tab21")); - Assert.assertEquals(update.get("db3.tab31"), Sets.newHashSet("db3/tab31", - "db3/tab31/part311", "db3/tab31/part312")); + MockClient(HiveSnapshot snapshot) throws TException { + client = Mockito.mock(HiveMetaStoreClient.class); + List<String> dbNames = new ArrayList<>(snapshot.databases.size()); + // Walk over all databases and mock appropriate objects + for (HiveDb mdb: snapshot.databases) { + String dbName = mdb.name; + dbNames.add(dbName); + Database db = makeDb(dbName); + Mockito.when(client.getDatabase(dbName)).thenReturn(db); + List<String> tableNames = new ArrayList<>(mdb.tables.size()); + // Walk over all tables for the database and mock appropriate objects + for (HiveTable table: mdb.tables) { + String tableName = table.name; + tableNames.add(tableName); + Table mockTable = makeTable(dbName, tableName); + Mockito.when(client.getTableObjectsByName(dbName, + Lists.newArrayList(tableName))) + .thenReturn(Lists.newArrayList(mockTable)); + Mockito.when(client.listPartitionNames(dbName, tableName, (short) -1)) + .thenReturn(table.partitions); + // Walk across all partitions and mock appropriate objects + for (String partName: table.partitions) { + Partition p = makePartition(dbName, tableName, partName); + Mockito.when(client.getPartitionsByNames(dbName, tableName, + Lists.<String>newArrayList(partName))) + .thenReturn(Lists.<Partition>newArrayList(p)); + } + } + Mockito.when(client.getAllTables(dbName)).thenReturn(tableNames); + } + // Return all database names + Mockito.when(client.getAllDatabases()).thenReturn(dbNames); + } + } + + /** + * Create mock database with the given name + * @param name Database name + * @return Mock database 
object + */ + private static Database makeDb(String name) { + Database db = Mockito.mock(Database.class); + Mockito.when(db.getName()).thenReturn(name); + Mockito.when(db.getLocationUri()).thenReturn("hdfs:///" + name); + return db; + } - cacheInitializer.close(); + /** + * Create mock table + * @param dbName db for this table + * @param tableName name of the table + * @return mock table object + */ + private static Table makeTable(String dbName, String tableName) { + Table table = Mockito.mock(Table.class); + Mockito.when(table.getDbName()).thenReturn(dbName); + Mockito.when(table.getTableName()).thenReturn(tableName); + StorageDescriptor sd = Mockito.mock(StorageDescriptor.class); + Mockito.when(sd.getLocation()).thenReturn( + String.format("hdfs:///%s/%s", dbName, tableName)); + Mockito.when(table.getSd()).thenReturn(sd); + return table; + } + /** + * Create mock partition + * @param dbName database for this partition + * @param tableName table for this partition + * @param partName partition name + * @return mock partition object + */ + private static Partition makePartition(String dbName, String tableName, String partName) { + Partition partition = Mockito.mock(Partition.class); + StorageDescriptor sd = Mockito.mock(StorageDescriptor.class); + Mockito.when(sd.getLocation()).thenReturn( + String.format("hdfs:///%s/%s/%s", dbName, tableName, partName)); + Mockito.when(partition.getSd()).thenReturn(sd); + return partition; } - // Make sure exceptions in initializer parallel tasks are propagated well @Test - public void testExceptionInTask() throws Exception { + // Test basic operation with small database + public void testSimple() throws Exception { + HiveTable tab21 = new HiveTable("tab21"); + HiveTable tab31 = new HiveTable("tab31").add("part311").add("part312"); + HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31)); + HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21)); + HiveDb db1 = new HiveDb("db1"); + HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3); + MockClient c = new MockClient(snap); + + Map<String, Set<String>> update; + try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(c.client, conf)) { + update = cacheInitializer.getFullHMSSnapshot(); + } + Assert.assertEquals(5, update.size()); + Assert.assertEquals(Sets.newHashSet("db1"), update.get("db1")); + Assert.assertEquals(Sets.newHashSet("db2"), update.get("db2")); + Assert.assertEquals(Sets.newHashSet("db3"), update.get("db3")); + Assert.assertEquals(Sets.newHashSet("db2/tab21"), update.get("db2.tab21")); + Assert.assertEquals(Sets.newHashSet("db3/tab31", + "db3/tab31/part311", "db3/tab31/part312"), update.get("db3.tab31")); + } + + @Test + // Test that invalid paths are handled correctly + public void testInvalidPaths() throws Exception { //Set up mocks: db1.tb1, with tb1 returning a wrong dbname (db2) - Database db1 = Mockito.mock(Database.class); - Mockito.when(db1.getName()).thenReturn("db1"); - Mockito.when(db1.getLocationUri()).thenReturn("hdfs:///db1"); + Database db1 = makeDb("db1"); Table tab1 = Mockito.mock(Table.class); //Return a wrong db name, so that this triggers an exception @@ -136,28 +248,73 @@ public class TestFullUpdateInitializer { HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class); Mockito.when(client.getAllDatabases()).thenReturn(Lists.newArrayList("db1")); Mockito.when(client.getDatabase("db1")).thenReturn(db1); + + Table tab12 = Mockito.mock(Table.class); + Mockito.when(tab12.getDbName()).thenReturn("db1"); + 
Mockito.when(tab12.getTableName()).thenReturn("tab21"); + StorageDescriptor sd21 = Mockito.mock(StorageDescriptor.class); + Mockito.when(sd21.getLocation()).thenReturn("hdfs:///db1/tab21"); + Mockito.when(tab12.getSd()).thenReturn(sd21); + + Mockito.when(client.getTableObjectsByName("db1", + Lists.newArrayList("tab1"))).thenReturn(Lists.newArrayList(tab1)); Mockito.when(client.getTableObjectsByName("db1", - Lists.newArrayList("tab1"))) - .thenReturn(Lists.newArrayList(tab1)); - Mockito.when(client.getAllTables("db1")).thenReturn(Lists - .newArrayList("tab1")); + Lists.newArrayList("tab12"))).thenReturn(Lists.newArrayList(tab12)); + Mockito.when(client.getAllTables("db1")). + thenReturn(Lists.newArrayList("tab1", "tab12")); - Configuration conf = new Configuration(); - conf.setInt(ServiceConstants.ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1); - conf.setInt(ServiceConstants.ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1); - conf.setInt(ServiceConstants.ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 1); - conf.setInt(ServiceConstants.ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM, 2); - - try { - FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(client, conf); - cacheInitializer.createInitialUpdate(); - Assert.fail("Expected cacheInitializer to fail"); - } catch (Exception e) { - Assert.assertTrue(e instanceof RuntimeException); + + Map<String, Set<String>> update; + try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(client, conf)) { + update = cacheInitializer.getFullHMSSnapshot(); + } + Assert.assertEquals(2, update.size()); + Assert.assertEquals(Sets.newHashSet("db1"), update.get("db1")); + Assert.assertEquals(Sets.newHashSet("db1/tab21"), update.get("db1.tab21")); + } + + @Test + // Test handling of a big tables and partitions + public void testBig() throws Exception { + int ndbs = 3; + int ntables = 51; + int nparts = 131; + + HiveSnapshot snap = new HiveSnapshot(); + + for (int i = 0; i < ndbs; i++) { + HiveDb db = new HiveDb("db" + i); + for (int j = 0; j < ntables; j++) { + HiveTable table = new HiveTable("table" + i + j); + for (int k = 0; k < nparts; k++) { + table.add("part" + i + j + k); + } + db.add(table); + } + snap.add(db); + } + MockClient c = new MockClient(snap); + Map<String, Set<String>> update; + try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(c.client, conf)) { + update = cacheInitializer.getFullHMSSnapshot(); + } + Assert.assertEquals((ntables * ndbs) + ndbs, update.size()); + for (int i = 0; i < ndbs; i++) { + String dbName = "db" + i; + Assert.assertEquals(Sets.newHashSet(dbName), update.get(dbName)); + + for (int j = 0; j < ntables; j++) { + String tableName = "table" + i + j; + Set<String> values = new HashSet<>(); + values.add(String.format("%s/%s", dbName, tableName)); + for (int k = 0; k < nparts; k++) { + String partName = "part" + i + j + k; + values.add(String.format("%s/%s/%s", dbName, tableName, partName)); + } + String authz = dbName + "." 
+ tableName; + Assert.assertEquals(values, update.get(authz)); + } } } + } http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestPathsUpdate.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestPathsUpdate.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestPathsUpdate.java index b5cbea9..c1a8a74 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestPathsUpdate.java +++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestPathsUpdate.java @@ -19,6 +19,7 @@ package org.apache.sentry.hdfs; import java.util.List; +import com.google.common.collect.Lists; import org.apache.sentry.hdfs.service.thrift.TPathChanges; import org.apache.sentry.hdfs.service.thrift.TPathsUpdate; import org.apache.thrift.TException; @@ -26,28 +27,29 @@ import org.junit.Test; import org.junit.Assert; public class TestPathsUpdate { + private List<String> uriToList(String uri) throws SentryMalformedPathException { + String path = PathsUpdate.parsePath(uri); + return Lists.newArrayList(path.split("/")); + } + @Test public void testParsePathComplexCharacters() throws SentryMalformedPathException{ - List<String> results = PathsUpdate.parsePath( + List<String> results = uriToList( "hdfs://hostname.test.com:8020/user/hive/warehouse/break/b=all | ' & the spaces/c=in PartKeys/With fun chars *%!|" ); - System.out.println(results); Assert.assertNotNull("Parse path without throwing exception",results); } @Test public void testPositiveParsePath() throws SentryMalformedPathException { - List<String> results = PathsUpdate.parsePath("hdfs://hostname.test.com:8020/path"); - Assert.assertTrue("Parsed path is unexpected", results.get(0).equals("path")); - Assert.assertTrue("Parsed path size is unexpected", results.size() == 1); + String result = PathsUpdate.parsePath("hdfs://hostname.test.com:8020/path"); + Assert.assertTrue("Parsed path is unexpected", result.equals("path")); - results = PathsUpdate.parsePath("hdfs://hostname.test.com/path"); - Assert.assertTrue("Parsed path is unexpected", results.get(0).equals("path")); - Assert.assertTrue("Parsed path size is unexpected", results.size() == 1); + result = PathsUpdate.parsePath("hdfs://hostname.test.com/path"); + Assert.assertTrue("Parsed path is unexpected", result.equals("path")); - results = PathsUpdate.parsePath("hdfs:///path"); - Assert.assertTrue("Parsed path is unexpected", results.get(0).equals("path")); - Assert.assertTrue("Parsed path size is unexpected", results.size() == 1); + result = PathsUpdate.parsePath("hdfs:///path"); + Assert.assertTrue("Parsed path is unexpected", result.equals("path")); } @Test(expected = SentryMalformedPathException.class) @@ -58,16 +60,17 @@ public class TestPathsUpdate { //if file:// - should return null @Test public void testMalformedPathFile() throws SentryMalformedPathException{ - List<String> results = PathsUpdate.parsePath("file://hostname/path"); - System.out.println(results); - Assert.assertNull("Parse path without throwing exception",results); + String result = PathsUpdate.parsePath("file://hostname/path"); + Assert.assertNull("Parse path without throwing exception",result); } @Test public void testSerializeDeserializeInJSON() throws SentryMalformedPathException, TException{ PathsUpdate update = new PathsUpdate(1, true); TPathChanges pathChange = 
update.newPathChange("db1.tbl12"); - pathChange.addToAddPaths(PathsUpdate.parsePath("hdfs:///db1/tbl12/part121")); + String path = PathsUpdate.parsePath("hdfs:///db1/tbl12/part121"); + List<String> paths = Lists.newArrayList(path.split("/")); + pathChange.addToAddPaths(paths); // Serialize and deserialize the PermssionUpdate object should equals to the original one. TPathsUpdate before = update.toThrift(); http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestUpdateableAuthzPaths.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestUpdateableAuthzPaths.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestUpdateableAuthzPaths.java index e643e01..9a726da 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestUpdateableAuthzPaths.java +++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestUpdateableAuthzPaths.java @@ -17,6 +17,7 @@ */ package org.apache.sentry.hdfs; +import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.sentry.hdfs.service.thrift.TPathChanges; @@ -28,6 +29,11 @@ import static org.junit.Assert.*; public class TestUpdateableAuthzPaths { + private List<String> uriToList(String uri) throws SentryMalformedPathException { + String path = PathsUpdate.parsePath(uri); + return Lists.newArrayList(path.split("/")); + } + @Test public void testFullUpdate() { HMSPaths hmsPaths = createBaseHMSPaths(1, 1); @@ -75,13 +81,13 @@ public class TestUpdateableAuthzPaths { // Create table PathsUpdate update = new PathsUpdate(2, false); TPathChanges pathChange = update.newPathChange("db1.tbl12"); - pathChange.addToAddPaths(PathsUpdate.parsePath("hdfs:///db1/tbl12")); + pathChange.addToAddPaths(uriToList("hdfs:///db1/tbl12")); authzPaths.updatePartial(Lists.newArrayList(update), lock); // Add partition update = new PathsUpdate(3, false); pathChange = update.newPathChange("db1.tbl12"); - pathChange.addToAddPaths(PathsUpdate.parsePath("hdfs:///db1/tbl12/part121")); + pathChange.addToAddPaths(uriToList("hdfs:///db1/tbl12/part121")); authzPaths.updatePartial(Lists.newArrayList(update), lock); // Ensure no change in existing Paths @@ -96,8 +102,8 @@ public class TestUpdateableAuthzPaths { // Rename table update = new PathsUpdate(4, false); - update.newPathChange("db1.xtbl11").addToAddPaths(PathsUpdate.parsePath("hdfs:///db1/xtbl11")); - update.newPathChange("db1.tbl11").addToDelPaths(PathsUpdate.parsePath("hdfs:///db1/tbl11")); + update.newPathChange("db1.xtbl11").addToAddPaths(uriToList("hdfs:///db1/xtbl11")); + update.newPathChange("db1.tbl11").addToDelPaths(uriToList("hdfs:///db1/tbl11")); authzPaths.updatePartial(Lists.newArrayList(update), lock); // Verify name change @@ -125,7 +131,7 @@ public class TestUpdateableAuthzPaths { // Drop partition PathsUpdate update = new PathsUpdate(2, false); TPathChanges pathChange = update.newPathChange("db1.tbl11"); - pathChange.addToDelPaths(PathsUpdate.parsePath("hdfs:///db1/tbl11/part111")); + pathChange.addToDelPaths(uriToList("hdfs:///db1/tbl11/part111")); authzPaths.updatePartial(Lists.newArrayList(update), lock); // Verify Paths deleted http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java 
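The two test classes above pin down the contract of the rewritten parsePath(). Summarizing the behavior the assertions encode (return values shown as comments are inferred from the tests; the method throws SentryMalformedPathException on malformed input, so a surrounding method context is assumed):

    // Per TestPathsUpdate: parsePath() now returns a relative path String.
    PathsUpdate.parsePath("hdfs://hostname.test.com:8020/path"); // -> "path"
    PathsUpdate.parsePath("hdfs:///path");                       // -> "path"
    PathsUpdate.parsePath("file://hostname/path");               // -> null (non-HDFS)
    // Component lists for thrift are recovered by splitting on "/":
    Lists.newArrayList(PathsUpdate.parsePath(
        "hdfs:///db1/tbl12/part121").split("/"));                // -> [db1, tbl12, part121]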
---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java index 0eaac80..de94743 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java @@ -18,12 +18,15 @@ package org.apache.sentry.hdfs; import com.codahale.metrics.Timer; +import com.google.common.base.Splitter; import com.google.common.collect.Lists; import org.apache.sentry.hdfs.service.thrift.TPathChanges; import org.apache.sentry.provider.db.service.persistent.PathsImage; import org.apache.sentry.provider.db.service.persistent.SentryStore; import javax.annotation.concurrent.ThreadSafe; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -39,7 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class PathImageRetriever implements ImageRetriever<PathsUpdate> { private final SentryStore sentryStore; - private final static String[] root = {"/"}; + private static final String[] root = {"/"}; PathImageRetriever(SentryStore sentryStore) { this.sentryStore = sentryStore; @@ -66,7 +69,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> { TPathChanges pathChange = pathsUpdate.newPathChange(pathEnt.getKey()); for (String path : pathEnt.getValue()) { - pathChange.addToAddPaths(PathsUpdate.splitPath(path)); + pathChange.addToAddPaths(Lists.newArrayList(Splitter.on("/").split(path))); } } http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 8b88c9a..ef67865 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -2552,7 +2552,7 @@ public class SentryStore { * @param update the corresponding path delta update * @throws Exception */ - public void addAuthzPathsMapping(final String authzObj, final Set<String> paths, + public void addAuthzPathsMapping(final String authzObj, final Iterable<String> paths, final Update update) throws Exception { execute(new DeltaTransactionBlock(update), new TransactionBlock<Object>() { public Object execute(PersistenceManager pm) throws Exception { @@ -2572,7 +2572,7 @@ public class SentryStore { * @param paths a set of paths need to be added into the authzObj -> [Paths] mapping */ private void addAuthzPathsMappingCore(PersistenceManager pm, String authzObj, - Set<String> paths) { + Iterable<String> paths) { MAuthzPathsMapping mAuthzPathsMapping = getMAuthzPathsMappingCore(pm, authzObj); if (mAuthzPathsMapping == null) { mAuthzPathsMapping = new MAuthzPathsMapping(authzObj, paths); @@ -2593,7 +2593,7 @@ public class SentryStore { * @param paths a set of paths need to be deleted from the 
authzObj -> [Paths] mapping * @param update the corresponding path delta update */ - public void deleteAuthzPathsMapping(final String authzObj, final Set<String> paths, + public void deleteAuthzPathsMapping(final String authzObj, final Iterable<String> paths, final Update update) throws Exception { execute(new DeltaTransactionBlock(update), new TransactionBlock<Object>() { public Object execute(PersistenceManager pm) throws Exception { @@ -2612,7 +2612,7 @@ public class SentryStore { * @throws SentryNoSuchObjectException if cannot find the existing authzObj or path. */ private void deleteAuthzPathsMappingCore(PersistenceManager pm, String authzObj, - Set<String> paths) { + Iterable<String> paths) { MAuthzPathsMapping mAuthzPathsMapping = getMAuthzPathsMappingCore(pm, authzObj); if (mAuthzPathsMapping != null) { for (String path : paths) { http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 57b7f88..ca4487f 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -207,7 +207,7 @@ public class HMSFollower implements Runnable { connectedToHMS = true; LOGGER.info("HMSFollower of Sentry successfully connected to HMS"); } - } catch (Exception e) { + } catch (Throwable e) { LOGGER.error("HMSFollower cannot connect to HMS!!", e); return; } @@ -316,27 +316,19 @@ public class HMSFollower implements Runnable { /** * Retrieve a Hive full snapshot from HMS. - * - * @return mapping of hiveObj -> [Paths]. - * @throws ExecutionException, InterruptedException, TException + * @return HMS snapshot. Snapshot consists of a mapping from auth object name + * to the set of paths corresponding to that name. 
http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 57b7f88..ca4487f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -207,7 +207,7 @@ public class HMSFollower implements Runnable {
         connectedToHMS = true;
         LOGGER.info("HMSFollower of Sentry successfully connected to HMS");
       }
-    } catch (Exception e) {
+    } catch (Throwable e) {
       LOGGER.error("HMSFollower cannot connect to HMS!!", e);
       return;
     }
@@ -316,27 +316,19 @@ public class HMSFollower implements Runnable {
 
   /**
    * Retrieve a Hive full snapshot from HMS.
-   *
-   * @return mapping of hiveObj -> [Paths].
-   * @throws ExecutionException, InterruptedException, TException
+   * @return HMS snapshot. Snapshot consists of a mapping from auth object name
+   * to the set of paths corresponding to that name.
+   * @throws InterruptedException
+   * @throws TException
+   * @throws ExecutionException
    */
   private Map<String, Set<String>> fetchFullUpdate()
-      throws Exception {
-    FullUpdateInitializer updateInitializer = null;
-
-    try {
-      updateInitializer = new FullUpdateInitializer(client, authzConf);
-      Map<String, Set<String>> pathsUpdate = updateInitializer.createInitialUpdate();
-      LOGGER.info("Obtained full snapshot from HMS");
+      throws InterruptedException, TException, ExecutionException {
+    LOGGER.info("Request full HMS snapshot");
+    try (FullUpdateInitializer updateInitializer = new FullUpdateInitializer(client, authzConf)) {
+      Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
+      LOGGER.info("Obtained full HMS snapshot");
       return pathsUpdate;
-    } finally {
-      if (updateInitializer != null) {
-        try {
-          updateInitializer.close();
-        } catch (Exception e) {
-          LOGGER.error("Exception while closing updateInitializer", e);
-        }
-      }
     }
   }
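Note on fetchFullUpdate() above: the hand-rolled try/finally with a null check and a guarded close() collapses into try-with-resources, which works because FullUpdateInitializer now implements AutoCloseable (per the sentry-hdfs-common changes in this commit). A minimal sketch of the idiom with a stand-in resource:

    public class TryWithResourcesSketch {
      static class Fetcher implements AutoCloseable {
        String fetch() { return "snapshot"; }
        @Override public void close() { System.out.println("closed"); }
      }

      public static void main(String[] args) {
        // close() runs when the block exits, normally or via an
        // exception, so no null check or nested try is needed.
        try (Fetcher f = new Fetcher()) {
          System.out.println(f.fetch());
        }
      }
    }
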
http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
index 84574f0..083e0ac 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
@@ -18,20 +18,22 @@ package org.apache.sentry.service.thrift;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.apache.sentry.hdfs.PathsUpdate;
 import org.apache.sentry.hdfs.SentryMalformedPathException;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 import org.slf4j.Logger;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 
 /**
  * NotificationProcessor processes various notification events generated from
  * the Hive MetaStore state change, and applies these changes on the complete
  * HMS Paths snapshot or delta update stored in Sentry using SentryStore.
  */
-public class NotificationProcessor {
+class NotificationProcessor {
 
   private final Logger LOGGER;
   private final SentryStore sentryStore;
@@ -132,7 +134,8 @@ public class NotificationProcessor {
    * @param seqNum notification event ID
    * @throws Exception if encounters errors while persisting the path change
    */
-  void processAddPartition(String dbName, String tableName, List<String> locations, long seqNum)
+  void processAddPartition(String dbName, String tableName,
+      Collection<String> locations, long seqNum)
       throws Exception {
     String authzObj = dbName + "." + tableName;
     addPaths(authzObj, locations, seqNum);
@@ -148,7 +151,8 @@ public class NotificationProcessor {
    * @param seqNum notification event ID
    * @throws Exception if encounters errors while persisting the path change
    */
-  void processDropPartition(String dbName, String tableName, List<String> locations, long seqNum)
+  void processDropPartition(String dbName, String tableName,
+      Collection<String> locations, long seqNum)
      throws Exception {
     String authzObj = dbName + "." + tableName;
     removePaths(authzObj, locations, seqNum);
@@ -180,17 +184,17 @@ public class NotificationProcessor {
    * @param seqNum notification event ID
    * @throws Exception
    */
-  private void addPaths(String authzObj, List<String> locations, long seqNum)
+  private void addPaths(String authzObj, Collection<String> locations, long seqNum)
       throws Exception {
     // AuthzObj is case insensitive
     authzObj = authzObj.toLowerCase();
 
     PathsUpdate update = new PathsUpdate(seqNum, false);
-    Set<String> paths = new HashSet<>();
+    Collection<String> paths = new HashSet<>(locations.size());
     // addPath and persist into Sentry DB.
     // Skip update if encounter malformed path.
     for (String location : locations) {
-      List<String> pathTree = getPath(location);
+      String pathTree = getPath(location);
       if (pathTree == null) {
         LOGGER.debug("#### HMS Path Update ["
             + "OP : addPath, "
@@ -203,8 +207,8 @@ public class NotificationProcessor {
             + authzObj + ", "
             + "path : " + location + ", "
             + "notification event ID: " + seqNum + "]");
-        update.newPathChange(authzObj).addToAddPaths(pathTree);
-        paths.add(PathsUpdate.concatenatePath(pathTree));
+        update.newPathChange(authzObj).addToAddPaths(splitPath(pathTree));
+        paths.add(pathTree);
       }
     }
     sentryStore.addAuthzPathsMapping(authzObj, paths, update);
@@ -219,15 +223,15 @@ public class NotificationProcessor {
    * @param seqNum notification event ID
    * @throws Exception
    */
-  private void removePaths(String authzObj, List<String> locations, long seqNum)
+  private void removePaths(String authzObj, Collection<String> locations, long seqNum)
       throws Exception {
     // AuthzObj is case insensitive
     authzObj = authzObj.toLowerCase();
 
     PathsUpdate update = new PathsUpdate(seqNum, false);
-    Set<String> paths = new HashSet<>();
+    Collection<String> paths = new HashSet<>(locations.size());
     for (String location : locations) {
-      List<String> pathTree = getPath(location);
+      String pathTree = getPath(location);
       if (pathTree == null) {
         LOGGER.debug("#### HMS Path Update ["
             + "OP : removePath, "
@@ -240,8 +244,8 @@ public class NotificationProcessor {
             + "authzObj : " + authzObj + ", "
             + "path : " + location + ", "
             + "notification event ID: " + seqNum + "]");
-        update.newPathChange(authzObj).addToDelPaths(pathTree);
-        paths.add(PathsUpdate.concatenatePath(pathTree));
+        update.newPathChange(authzObj).addToDelPaths(splitPath(pathTree));
+        paths.add(pathTree);
       }
     }
     sentryStore.deleteAuthzPathsMapping(authzObj, paths, update);
@@ -288,8 +292,8 @@ public class NotificationProcessor {
     // AuthzObj is case insensitive
     oldAuthzObj = oldAuthzObj.toLowerCase();
     newAuthzObj = newAuthzObj.toLowerCase();
-    List<String> oldPathTree = getPath(oldLocation);
-    List<String> newPathTree = getPath(newLocation);
+    String oldPathTree = getPath(oldLocation);
+    String newPathTree = getPath(newLocation);
 
     LOGGER.debug("#### HMS Path Update ["
         + "OP : renameAuthzObject, "
@@ -302,54 +306,52 @@ public class NotificationProcessor {
     // In the case of HiveObj name has changed
     if (!oldAuthzObj.equalsIgnoreCase(newAuthzObj)) {
       // Skip update if encounter malformed path for both oldLocation and newLocation.
-      if (oldPathTree != null && newPathTree != null) {
+      if ((oldPathTree != null) && (newPathTree != null)) {
         PathsUpdate update = new PathsUpdate(seqNum, false);
-        update.newPathChange(oldAuthzObj).addToDelPaths(oldPathTree);
-        update.newPathChange(newAuthzObj).addToAddPaths(newPathTree);
-        if (!oldLocation.equals(newLocation)) {
-          // Both name and location has changed
-          //  - Alter table rename for managed table
-          sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj,
-              PathsUpdate.concatenatePath(oldPathTree),
-              PathsUpdate.concatenatePath(newPathTree),
-              update);
-        } else {
+        update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+        update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
+        if (oldLocation.equals(newLocation)) {
          // Only name has changed
          //  - Alter table rename for an external table
          sentryStore.renameAuthzObj(oldAuthzObj, newAuthzObj, update);
+        } else {
+          // Both name and location have changed
+          //  - Alter table rename for managed table
+          sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, oldPathTree,
+              newPathTree, update);
         }
       } else if (oldPathTree != null) {
         PathsUpdate update = new PathsUpdate(seqNum, false);
-        update.newPathChange(oldAuthzObj).addToDelPaths(oldPathTree);
+        update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
         sentryStore.deleteAuthzPathsMapping(oldAuthzObj,
-            Sets.newHashSet(PathsUpdate.concatenatePath(oldPathTree)),
+            Collections.singleton(oldPathTree),
             update);
       } else if (newPathTree != null) {
         PathsUpdate update = new PathsUpdate(seqNum, false);
-        update.newPathChange(newAuthzObj).addToAddPaths(newPathTree);
+        update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
         sentryStore.addAuthzPathsMapping(newAuthzObj,
-            Sets.newHashSet(PathsUpdate.concatenatePath(newPathTree)),
+            Collections.singleton(newPathTree),
             update);
       }
     } else if (!oldLocation.equals(newLocation)) {
       // Only Location has changed, e.g. Alter table set location
-      if (oldPathTree != null && newPathTree != null) {
+      if ((oldPathTree != null) && (newPathTree != null)) {
         PathsUpdate update = new PathsUpdate(seqNum, false);
-        update.newPathChange(oldAuthzObj).addToDelPaths(oldPathTree);
-        update.newPathChange(oldAuthzObj).addToAddPaths(newPathTree);
-        sentryStore.updateAuthzPathsMapping(oldAuthzObj, PathsUpdate.concatenatePath(oldPathTree),
-            PathsUpdate.concatenatePath(newPathTree), update);
+        update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+        update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree));
+        sentryStore.updateAuthzPathsMapping(oldAuthzObj, oldPathTree,
+            newPathTree, update);
       } else if (oldPathTree != null) {
         PathsUpdate update = new PathsUpdate(seqNum, false);
-        update.newPathChange(oldAuthzObj).addToDelPaths(oldPathTree);
+        update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
         sentryStore.deleteAuthzPathsMapping(oldAuthzObj,
-            Sets.newHashSet(PathsUpdate.concatenatePath(oldPathTree)),
+            Collections.singleton(oldPathTree),
             update);
       } else if (newPathTree != null) {
         PathsUpdate update = new PathsUpdate(seqNum, false);
-        update.newPathChange(oldAuthzObj).addToAddPaths(newPathTree);
+        update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree));
         sentryStore.addAuthzPathsMapping(oldAuthzObj,
-            Sets.newHashSet(PathsUpdate.concatenatePath(newPathTree)),
+            Collections.singleton(newPathTree),
             update);
       }
     } else {
@@ -366,12 +368,24 @@ public class NotificationProcessor {
    * @param path a path
    * @return the path tree given a path.
    */
-  private List<String> getPath(String path) {
+  private String getPath(String path) {
     try {
       return PathsUpdate.parsePath(path);
     } catch (SentryMalformedPathException e) {
       LOGGER.error("Unexpected path while parsing, " + path, e.getMessage());
-      return null;
     }
+    return null;
+  }
+
+  /**
+   * Split path into components on the "/" character.
+   * The path should not start with "/".
+   * This is consumed by the Thrift interface, so the return result should be
+   * {@code List<String>}.
+   * @param path input path, e.g. {@code foo/bar}
+   * @return list of components, e.g. [foo, bar]
+   */
+  private List<String> splitPath(String path) {
+    return (Lists.newArrayList(path.split("/")));
   }
 }
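Note on the NotificationProcessor changes above: paths are now carried around as plain strings and only split into components at the Thrift boundary via the new splitPath() helper, which is also what lets Sets.newHashSet(PathsUpdate.concatenatePath(...)) shrink to Collections.singleton(path). A standalone sketch of the helper's contract (using java.util only; the patch itself uses Guava's Lists.newArrayList):

    import java.util.Arrays;
    import java.util.List;

    public class SplitPathSketch {
      // Same contract as splitPath() above: input must not start with "/".
      static List<String> splitPath(String path) {
        return Arrays.asList(path.split("/"));
      }

      public static void main(String[] args) {
        System.out.println(splitPath("db/tbl/part=1")); // [db, tbl, part=1]
      }
    }

Note that String.split drops trailing empty strings, so "db/tbl/" yields [db, tbl]; callers are expected to pass normalized paths.
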
http://git-wip-us.apache.org/repos/asf/sentry/blob/bfb0456f/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java
index e771ce7..7d128b7 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationAdvanced.java
@@ -756,7 +756,7 @@ public class TestHDFSIntegrationAdvanced extends TestHDFSIntegrationBase {
     // set the default URI scheme to be hdfs.
     boolean testConfOff = Boolean.valueOf(System.getProperty(EXTERNAL_SENTRY_SERVICE, "false"));
     if (!testConfOff) {
-      PathsUpdate.getConfiguration().set("fs.defaultFS", "hdfs:///");
+      PathsUpdate.setDefaultScheme("hdfs");
     }
     String dbName = "db1";
     String tblName = "tab1";
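Note on the test change above: instead of mutating the shared Hadoop Configuration returned by PathsUpdate.getConfiguration(), the test now calls the new PathsUpdate.setDefaultScheme() hook, which avoids cross-test leakage through a process-wide Configuration object. A hypothetical sketch of what such a hook can look like (the actual PathsUpdate implementation in this commit may differ):

    public final class SchemeHolder {
      // Volatile so test threads and the follower thread observe
      // the same value without extra synchronization.
      private static volatile String defaultScheme = "hdfs";

      public static void setDefaultScheme(String scheme) {
        defaultScheme = scheme;
      }

      public static String getDefaultScheme() {
        return defaultScheme;
      }
    }
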
