SENTRY-1630: out of sequence error in HMSFollower (Alex Kolbasov, reviewed by Vamsee Yarlagadda and Na Li)
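The patch below routes every HMS call through a HiveConnectionFactory and an AutoCloseable HMSClient wrapper instead of keeping one long-lived HiveMetaStoreClient. A minimal sketch of that connect / use / invalidate-on-error pattern, using only class and method names introduced in the diff (the factory instance, surrounding package, and caller-side error handling are assumed, not part of the patch):

import java.util.List;

/** Sketch only, not part of the patch: shows how the new classes are meant to be used. */
class HmsAccessSketch {
  private final HiveConnectionFactory clientFactory;   // assumed to be created and init()-ed elsewhere

  HmsAccessSketch(HiveConnectionFactory clientFactory) {
    this.clientFactory = clientFactory;
  }

  List<String> listDatabases() throws Exception {
    HMSClient c = null;
    try (HMSClient client = clientFactory.connect()) {
      c = client;
      // Every metastore call goes through the wrapped HiveMetaStoreClient.
      return client.getClient().getAllDatabases();
    } catch (Exception e) {
      // invalidate() closes the broken connection and makes the implicit close() a no-op,
      // so a failed connection is never reused.
      if (c != null) {
        c.invalidate();
      }
      throw e;
    }
  }
}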
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/747c2260
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/747c2260
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/747c2260

Branch: refs/heads/sentry-ha-redesign
Commit: 747c226013f31d02e623b82902f2bc62a87fc4e9
Parents: 5c1d559
Author: Alexander Kolbasov <[email protected]>
Authored: Tue Jul 11 22:20:09 2017 +0200
Committer: Alexander Kolbasov <[email protected]>
Committed: Tue Jul 11 22:20:09 2017 +0200

----------------------------------------------------------------------
 .../sentry/hdfs/FullUpdateInitializer.java      | 454 -----------------
 .../sentry/hdfs/TestFullUpdateInitializer.java  | 320 ------------
 .../service/thrift/FullUpdateInitializer.java   | 492 +++++++++++++++++++
 .../apache/sentry/service/thrift/HMSClient.java |  56 +++
 .../sentry/service/thrift/HMSFollower.java      | 168 ++-----
 .../service/thrift/HiveConnectionFactory.java   |  35 ++
 .../thrift/HiveSimpleConnectionFactory.java     | 129 +++++
 .../service/thrift/SentryKerberosContext.java   |   6 +-
 .../sentry/service/thrift/SentryService.java    |  21 +-
 .../thrift/TestFullUpdateInitializer.java       | 346 +++++++++++++
 10 files changed, 1115 insertions(+), 912 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
deleted file mode 100644
index cf9774c..0000000
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.sentry.hdfs; - -import com.google.common.collect.ImmutableMap; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Deque; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.Map; -import java.util.HashMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -/** - * Manage fetching full snapshot from HMS. - * Snapshot is represented as a map from the hive object name to - * the set of paths for this object. - * The hive object name is either the Hive database name or - * Hive database name joined with Hive table name as {@code dbName.tableName}. - * All table partitions are stored under the table object. - * <p> - * Once {@link FullUpdateInitializer}, the {@link FullUpdateInitializer#getFullHMSSnapshot()} - * method should be called to get the initial update. - * <p> - * It is important to close the {@link FullUpdateInitializer} object to prevent resource - * leaks. - * <p> - * The usual way of using {@link FullUpdateInitializer} is - * <pre> - * {@code - * try (FullUpdateInitializer updateInitializer = - * new FullUpdateInitializer(client, authzConf)) { - * Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot(); - * return pathsUpdate; - * } - */ -public final class FullUpdateInitializer implements AutoCloseable { - - /* - * Implementation note. - * - * The snapshot is obtained using an executor. We follow the map/reduce model. - * Each executor thread (mapper) obtains and returns a partial snapshot which are then - * reduced to a single combined snapshot by getFullHMSSnapshot(). - * - * Synchronization between the getFullHMSSnapshot() and executors is done using the - * 'results' queue. The queue holds the futures for each scheduled task. - * It is initially populated by getFullHMSSnapshot and each task may add new future - * results to it. Only getFullHMSSnapshot() removes entries from the results queue. - * This guarantees that once the results queue is empty there are no pending jobs. - * - * Since there are no other data sharing, the implementation is safe without - * any other synchronization. It is not thread-safe for concurrent calls - * to getFullHMSSnapshot(). 
- * - */ - - private final ExecutorService threadPool; - private final HiveMetaStoreClient client; - private final int maxPartitionsPerCall; - private final int maxTablesPerCall; - private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>(); - private final int maxRetries; - private final int waitDurationMillis; - - private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class); - - private static final ObjectMapping emptyObjectMapping = - new ObjectMapping(Collections.<String, Set<String>>emptyMap()); - - /** - * Extract path (not starting with "/") from the full URI - * @param uri - resource URI (usually with scheme) - * @return path if uri is valid or null - */ - private static String pathFromURI(String uri) { - try { - return PathsUpdate.parsePath(uri); - } catch (SentryMalformedPathException e) { - LOGGER.warn(String.format("Ignoring invalid uri %s: %s", - uri, e.getReason())); - return null; - } - } - - /** - * Mapping of object to set of paths. - * Used to represent partial results from executor threads. Multiple - * ObjectMapping objects are combined in a single mapping - * to get the final result. - */ - private static final class ObjectMapping { - private final Map<String, Set<String>> objects; - - ObjectMapping(Map<String, Set<String>> objects) { - this.objects = objects; - } - - ObjectMapping(String authObject, String path) { - Set<String> values = Collections.singleton(safeIntern(path)); - objects = ImmutableMap.of(authObject, values); - } - - ObjectMapping(String authObject, Collection<String> paths) { - Set<String> values = new HashSet<>(paths); - objects = ImmutableMap.of(authObject, values); - } - - Map<String, Set<String>> getObjects() { - return objects; - } - } - - private static final class CallResult { - private final Exception failure; - private final boolean successStatus; - private final ObjectMapping objectMapping; - - CallResult(Exception ex) { - failure = ex; - successStatus = false; - objectMapping = emptyObjectMapping; - } - - CallResult(ObjectMapping objectMapping) { - failure = null; - successStatus = true; - this.objectMapping = objectMapping; - } - - boolean success() { - return successStatus; - } - - ObjectMapping getObjectMapping() { - return objectMapping; - } - - public Exception getFailure() { - return failure; - } - } - - private abstract class BaseTask implements Callable<CallResult> { - - /** - * Class represents retry strategy for BaseTask. - */ - private final class RetryStrategy { - private int retryStrategyMaxRetries = 0; - private final int retryStrategyWaitDurationMillis; - - private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) { - this.retryStrategyMaxRetries = retryStrategyMaxRetries; - - // Assign default wait duration if negative value is provided. - this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ? - retryStrategyWaitDurationMillis : 1000; - } - - @SuppressWarnings({"squid:S1141", "squid:S2142"}) - public CallResult exec() { - // Retry logic is happening inside callable/task to avoid - // synchronous waiting on getting the result. - // Retry the failure task until reach the max retry number. - // Wait configurable duration for next retry. - // - // Only thrift exceptions are retried. - // Other exceptions are propagated up the stack. 
- Exception exception = null; - try { - // We catch all exceptions except Thrift exceptions which are retried - for (int i = 0; i < retryStrategyMaxRetries; i++) { - //noinspection NestedTryStatement - try { - return new CallResult(doTask()); - } catch (TException ex) { - LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." + - " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " + - ex.toString(), ex); - exception = ex; - - try { - Thread.sleep(retryStrategyWaitDurationMillis); - } catch (InterruptedException ignored) { - // Skip the rest retries if get InterruptedException. - // And set the corresponding retries number. - LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1)); - break; - } - } - } - } catch (Exception ex) { - exception = ex; - } - LOGGER.error("Failed to execute task", exception); - // We will fail in the end, so we are shutting down the pool to prevent - // new tasks from being scheduled. - threadPool.shutdown(); - return new CallResult(exception); - } - } - - private final RetryStrategy retryStrategy; - - BaseTask() { - retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis); - } - - @Override - public CallResult call() throws Exception { - return retryStrategy.exec(); - } - - abstract ObjectMapping doTask() throws TException; - } - - private class PartitionTask extends BaseTask { - private final String dbName; - private final String tblName; - private final String authName; - private final List<String> partNames; - - PartitionTask(String dbName, String tblName, String authName, - List<String> partNames) { - this.dbName = safeIntern(dbName); - this.tblName = safeIntern(tblName); - this.authName = safeIntern(authName); - this.partNames = partNames; - } - - @Override - ObjectMapping doTask() throws TException { - List<Partition> tblParts = client.getPartitionsByNames(dbName, tblName, partNames); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("#### Fetching partitions " + - "[" + dbName + "." + tblName + "]" + "[" + partNames + "]"); - } - Collection<String> partitionNames = new ArrayList<>(tblParts.size()); - for (Partition part : tblParts) { - String partPath = pathFromURI(part.getSd().getLocation()); - if (partPath != null) { - partitionNames.add(partPath.intern()); - } - } - return new ObjectMapping(authName, partitionNames); - } - } - - private class TableTask extends BaseTask { - private final String dbName; - private final List<String> tableNames; - - TableTask(Database db, List<String> tableNames) { - dbName = safeIntern(db.getName()); - this.tableNames = tableNames; - } - - @Override - @SuppressWarnings({"squid:S2629", "squid:S135"}) - ObjectMapping doTask() throws TException { - List<Table> tables = client.getTableObjectsByName(dbName, tableNames); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("#### Fetching tables [" + dbName + "][" + - tableNames + "]"); - } - Map<String, Set<String>> objectMapping = new HashMap<>(tables.size()); - for (Table tbl : tables) { - // Table names are case insensitive - if (!tbl.getDbName().equalsIgnoreCase(dbName)) { - // Inconsistency in HMS data - LOGGER.warn(String.format("DB name %s for table %s does not match %s", - tbl.getDbName(), tbl.getTableName(), dbName)); - continue; - } - - String tableName = safeIntern(tbl.getTableName().toLowerCase()); - String authzObject = (dbName + "." 
+ tableName).intern(); - List<String> tblPartNames = client.listPartitionNames(dbName, tableName, (short) -1); - for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) { - List<String> partsToFetch = tblPartNames.subList(i, - Math.min(i + maxPartitionsPerCall, tblPartNames.size())); - Callable<CallResult> partTask = new PartitionTask(dbName, - tableName, authzObject, partsToFetch); - results.add(threadPool.submit(partTask)); - } - String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation())); - if (tblPath == null) { - continue; - } - Set<String> paths = objectMapping.get(authzObject); - if (paths == null) { - paths = new HashSet<>(1); - objectMapping.put(authzObject, paths); - } - paths.add(tblPath); - } - return new ObjectMapping(Collections.unmodifiableMap(objectMapping)); - } - } - - private class DbTask extends BaseTask { - - private final String dbName; - - DbTask(String dbName) { - //Database names are case insensitive - this.dbName = safeIntern(dbName.toLowerCase()); - } - - @Override - ObjectMapping doTask() throws TException { - Database db = client.getDatabase(dbName); - if (!dbName.equalsIgnoreCase(db.getName())) { - LOGGER.warn("Database name {} does not match {}", db.getName(), dbName); - return emptyObjectMapping; - } - List<String> allTblStr = client.getAllTables(dbName); - for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) { - List<String> tablesToFetch = allTblStr.subList(i, - Math.min(i + maxTablesPerCall, allTblStr.size())); - Callable<CallResult> tableTask = new TableTask(db, tablesToFetch); - results.add(threadPool.submit(tableTask)); - } - String dbPath = safeIntern(pathFromURI(db.getLocationUri())); - return (dbPath != null) ? new ObjectMapping(dbName, dbPath) : - emptyObjectMapping; - } - } - - public FullUpdateInitializer(HiveMetaStoreClient client, Configuration conf) { - this.client = client; - maxPartitionsPerCall = conf.getInt( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT); - maxTablesPerCall = conf.getInt( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT); - maxRetries = conf.getInt( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM, - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT); - waitDurationMillis = conf.getInt( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS, - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT); - threadPool = Executors.newFixedThreadPool(conf.getInt( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT)); - } - - /** - * Get Full HMS snapshot. - * @return Full snapshot of HMS objects. - * @throws TException if Thrift error occured - * @throws ExecutionException if there was a scheduling error - * @throws InterruptedException if processing was interrupted - */ - @SuppressWarnings("squid:S00112") - public Map<String, Set<String>> getFullHMSSnapshot() - throws Exception { - // Get list of all HMS databases - List<String> allDbStr = client.getAllDatabases(); - // Schedule async task for each database responsible for fetching per-database - // objects. 
- for (String dbName : allDbStr) { - results.add(threadPool.submit(new DbTask(dbName))); - } - - // Resulting full snapshot - Map<String, Set<String>> fullSnapshot = new HashMap<>(); - - // As async tasks complete, merge their results into full snapshot. - while (!results.isEmpty()) { - // This is the only thread that takes elements off the results list - all other threads - // only add to it. Once the list is empty it can't become non-empty - // This means that if we check that results is non-empty we can safely call pop() and - // know that the result of poll() is not null. - Future<CallResult> result = results.pop(); - // Wait for the task to complete - CallResult callResult = result.get(); - // Fail if we got errors - if (!callResult.success()) { - throw callResult.getFailure(); - } - // Merge values into fullUpdate - Map<String, Set<String>> objectMapping = - callResult.getObjectMapping().getObjects(); - for (Map.Entry<String, Set<String>> entry: objectMapping.entrySet()) { - String key = entry.getKey(); - Set<String> val = entry.getValue(); - Set<String> existingSet = fullSnapshot.get(key); - if (existingSet == null) { - fullSnapshot.put(key, val); - continue; - } - existingSet.addAll(val); - } - } - return fullSnapshot; - } - - @Override - public void close() { - threadPool.shutdownNow(); - try { - threadPool.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException ignored) { - LOGGER.warn("Interrupted shutdown"); - Thread.currentThread().interrupt(); - } - } - - /** - * Intern a string but only if it is not null - * @param arg String to be interned, may be null - * @return interned string or null - */ - static String safeIntern(String arg) { - return (arg != null) ? arg.intern() : null; - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java deleted file mode 100644 index 389e9b8..0000000 --- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java +++ /dev/null @@ -1,320 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sentry.hdfs; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.thrift.TException; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class TestFullUpdateInitializer { - - private static Configuration conf = new Configuration(); - - static { - conf.setInt(ServiceConstants.ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1); - conf.setInt(ServiceConstants.ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1); - conf.setInt(ServiceConstants.ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 8); - } - - /** - * Representation of a Hive table. A table has a name and a list of partitions. - */ - private static class HiveTable { - String name; - List<String> partitions; - - HiveTable(String name) { - this.name = name; - this.partitions = new ArrayList<>(); - } - - HiveTable(String name, List<String> partitions) { - this.name = name; - this.partitions = partitions; - if (this.partitions == null) { - this.partitions = new ArrayList<>(); - } - } - - HiveTable add(String partition) { - partitions.add(partition); - return this; - } - } - - /** - * Representation of a Hive database. A database has a name and a list of tables - */ - private static class HiveDb { - String name; - Collection<HiveTable> tables; - - HiveDb(String name) { - this.name = name; - tables = new ArrayList<>(); - } - - HiveDb(String name, Collection<HiveTable> tables) { - this.name = name; - this.tables = tables; - if (this.tables == null) { - this.tables = new ArrayList<>(); - } - } - - void add(HiveTable table) { - this.tables.add(table); - } - } - - /** - * Representation of a full Hive snapshot. A snapshot is collection of databases - */ - private static class HiveSnapshot { - List<HiveDb> databases = new ArrayList<>(); - - HiveSnapshot() { - } - - HiveSnapshot(Collection<HiveDb> dblist) { - if (dblist != null) { - databases.addAll(dblist); - } - } - - HiveSnapshot add(HiveDb db) { - this.databases.add(db); - return this; - } - } - - /** - * Convert Hive snapshot to mock client that will return proper values - * for the snapshot. 
- */ - private static class MockClient { - HiveMetaStoreClient client; - - MockClient(HiveSnapshot snapshot) throws TException { - client = Mockito.mock(HiveMetaStoreClient.class); - List<String> dbNames = new ArrayList<>(snapshot.databases.size()); - // Walk over all databases and mock appropriate objects - for (HiveDb mdb: snapshot.databases) { - String dbName = mdb.name; - dbNames.add(dbName); - Database db = makeDb(dbName); - Mockito.when(client.getDatabase(dbName)).thenReturn(db); - List<String> tableNames = new ArrayList<>(mdb.tables.size()); - // Walk over all tables for the database and mock appropriate objects - for (HiveTable table: mdb.tables) { - String tableName = table.name; - tableNames.add(tableName); - Table mockTable = makeTable(dbName, tableName); - Mockito.when(client.getTableObjectsByName(dbName, - Lists.newArrayList(tableName))) - .thenReturn(Lists.newArrayList(mockTable)); - Mockito.when(client.listPartitionNames(dbName, tableName, (short) -1)) - .thenReturn(table.partitions); - // Walk across all partitions and mock appropriate objects - for (String partName: table.partitions) { - Partition p = makePartition(dbName, tableName, partName); - Mockito.when(client.getPartitionsByNames(dbName, tableName, - Lists.<String>newArrayList(partName))) - .thenReturn(Lists.<Partition>newArrayList(p)); - } - } - Mockito.when(client.getAllTables(dbName)).thenReturn(tableNames); - } - // Return all database names - Mockito.when(client.getAllDatabases()).thenReturn(dbNames); - } - } - - /** - * Create mock database with the given name - * @param name Database name - * @return Mock database object - */ - private static Database makeDb(String name) { - Database db = Mockito.mock(Database.class); - Mockito.when(db.getName()).thenReturn(name); - Mockito.when(db.getLocationUri()).thenReturn("hdfs:///" + name); - return db; - } - - /** - * Create mock table - * @param dbName db for this table - * @param tableName name of the table - * @return mock table object - */ - private static Table makeTable(String dbName, String tableName) { - Table table = Mockito.mock(Table.class); - Mockito.when(table.getDbName()).thenReturn(dbName); - Mockito.when(table.getTableName()).thenReturn(tableName); - StorageDescriptor sd = Mockito.mock(StorageDescriptor.class); - Mockito.when(sd.getLocation()).thenReturn( - String.format("hdfs:///%s/%s", dbName, tableName)); - Mockito.when(table.getSd()).thenReturn(sd); - return table; - } - - /** - * Create mock partition - * @param dbName database for this partition - * @param tableName table for this partition - * @param partName partition name - * @return mock partition object - */ - private static Partition makePartition(String dbName, String tableName, String partName) { - Partition partition = Mockito.mock(Partition.class); - StorageDescriptor sd = Mockito.mock(StorageDescriptor.class); - Mockito.when(sd.getLocation()).thenReturn( - String.format("hdfs:///%s/%s/%s", dbName, tableName, partName)); - Mockito.when(partition.getSd()).thenReturn(sd); - return partition; - } - - @Test - // Test basic operation with small database - public void testSimple() throws Exception { - HiveTable tab21 = new HiveTable("tab21"); - HiveTable tab31 = new HiveTable("tab31").add("part311").add("part312"); - HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31)); - HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21)); - HiveDb db1 = new HiveDb("db1"); - HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3); - MockClient c = new MockClient(snap); - - Map<String, 
Set<String>> update; - try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(c.client, conf)) { - update = cacheInitializer.getFullHMSSnapshot(); - } - Assert.assertEquals(5, update.size()); - Assert.assertEquals(Sets.newHashSet("db1"), update.get("db1")); - Assert.assertEquals(Sets.newHashSet("db2"), update.get("db2")); - Assert.assertEquals(Sets.newHashSet("db3"), update.get("db3")); - Assert.assertEquals(Sets.newHashSet("db2/tab21"), update.get("db2.tab21")); - Assert.assertEquals(Sets.newHashSet("db3/tab31", - "db3/tab31/part311", "db3/tab31/part312"), update.get("db3.tab31")); - } - - @Test - // Test that invalid paths are handled correctly - public void testInvalidPaths() throws Exception { - //Set up mocks: db1.tb1, with tb1 returning a wrong dbname (db2) - Database db1 = makeDb("db1"); - - Table tab1 = Mockito.mock(Table.class); - //Return a wrong db name, so that this triggers an exception - Mockito.when(tab1.getDbName()).thenReturn("db2"); - Mockito.when(tab1.getTableName()).thenReturn("tab1"); - - HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class); - Mockito.when(client.getAllDatabases()).thenReturn(Lists.newArrayList("db1")); - Mockito.when(client.getDatabase("db1")).thenReturn(db1); - - Table tab12 = Mockito.mock(Table.class); - Mockito.when(tab12.getDbName()).thenReturn("db1"); - Mockito.when(tab12.getTableName()).thenReturn("tab21"); - StorageDescriptor sd21 = Mockito.mock(StorageDescriptor.class); - Mockito.when(sd21.getLocation()).thenReturn("hdfs:///db1/tab21"); - Mockito.when(tab12.getSd()).thenReturn(sd21); - - Mockito.when(client.getTableObjectsByName("db1", - Lists.newArrayList("tab1"))).thenReturn(Lists.newArrayList(tab1)); - Mockito.when(client.getTableObjectsByName("db1", - Lists.newArrayList("tab12"))).thenReturn(Lists.newArrayList(tab12)); - Mockito.when(client.getAllTables("db1")). - thenReturn(Lists.newArrayList("tab1", "tab12")); - - - Map<String, Set<String>> update; - try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(client, conf)) { - update = cacheInitializer.getFullHMSSnapshot(); - } - Assert.assertEquals(2, update.size()); - Assert.assertEquals(Sets.newHashSet("db1"), update.get("db1")); - Assert.assertEquals(Sets.newHashSet("db1/tab21"), update.get("db1.tab21")); - } - - @Test - // Test handling of a big tables and partitions - public void testBig() throws Exception { - int ndbs = 3; - int ntables = 51; - int nparts = 131; - - HiveSnapshot snap = new HiveSnapshot(); - - for (int i = 0; i < ndbs; i++) { - HiveDb db = new HiveDb("db" + i); - for (int j = 0; j < ntables; j++) { - HiveTable table = new HiveTable("table" + i + j); - for (int k = 0; k < nparts; k++) { - table.add("part" + i + j + k); - } - db.add(table); - } - snap.add(db); - } - MockClient c = new MockClient(snap); - Map<String, Set<String>> update; - try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(c.client, conf)) { - update = cacheInitializer.getFullHMSSnapshot(); - } - Assert.assertEquals((ntables * ndbs) + ndbs, update.size()); - for (int i = 0; i < ndbs; i++) { - String dbName = "db" + i; - Assert.assertEquals(Sets.newHashSet(dbName), update.get(dbName)); - - for (int j = 0; j < ntables; j++) { - String tableName = "table" + i + j; - Set<String> values = new HashSet<>(); - values.add(String.format("%s/%s", dbName, tableName)); - for (int k = 0; k < nparts; k++) { - String partName = "part" + i + j + k; - values.add(String.format("%s/%s/%s", dbName, tableName, partName)); - } - String authz = dbName + "." 
+ tableName; - Assert.assertEquals(values, update.get(authz)); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java new file mode 100644 index 0000000..1490581 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java @@ -0,0 +1,492 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.service.thrift; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.sentry.hdfs.PathsUpdate; +import org.apache.sentry.hdfs.SentryMalformedPathException; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Manage fetching full snapshot from HMS. + * Snapshot is represented as a map from the hive object name to + * the set of paths for this object. + * The hive object name is either the Hive database name or + * Hive database name joined with Hive table name as {@code dbName.tableName}. + * All table partitions are stored under the table object. + * <p> + * Once {@link FullUpdateInitializer}, the {@link FullUpdateInitializer#getFullHMSSnapshot()} + * method should be called to get the initial update. + * <p> + * It is important to close the {@link FullUpdateInitializer} object to prevent resource + * leaks. 
+ * <p> + * The usual way of using {@link FullUpdateInitializer} is + * <pre> + * {@code + * try (FullUpdateInitializer updateInitializer = + * new FullUpdateInitializer(clientFactory, authzConf)) { + * Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot(); + * return pathsUpdate; + * } + */ +public final class FullUpdateInitializer implements AutoCloseable { + + /* + * Implementation note. + * + * The snapshot is obtained using an executor. We follow the map/reduce model. + * Each executor thread (mapper) obtains and returns a partial snapshot which are then + * reduced to a single combined snapshot by getFullHMSSnapshot(). + * + * Synchronization between the getFullHMSSnapshot() and executors is done using the + * 'results' queue. The queue holds the futures for each scheduled task. + * It is initially populated by getFullHMSSnapshot and each task may add new future + * results to it. Only getFullHMSSnapshot() removes entries from the results queue. + * This guarantees that once the results queue is empty there are no pending jobs. + * + * Since there are no other data sharing, the implementation is safe without + * any other synchronization. It is not thread-safe for concurrent calls + * to getFullHMSSnapshot(). + * + */ + + private final ExecutorService threadPool; + private final int maxPartitionsPerCall; + private final int maxTablesPerCall; + private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>(); + private final int maxRetries; + private final int waitDurationMillis; + + private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class); + + private static final ObjectMapping emptyObjectMapping = + new ObjectMapping(Collections.<String, Set<String>>emptyMap()); + private final HiveConnectionFactory clientFactory; + + /** + * Extract path (not starting with "/") from the full URI + * @param uri - resource URI (usually with scheme) + * @return path if uri is valid or null + */ + private static String pathFromURI(String uri) { + try { + return PathsUpdate.parsePath(uri); + } catch (SentryMalformedPathException e) { + LOGGER.warn(String.format("Ignoring invalid uri %s: %s", + uri, e.getReason())); + return null; + } + } + + /** + * Mapping of object to set of paths. + * Used to represent partial results from executor threads. Multiple + * ObjectMapping objects are combined in a single mapping + * to get the final result. 
+ */ + private static final class ObjectMapping { + private final Map<String, Set<String>> objects; + + ObjectMapping(Map<String, Set<String>> objects) { + this.objects = objects; + } + + ObjectMapping(String authObject, String path) { + Set<String> values = Collections.singleton(safeIntern(path)); + objects = ImmutableMap.of(authObject, values); + } + + ObjectMapping(String authObject, Collection<String> paths) { + Set<String> values = new HashSet<>(paths); + objects = ImmutableMap.of(authObject, values); + } + + Map<String, Set<String>> getObjects() { + return objects; + } + } + + private static final class CallResult { + private final Exception failure; + private final boolean successStatus; + private final ObjectMapping objectMapping; + + CallResult(Exception ex) { + failure = ex; + successStatus = false; + objectMapping = emptyObjectMapping; + } + + CallResult(ObjectMapping objectMapping) { + failure = null; + successStatus = true; + this.objectMapping = objectMapping; + } + + boolean success() { + return successStatus; + } + + ObjectMapping getObjectMapping() { + return objectMapping; + } + + public Exception getFailure() { + return failure; + } + } + + private abstract class BaseTask implements Callable<CallResult> { + + /** + * Class represents retry strategy for BaseTask. + */ + private final class RetryStrategy { + private int retryStrategyMaxRetries = 0; + private final int retryStrategyWaitDurationMillis; + + private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) { + this.retryStrategyMaxRetries = retryStrategyMaxRetries; + + // Assign default wait duration if negative value is provided. + this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ? + retryStrategyWaitDurationMillis : 1000; + } + + @SuppressWarnings({"squid:S1141", "squid:S2142"}) + public CallResult exec() { + // Retry logic is happening inside callable/task to avoid + // synchronous waiting on getting the result. + // Retry the failure task until reach the max retry number. + // Wait configurable duration for next retry. + // + // Only thrift exceptions are retried. + // Other exceptions are propagated up the stack. + Exception exception = null; + try { + // We catch all exceptions except Thrift exceptions which are retried + for (int i = 0; i < retryStrategyMaxRetries; i++) { + //noinspection NestedTryStatement + try { + return new CallResult(doTask()); + } catch (TException ex) { + LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." + + " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " + + ex.toString(), ex); + exception = ex; + + try { + Thread.sleep(retryStrategyWaitDurationMillis); + } catch (InterruptedException ignored) { + // Skip the rest retries if get InterruptedException. + // And set the corresponding retries number. + LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1)); + break; + } + } + } + } catch (Exception ex) { + exception = ex; + } + LOGGER.error("Failed to execute task", exception); + // We will fail in the end, so we are shutting down the pool to prevent + // new tasks from being scheduled. 
+ threadPool.shutdown(); + return new CallResult(exception); + } + } + + private final RetryStrategy retryStrategy; + + BaseTask() { + retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis); + } + + @Override + public CallResult call() throws Exception { + return retryStrategy.exec(); + } + + abstract ObjectMapping doTask() throws Exception; + } + + private class PartitionTask extends BaseTask { + private final String dbName; + private final String tblName; + private final String authName; + private final List<String> partNames; + + PartitionTask(String dbName, String tblName, String authName, + List<String> partNames) { + this.dbName = safeIntern(dbName); + this.tblName = safeIntern(tblName); + this.authName = safeIntern(authName); + this.partNames = partNames; + } + + @Override + ObjectMapping doTask() throws Exception { + List<Partition> tblParts; + HMSClient c = null; + try (HMSClient client = clientFactory.connect()) { + c = client; + tblParts = client.getClient().getPartitionsByNames(dbName, tblName, partNames); + } catch (Exception e) { + if (c != null) { + c.invalidate(); + } + throw e; + } + + LOGGER.debug("Fetched partitions for db = {}, table = {}", + dbName, tblName); + + Collection<String> partitionNames = new ArrayList<>(tblParts.size()); + for (Partition part : tblParts) { + String partPath = pathFromURI(part.getSd().getLocation()); + if (partPath != null) { + partitionNames.add(partPath.intern()); + } + } + return new ObjectMapping(authName, partitionNames); + } + } + + private class TableTask extends BaseTask { + private final String dbName; + private final List<String> tableNames; + + TableTask(Database db, List<String> tableNames) { + dbName = safeIntern(db.getName()); + this.tableNames = tableNames; + } + + @Override + @SuppressWarnings({"squid:S2629", "squid:S135"}) + ObjectMapping doTask() throws Exception { + HMSClient c = null; + try (HMSClient client = clientFactory.connect()) { + c = client; + List<Table> tables = client.getClient().getTableObjectsByName(dbName, tableNames); + + LOGGER.debug("Fetching tables for db = {}, tables = {}", dbName, tableNames); + + Map<String, Set<String>> objectMapping = new HashMap<>(tables.size()); + for (Table tbl : tables) { + // Table names are case insensitive + if (!tbl.getDbName().equalsIgnoreCase(dbName)) { + // Inconsistency in HMS data + LOGGER.warn(String.format("DB name %s for table %s does not match %s", + tbl.getDbName(), tbl.getTableName(), dbName)); + continue; + } + + String tableName = safeIntern(tbl.getTableName().toLowerCase()); + String authzObject = (dbName + "." 
+ tableName).intern(); + List<String> tblPartNames = client.getClient().listPartitionNames(dbName, tableName, (short) -1); + for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) { + List<String> partsToFetch = tblPartNames.subList(i, + Math.min(i + maxPartitionsPerCall, tblPartNames.size())); + Callable<CallResult> partTask = new PartitionTask(dbName, + tableName, authzObject, partsToFetch); + results.add(threadPool.submit(partTask)); + } + String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation())); + if (tblPath == null) { + continue; + } + Set<String> paths = objectMapping.get(authzObject); + if (paths == null) { + paths = new HashSet<>(1); + objectMapping.put(authzObject, paths); + } + paths.add(tblPath); + } + return new ObjectMapping(Collections.unmodifiableMap(objectMapping)); + } catch (Exception e) { + if (c != null) { + c.invalidate(); + } + throw e; + } + } + } + + private class DbTask extends BaseTask { + + private final String dbName; + + DbTask(String dbName) { + //Database names are case insensitive + this.dbName = safeIntern(dbName.toLowerCase()); + } + + @Override + ObjectMapping doTask() throws Exception { + HMSClient c = null; + try (HMSClient client = clientFactory.connect()) { + c = client; + Database db = client.getClient().getDatabase(dbName); + if (!dbName.equalsIgnoreCase(db.getName())) { + LOGGER.warn("Database name {} does not match {}", db.getName(), dbName); + return emptyObjectMapping; + } + List<String> allTblStr = client.getClient().getAllTables(dbName); + for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) { + List<String> tablesToFetch = allTblStr.subList(i, + Math.min(i + maxTablesPerCall, allTblStr.size())); + Callable<CallResult> tableTask = new TableTask(db, tablesToFetch); + results.add(threadPool.submit(tableTask)); + } + String dbPath = safeIntern(pathFromURI(db.getLocationUri())); + return (dbPath != null) ? new ObjectMapping(dbName, dbPath) : + emptyObjectMapping; + } catch (Exception e) { + if (c != null) { + c.invalidate(); + } + throw e; + } + } + } + + FullUpdateInitializer(HiveConnectionFactory clientFactory, Configuration conf) { + this.clientFactory = clientFactory; + maxPartitionsPerCall = conf.getInt( + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT); + maxTablesPerCall = conf.getInt( + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT); + maxRetries = conf.getInt( + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT); + waitDurationMillis = conf.getInt( + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT); + threadPool = Executors.newFixedThreadPool(conf.getInt( + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT)); + } + + /** + * Get Full HMS snapshot. + * @return Full snapshot of HMS objects. 
+ * @throws TException if Thrift error occured + * @throws ExecutionException if there was a scheduling error + * @throws InterruptedException if processing was interrupted + */ + @SuppressWarnings("squid:S00112") + Map<String, Set<String>> getFullHMSSnapshot() throws Exception { + // Get list of all HMS databases + List<String> allDbStr; + HMSClient c = null; + try (HMSClient client = clientFactory.connect()) { + c = client; + allDbStr = client.getClient().getAllDatabases(); + } catch (Exception e) { + if (c != null) { + c.invalidate(); + } + throw e; + } + + // Schedule async task for each database responsible for fetching per-database + // objects. + for (String dbName : allDbStr) { + results.add(threadPool.submit(new DbTask(dbName))); + } + + // Resulting full snapshot + Map<String, Set<String>> fullSnapshot = new HashMap<>(); + + // As async tasks complete, merge their results into full snapshot. + while (!results.isEmpty()) { + // This is the only thread that takes elements off the results list - all other threads + // only add to it. Once the list is empty it can't become non-empty + // This means that if we check that results is non-empty we can safely call pop() and + // know that the result of poll() is not null. + Future<CallResult> result = results.pop(); + // Wait for the task to complete + CallResult callResult = result.get(); + // Fail if we got errors + if (!callResult.success()) { + throw callResult.getFailure(); + } + // Merge values into fullUpdate + Map<String, Set<String>> objectMapping = + callResult.getObjectMapping().getObjects(); + for (Map.Entry<String, Set<String>> entry: objectMapping.entrySet()) { + String key = entry.getKey(); + Set<String> val = entry.getValue(); + Set<String> existingSet = fullSnapshot.get(key); + if (existingSet == null) { + fullSnapshot.put(key, val); + continue; + } + existingSet.addAll(val); + } + } + return fullSnapshot; + } + + @Override + public void close() { + threadPool.shutdownNow(); + try { + threadPool.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + LOGGER.warn("Interrupted shutdown"); + Thread.currentThread().interrupt(); + } + } + + /** + * Intern a string but only if it is not null + * @param arg String to be interned, may be null + * @return interned string or null + */ + static String safeIntern(String arg) { + return (arg != null) ? arg.intern() : null; + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java new file mode 100644 index 0000000..86ff47e --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.service.thrift; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; + +/** + * AutoCloseable wrapper around HiveMetaStoreClient. + * It is only used to provide try-with-resource semantics for + * {@link HiveMetaStoreClient}. + */ +class HMSClient implements AutoCloseable { + private final HiveMetaStoreClient client; + private boolean valid; + + HMSClient(HiveMetaStoreClient client) { + this.client = Preconditions.checkNotNull(client); + valid = true; + } + + public HiveMetaStoreClient getClient() { + return client; + } + + public void invalidate() { + if (valid) { + client.close(); + valid = false; + } + } + + @Override + public void close() { + if (valid) { + client.close(); + valid = false; + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 1f7eb18..2d581f7 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -18,7 +18,6 @@ package org.apache.sentry.service.thrift; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -27,17 +26,12 @@ import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.hcatalog.messaging.HCatEventMessage; import org.apache.sentry.binding.hive.conf.HiveAuthzConf; import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException; import org.apache.sentry.core.common.exception.SentryInvalidInputException; import org.apache.sentry.core.common.exception.SentryNoSuchObjectException; import org.apache.sentry.hdfs.PermissionsUpdate; -import org.apache.sentry.hdfs.FullUpdateInitializer; import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; import org.apache.sentry.provider.db.SentryPolicyStorePlugin; import org.apache.sentry.provider.db.service.persistent.SentryStore; @@ -50,10 +44,8 @@ import org.apache.sentry.binding.metastore.messaging.json.*; import javax.jdo.JDODataStoreException; import javax.security.auth.login.LoginException; -import java.io.File; import 
java.io.IOException; import java.net.SocketException; -import java.security.PrivilegedExceptionAction; import java.util.Collections; import java.util.List; import java.util.Map; @@ -73,32 +65,33 @@ import static org.apache.sentry.hdfs.Updateable.Update; @SuppressWarnings("PMD") public class HMSFollower implements Runnable, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class); + private HiveSimpleConnectionFactory hiveConnectionFactory; // Track the latest eventId of the event that has been logged. So we don't log the same message private long lastLoggedEventId = SentryStore.EMPTY_CHANGE_ID; private static boolean connectedToHMS = false; - private HiveMetaStoreClient client; - private SentryKerberosContext kerberosContext; + private HMSClient client; private final Configuration authzConf; - private boolean kerberos; private final SentryStore sentryStore; private String hiveInstance; private boolean needLogHMSSupportReady = true; private final LeaderStatusMonitor leaderMonitor; - HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor) { - LOGGER.info("HMSFollower is being initialized"); + HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor, + HiveSimpleConnectionFactory hiveConnectionFactory) { authzConf = conf; this.leaderMonitor = leaderMonitor; sentryStore = store; + this.hiveConnectionFactory = hiveConnectionFactory; } @VisibleForTesting - HMSFollower(Configuration conf, SentryStore sentryStore, String hiveInstance) { - this.authzConf = conf; - this.sentryStore = sentryStore; + HMSFollower(Configuration conf, SentryStore sentryStore, String hiveInstance) + throws IOException, LoginException { + this(conf, sentryStore, null, null); this.hiveInstance = hiveInstance; - this.leaderMonitor = null; + hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf()); + hiveConnectionFactory.init(); } @VisibleForTesting @@ -110,6 +103,11 @@ public class HMSFollower implements Runnable, AutoCloseable { public void close() { // Close any outstanding connections to HMS closeHMSConnection(); + try { + hiveConnectionFactory.close(); + } catch (Exception e) { + LOGGER.error("failed to close Hive Connection Factory", e); + } } /** @@ -117,77 +115,13 @@ public class HMSFollower implements Runnable, AutoCloseable { * Throws @LoginException if Kerberos context creation failed using Sentry's kerberos credentials * Throws @MetaException if there was a problem on creating an HMSClient */ - private HiveMetaStoreClient getMetaStoreClient(Configuration conf) - throws IOException, InterruptedException, LoginException, MetaException { - if (client != null) { - return client; - } - - final HiveConf hiveConf = new HiveConf(); - hiveInstance = hiveConf.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar()); - - String principal, keytab; - - //TODO: Is this the right(standard) way to create a HMS client? HiveMetastoreClientFactoryImpl? 
- //TODO: Check if HMS is using kerberos instead of relying on Sentry conf - kerberos = ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( - conf.get(ServiceConstants.ServerConfig.SECURITY_MODE, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS).trim()); - if (kerberos) { - LOGGER.info("Making a kerberos connection to HMS"); - try { - int port = conf.getInt(ServiceConstants.ServerConfig.RPC_PORT, ServiceConstants.ServerConfig.RPC_PORT_DEFAULT); - String rawPrincipal = Preconditions.checkNotNull(conf.get(ServiceConstants.ServerConfig.PRINCIPAL), - ServiceConstants.ServerConfig.PRINCIPAL + " is required"); - principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr( - conf.get(ServiceConstants.ServerConfig.RPC_ADDRESS, ServiceConstants.ServerConfig.RPC_ADDRESS_DEFAULT), port).getAddress()); - } catch (IOException io) { - throw new RuntimeException("Can't translate kerberos principal'", io); - } - - LOGGER.info("Using kerberos principal: " + principal); - final String[] principalParts = SaslRpcServer.splitKerberosName(principal); - Preconditions.checkArgument(principalParts.length == 3, - "Kerberos principal should have 3 parts: " + principal); - - keytab = Preconditions.checkNotNull(conf.get(ServiceConstants.ServerConfig.KEY_TAB), - ServiceConstants.ServerConfig.KEY_TAB + " is required"); - File keytabFile = new File(keytab); - Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(), - "Keytab " + keytab + " does not exist or is not readable."); - - try { - // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal. - kerberosContext = new SentryKerberosContext(principal, keytab, false); - - UserGroupInformation.setConfiguration(hiveConf); - UserGroupInformation clientUGI = UserGroupInformation.getUGIFromSubject(kerberosContext.getSubject()); - - // HiveMetaStoreClient handles the connection retry logic to HMS and can be configured using properties: - // hive.metastore.connect.retries, hive.metastore.client.connect.retry.delay - client = clientUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() { - @Override - public HiveMetaStoreClient run() throws Exception { - return new HiveMetaStoreClient(hiveConf); - } - }); - LOGGER.info("Secure connection established with HMS"); - } catch (LoginException e) { - // Kerberos login failed - LOGGER.error("Failed to setup kerberos context."); - throw e; - } finally { - // Shutdown kerberos context if HMS connection failed to setup to avoid thread leaks. - if ((kerberosContext != null) && (client == null)) { - kerberosContext.shutDown(); - kerberosContext = null; - } - } - } else { - //This is only for testing purposes. Sentry strongly recommends strong authentication - client = new HiveMetaStoreClient(hiveConf); - LOGGER.info("Established non-secure connection with HMS"); + private HiveMetaStoreClient getMetaStoreClient() + throws IOException, InterruptedException, MetaException { + if (client == null) { + client = hiveConnectionFactory.connect(); + connectedToHMS = true; } - return client; + return client.getClient(); } @Override @@ -209,7 +143,7 @@ public class HMSFollower implements Runnable, AutoCloseable { closeHMSConnection(); return; } - processHiveMetastoreUpdates(lastProcessedNotificationID); + processHiveMetastoreUpdates(); } /** @@ -236,26 +170,11 @@ public class HMSFollower implements Runnable, AutoCloseable { * * Clients connections waiting for an event notification will be woken up afterwards. 
*/ - private void processHiveMetastoreUpdates(Long lastProcessedNotificationID) { - if (client == null) { - try { - client = getMetaStoreClient(authzConf); - if (client == null) { - //TODO: Do we want to throw an exception after a certain timeout? - return; - } else { - connectedToHMS = true; - LOGGER.info("HMSFollower of Sentry successfully connected to HMS"); - } - } catch (Throwable e) { - LOGGER.error("HMSFollower cannot connect to HMS!!", e); - return; - } - } - + private void processHiveMetastoreUpdates() { try { // Decision of taking full snapshot is based on AuthzPathsMapping information persisted // in the sentry persistent store. If AuthzPathsMapping is empty, snapshot is needed. + Long lastProcessedNotificationID; if (sentryStore.isAuthzPathsMappingEmpty()) { // TODO: expose time used for full update in the metrics @@ -270,27 +189,26 @@ public class HMSFollower implements Runnable, AutoCloseable { // will be dropped. A new attempt will be made after 500 milliseconds when // HMSFollower runs again. - Map<String, Set<String>> pathsFullSnapshot; - CurrentNotificationEventId eventIDBefore = client.getCurrentNotificationEventId(); - LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", eventIDBefore)); + CurrentNotificationEventId eventIDBefore = getMetaStoreClient().getCurrentNotificationEventId(); + LOGGER.info("Before fetching hive full snapshot, Current NotificationID = {}", eventIDBefore); - pathsFullSnapshot = fetchFullUpdate(); + Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate(); if(pathsFullSnapshot.isEmpty()) { LOGGER.info("Hive full snapshot is empty. Perhaps HMS does not have any data"); return; } - CurrentNotificationEventId eventIDAfter = client.getCurrentNotificationEventId(); - LOGGER.info(String.format("After fetching hive full snapshot, Current NotificationID = %s.", eventIDAfter)); + CurrentNotificationEventId eventIDAfter = getMetaStoreClient().getCurrentNotificationEventId(); + LOGGER.info("After fetching hive full snapshot, Current NotificationID = {}", eventIDAfter); if (!eventIDBefore.equals(eventIDAfter)) { - LOGGER.error("#### Fail to get a point-in-time hive full snapshot !! Current NotificationID = " + - eventIDAfter.toString()); + LOGGER.error("Failed to get a point-in-time hive full snapshot. Current ID = {}", + eventIDAfter); return; } - LOGGER.info(String.format("Successfully fetched hive full snapshot, Current NotificationID = %s.", - eventIDAfter)); + LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID = {}", + eventIDAfter); // As eventIDAfter is the last event that was processed, eventIDAfter is used to update // lastProcessedNotificationID instead of getting it from persistent store. lastProcessedNotificationID = eventIDAfter.getEventId(); @@ -314,18 +232,18 @@ public class HMSFollower implements Runnable, AutoCloseable { // HIVE-15761: Currently getNextNotification API may return an empty // NotificationEventResponse causing TProtocolException. // Workaround: Only processes the notification events newer than the last updated one.
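Editorial aside: the HIVE-15761 workaround described in the comment above reduces to a single guard before calling getNextNotification(). The sketch below is illustration only and is not part of this patch; it uses the public HiveMetaStoreClient API, and the class name NotificationGuardSketch is hypothetical.

package org.apache.sentry.service.thrift;

import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.thrift.TException;

// Illustration only (not part of this commit): notifications are requested only when HMS
// reports events newer than the last processed one, so an empty NotificationEventResponse
// (which would cause a TProtocolException) is never requested.
final class NotificationGuardSketch {
  static NotificationEventResponse fetchNewerEvents(HiveMetaStoreClient hms,
      long lastProcessedId) throws TException {
    long currentId = hms.getCurrentNotificationEventId().getEventId();
    if (currentId <= lastProcessedId) {
      // Nothing new in HMS; skip the getNextNotification() call entirely.
      return null;
    }
    return hms.getNextNotification(lastProcessedId, Integer.MAX_VALUE, null);
  }
}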
- CurrentNotificationEventId eventId = client.getCurrentNotificationEventId(); + CurrentNotificationEventId eventId = getMetaStoreClient().getCurrentNotificationEventId(); LOGGER.debug("Last Notification in HMS {} lastProcessedNotificationID is {}", eventId.getEventId(), lastProcessedNotificationID); if (eventId.getEventId() > lastProcessedNotificationID) { NotificationEventResponse response = - client.getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null); + getMetaStoreClient().getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null); if (response.isSetEvents()) { if (!response.getEvents().isEmpty()) { if (lastProcessedNotificationID != lastLoggedEventId) { // Only log when there are updates and the notification ID has changed. - LOGGER.debug(String.format("lastProcessedNotificationID = %s. Processing %s events", - lastProcessedNotificationID, response.getEvents().size())); + LOGGER.debug("lastProcessedNotificationID = {}. Processing {} events", + lastProcessedNotificationID, response.getEvents().size()); lastLoggedEventId = lastProcessedNotificationID; } @@ -337,6 +255,7 @@ public class HMSFollower implements Runnable, AutoCloseable { // If the underlying exception is around socket exception, it is better to retry connection to HMS if (e.getCause() instanceof SocketException) { LOGGER.error("Encountered Socket Exception during fetching Notification entries, will reconnect to HMS", e); + client.invalidate(); closeHMSConnection(); } else { LOGGER.error("ThriftException occured fetching Notification entries, will try", e); @@ -360,16 +279,10 @@ public class HMSFollower implements Runnable, AutoCloseable { if (client != null) { LOGGER.info("Closing the HMS client connection"); client.close(); + connectedToHMS = false; } - if (kerberosContext != null) { - LOGGER.info("Shutting down kerberos context associated with the HMS client connection"); - kerberosContext.shutDown(); - } - } catch (LoginException le) { - LOGGER.warn("Failed to stop kerberos context (potential to cause thread leak)", le); } finally { client = null; - kerberosContext = null; } } @@ -385,7 +298,8 @@ public class HMSFollower implements Runnable, AutoCloseable { private Map<String, Set<String>> fetchFullUpdate() throws TException, ExecutionException { LOGGER.info("Request full HMS snapshot"); - try (FullUpdateInitializer updateInitializer = new FullUpdateInitializer(client, authzConf)) { + try (FullUpdateInitializer updateInitializer = + new FullUpdateInitializer(hiveConnectionFactory, authzConf)) { Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot(); LOGGER.info("Obtained full HMS snapshot"); return pathsUpdate; http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java new file mode 100644 index 0000000..62542c3 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.service.thrift; + +import org.apache.hadoop.hive.metastore.api.MetaException; + +import java.io.IOException; + +public interface HiveConnectionFactory extends AutoCloseable { + /** + * Open a new connection to HMS. + * + * @return connection to HMS. + * @throws IOException if connection establishment failed. + * @throws InterruptedException if connection establishment was interrupted. + * @throws MetaException if connection establishment failed. + */ + HMSClient connect() throws IOException, InterruptedException, MetaException; +} http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java new file mode 100644 index 0000000..3d67401 --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.sentry.service.thrift; + +import com.google.common.base.Preconditions; + +import java.io.File; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import javax.security.auth.login.LoginException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Factory used to generate Hive connections. + * Supports insecure and Kerberos connections. + * For Kerberos connections starts the ticket renewal thread and sets + * up Kerberos credentials. + */ +public final class HiveSimpleConnectionFactory implements HiveConnectionFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(HiveSimpleConnectionFactory.class); + + /** Sentry configuration */ + private final Configuration conf; + /** Hive configuration */ + private final HiveConf hiveConf; + private final boolean insecure; + private SentryKerberosContext kerberosContext = null; + + HiveSimpleConnectionFactory(Configuration sentryConf, HiveConf hiveConf) { + this.conf = sentryConf; + this.hiveConf = hiveConf; + insecure = !ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( + sentryConf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE).trim()); + } + + /** + * Initialize the Factory. + * For insecure connections there is nothing to initialize. + * For Kerberos connections sets up ticket renewal thread. + * @throws IOException + * @throws LoginException + */ + void init() throws IOException, LoginException { + if (insecure) { + LOGGER.info("Using insecure connection to HMS"); + return; + } + + int port = conf.getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT); + String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL), + "%s is required", ServerConfig.PRINCIPAL); + String principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr( + conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT), + port).getAddress()); + LOGGER.debug("Opening kerberos connection to HMS using kerberos principal {}", principal); + String[] principalParts = SaslRpcServer.splitKerberosName(principal); + Preconditions.checkArgument(principalParts.length == 3, + "Kerberos principal %s should have 3 parts", principal); + String keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB), + "Configuration is missing required %s parameter", ServerConfig.KEY_TAB); + File keytabFile = new File(keytab); + Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(), + "Keytab %s does not exist or is not readable", keytab); + // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal. + kerberosContext = new SentryKerberosContext(principal, keytab, false); + UserGroupInformation.setConfiguration(conf); + LOGGER.info("Using secure connection to HMS"); + } + + /** + * Connect to HMS in insecure mode or in Kerberos mode according to config.
+ * + * @return HMS connection + * @throws IOException if could not establish connection + * @throws InterruptedException if connection was interrupted + * @throws MetaException if other errors happened + */ + public HMSClient connect() throws IOException, InterruptedException, MetaException { + if (insecure) { + return new HMSClient(new HiveMetaStoreClient(hiveConf)); + } + UserGroupInformation clientUGI = + UserGroupInformation.getUGIFromSubject(kerberosContext.getSubject()); + return new HMSClient(clientUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() { + @Override + public HiveMetaStoreClient run() throws MetaException { + return new HiveMetaStoreClient(hiveConf); + } + })); + } + + @Override + public void close() throws Exception { + if (kerberosContext != null) { + kerberosContext.shutDown(); + kerberosContext = null; + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java index 8d78d1d..edb8006 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java @@ -40,9 +40,9 @@ public class SentryKerberosContext implements Runnable { private LoginContext loginContext; private Subject subject; private final javax.security.auth.login.Configuration kerberosConfig; - @Deprecated + private Thread renewerThread; - @Deprecated + private boolean shutDownRenewer = false; public SentryKerberosContext(String principal, String keyTab, boolean server) @@ -113,7 +113,6 @@ public class SentryKerberosContext implements Runnable { * Ticket renewer thread * wait till 80% time interval left on the ticket and then renew it */ - @Deprecated @Override public void run() { try { @@ -145,7 +144,6 @@ public class SentryKerberosContext implements Runnable { } } - @Deprecated public void startRenewerThread() { renewerThread = new Thread(this); renewerThread.start(); http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index ec938da..322197b 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -40,6 +40,7 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; @@ -74,6 +75,7 @@ import static 
org.apache.sentry.core.common.utils.SigUtils.registerSigListener; public class SentryService implements Callable, SigUtils.SigListener { private static final Logger LOGGER = LoggerFactory.getLogger(SentryService.class); + private HiveSimpleConnectionFactory hiveConnectionFactory; private enum Status { NOT_STARTED, @@ -276,7 +278,7 @@ public class SentryService implements Callable, SigUtils.SigListener { thriftServer.serve(); } - private void startHMSFollower(Configuration conf) throws Exception{ + private void startHMSFollower(Configuration conf) throws Exception { if (!hdfsSyncEnabled) { LOGGER.info("HMS follower is not started because HDFS sync is disabled."); return; @@ -296,13 +298,11 @@ public class SentryService implements Callable, SigUtils.SigListener { Preconditions.checkState(hmsFollower == null); Preconditions.checkState(hmsFollowerExecutor == null); + Preconditions.checkState(hiveConnectionFactory == null); - try { - hmsFollower = new HMSFollower(conf, sentryStore, leaderMonitor); - } catch (Exception ex) { - LOGGER.error("Could not create HMSFollower", ex); - throw ex; - } + hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf()); + hiveConnectionFactory.init(); + hmsFollower = new HMSFollower(conf, sentryStore, leaderMonitor, hiveConnectionFactory); long initDelay = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS, ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS_DEFAULT); @@ -334,6 +334,7 @@ public class SentryService implements Callable, SigUtils.SigListener { Preconditions.checkNotNull(hmsFollowerExecutor); Preconditions.checkNotNull(hmsFollower); + Preconditions.checkNotNull(hiveConnectionFactory); // use follower scheduling interval as timeout for shutting down its executor as // such scheduling interval should be an upper bound of how long the task normally takes to finish @@ -343,7 +344,13 @@ public class SentryService implements Callable, SigUtils.SigListener { SentryServiceUtil.shutdownAndAwaitTermination(hmsFollowerExecutor, "hmsFollowerExecutor", timeoutValue, TimeUnit.MILLISECONDS, LOGGER); } finally { + try { + hiveConnectionFactory.close(); + } catch (Exception e) { + LOGGER.error("Can't close HiveConnectionFactory", e); + } hmsFollowerExecutor = null; + hiveConnectionFactory = null; try { // close connections hmsFollower.close();
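Taken together, the wiring introduced by this patch follows a simple lifecycle: SentryService builds and initializes a HiveSimpleConnectionFactory, hands it to HMSFollower, the follower obtains HMSClient instances from it (invalidating them on socket errors), and the factory is closed on shutdown to stop the Kerberos renewer thread. The snippet below is a minimal usage sketch, not part of the commit; it assumes it lives in org.apache.sentry.service.thrift because the factory constructor and init() are package-private, and it relies only on the HMSClient methods used elsewhere in this diff (getClient(), invalidate(), close()).

package org.apache.sentry.service.thrift;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

// Minimal usage sketch (illustration only) of the connection factory introduced here.
public final class HmsConnectionLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration sentryConf = new Configuration();   // Sentry service configuration
    HiveSimpleConnectionFactory factory =
        new HiveSimpleConnectionFactory(sentryConf, new HiveConf());
    factory.init();                                   // sets up the Kerberos context when configured

    HMSClient client = null;
    try {
      client = factory.connect();                     // wraps a HiveMetaStoreClient
      HiveMetaStoreClient hms = client.getClient();
      System.out.println("Current HMS notification id: "
          + hms.getCurrentNotificationEventId().getEventId());
    } catch (Exception e) {
      if (client != null) {
        client.invalidate();                          // drop a connection that may be broken
        client = null;
      }
      throw e;
    } finally {
      if (client != null) {
        client.close();
      }
      factory.close();                                // stops the ticket renewal thread
    }
  }
}

In HMSFollower itself the same pattern is spread across getMetaStoreClient(), the SocketException handler, and closeHMSConnection().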

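HMSClient.java is added by this commit (see the file summary) but its diff does not appear in this excerpt. Purely as a hypothetical reconstruction inferred from the calls made in HMSFollower and HiveSimpleConnectionFactory — getClient(), invalidate(), close(), and the HMSClient(HiveMetaStoreClient) constructor — the wrapper presumably looks roughly like the sketch below; the actual committed class may differ.

package org.apache.sentry.service.thrift;

import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

// Hypothetical reconstruction (not the committed source): a thin AutoCloseable wrapper
// around HiveMetaStoreClient, inferred from how HMSClient is used in this patch.
class HMSClientSketch implements AutoCloseable {
  private final HiveMetaStoreClient client;
  private boolean valid;

  HMSClientSketch(HiveMetaStoreClient client) {
    this.client = client;
    this.valid = true;
  }

  HiveMetaStoreClient getClient() {
    return client;
  }

  /** Mark the connection as unusable and release it, e.g. after a SocketException. */
  void invalidate() {
    if (valid) {
      client.close();
      valid = false;
    }
  }

  @Override
  public void close() {
    invalidate();
  }
}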