http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java deleted file mode 100644 index ce76a46..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.provider.db.service.persistent; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.sentry.core.common.exception.SentryUserException; - -public class SentryStoreSchemaInfo { - private static final String SQL_FILE_EXTENSION = ".sql"; - private static final String UPGRADE_FILE_PREFIX = "upgrade-"; - private static final String INIT_FILE_PREFIX = "sentry-"; - private static final String VERSION_UPGRADE_LIST = "upgrade.order"; - private final String dbType; - private final String sentrySchemaVersions[]; - private final String sentryScriptDir; - - private static final String SENTRY_VERSION = "2.1.0"; - - public SentryStoreSchemaInfo(String sentryScriptDir, String dbType) - throws SentryUserException { - this.sentryScriptDir = sentryScriptDir; - this.dbType = dbType; - // load upgrade order for the given dbType - List<String> upgradeOrderList = new ArrayList<String>(); - String upgradeListFile = getSentryStoreScriptDir() + File.separator - + VERSION_UPGRADE_LIST + "." + dbType; - try (BufferedReader bfReader = new BufferedReader(new FileReader(upgradeListFile))) { - String currSchemaVersion; - while ((currSchemaVersion = bfReader.readLine()) != null) { - upgradeOrderList.add(currSchemaVersion.trim()); - } - } catch (FileNotFoundException e) { - throw new SentryUserException("File " + upgradeListFile + " not found ", e); - } catch (IOException e) { - throw new SentryUserException("Error reading " + upgradeListFile, e); - } - sentrySchemaVersions = upgradeOrderList.toArray(new String[0]); - } - - public String getSentrySchemaVersion() { - return SENTRY_VERSION; - } - - public List<String> getUpgradeScripts(String fromSchemaVer) - throws SentryUserException { - List<String> upgradeScriptList = new ArrayList<String>(); - - // check if we are already at current schema level - if (getSentryVersion().equals(fromSchemaVer)) { - return upgradeScriptList; - } - - // Find the list of scripts to execute for this upgrade - int firstScript = sentrySchemaVersions.length; - for (int i = 0; i < sentrySchemaVersions.length; i++) { - String fromVersion = sentrySchemaVersions[i].split("-to-")[0]; - if (fromVersion.equals(fromSchemaVer)) { - firstScript = i; - break; - } - } - if (firstScript == sentrySchemaVersions.length) { - throw new SentryUserException("Unknown version specified for upgrade " - + fromSchemaVer + " Metastore schema may be too old or newer"); - } - - for (int i = firstScript; i < sentrySchemaVersions.length; i++) { - String scriptFile = generateUpgradeFileName(sentrySchemaVersions[i]); - upgradeScriptList.add(scriptFile); - } - return upgradeScriptList; - } - - /*** - * Get the name of the script to initialize the schema for given version - * - * @param toVersion - * Target version. If it's null, then the current server version is - * used - * @return - * @throws SentryUserException - */ - public String generateInitFileName(String toVersion) - throws SentryUserException { - String version = toVersion; - if (version == null) { - version = getSentryVersion(); - } - String initScriptName = INIT_FILE_PREFIX + dbType + "-" + version - + SQL_FILE_EXTENSION; - // check if the file exists - if (!(new File(getSentryStoreScriptDir() + File.separatorChar - + initScriptName).exists())) { - throw new SentryUserException( - "Unknown version specified for initialization: " + version); - } - return initScriptName; - } - - /** - * Find the directory of sentry store scripts - * - * @return - */ - public String getSentryStoreScriptDir() { - return sentryScriptDir; - } - - // format the upgrade script name eg upgrade-x-y-dbType.sql - private String generateUpgradeFileName(String fileVersion) { - return INIT_FILE_PREFIX + UPGRADE_FILE_PREFIX + dbType + "-" - + fileVersion + SQL_FILE_EXTENSION; - } - - // Current hive version, in majorVersion.minorVersion.changeVersion format - // TODO: store the version using the build script - public static String getSentryVersion() { - return SENTRY_VERSION; - } -}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java deleted file mode 100644 index 6ad52a3..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.provider.db.service.persistent; - -import javax.jdo.PersistenceManager; - -/** - * TransactionBlock wraps the code that is executed inside a single - * transaction. The {@link #execute(PersistenceManager)} method returns the - * result of the transaction. - */ -@FunctionalInterface -public interface TransactionBlock<T> { - /** - * Execute some code as a single transaction, the code should not start new - * transaction or manipulate transactions with the PersistenceManager. - * - * @param pm PersistenceManager for the current transaction - * @return Object with the result of execute() - * @throws Exception - */ - T execute(PersistenceManager pm) throws Exception; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java deleted file mode 100644 index ba6e845..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.provider.db.service.persistent; - -import com.codahale.metrics.Counter; -import static com.codahale.metrics.MetricRegistry.name; -import com.codahale.metrics.Timer; - -import com.codahale.metrics.Timer.Context; -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.core.common.exception.SentryUserException; -import org.apache.sentry.service.common.ServiceConstants.ServerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jdo.PersistenceManager; -import javax.jdo.PersistenceManagerFactory; -import javax.jdo.Transaction; - -import org.apache.sentry.api.service.thrift.SentryMetrics; - -import java.util.Random; -import java.util.concurrent.Callable; - -/** - * TransactionManager is used for executing the database transaction, it supports - * the transaction with retry mechanism for the unexpected exceptions, - * except <em>SentryUserExceptions</em>, eg, <em>SentryNoSuchObjectException</em>, - * <em>SentryAlreadyExistsException</em> etc. For <em>SentryUserExceptions</em>, - * will simply throw the exception without retry<p> - * - * The purpose of the class is to separate all transaction housekeeping (opening - * transaction, rolling back failed transactions) from the actual transaction - * business logic.<p> - * - * TransactionManager creates an instance of PersistenceManager for each - * transaction.<p> - * - * TransactionManager exposes several metrics: - * <ul> - * <li>Timer metric for all transactions</li> - * <li>Counter for failed transactions</li> - * <li>Counter for each exception thrown by transaction</li> - * </ul> - */ -@SuppressWarnings("NestedTryStatement") -public final class TransactionManager { - - private static final Logger LOGGER = - LoggerFactory.getLogger(TransactionManager.class); - - /** Random number generator for exponential backoff */ - private static final Random random = new Random(); - - private final PersistenceManagerFactory pmf; - - // Maximum number of retries per call - private final int transactionRetryMax; - - // Delay (in milliseconds) between retries - private final int retryWaitTimeMills; - - /** Name for metrics */ - private static final String TRANSACTIONS = "transactions"; - - // Transaction timer measures time distribution for all transactions - private final Timer transactionTimer = - SentryMetrics.getInstance(). - getTimer(name(TransactionManager.class, - TRANSACTIONS)); - - // Counter for failed transactions - private final Counter failedTransactionsCount = - SentryMetrics.getInstance(). - getCounter(name(TransactionManager.class, - TRANSACTIONS, "failed")); - - private final Counter retryCount = - SentryMetrics.getInstance().getCounter(name(TransactionManager.class, - TRANSACTIONS, "retry")); - - TransactionManager(PersistenceManagerFactory pmf, Configuration conf) { - this.pmf = pmf; - transactionRetryMax = conf.getInt( - ServerConfig.SENTRY_STORE_TRANSACTION_RETRY, - ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_DEFAULT); - retryWaitTimeMills = conf.getInt( - ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS, - ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS_DEFAULT); - } - - - /** - * Execute some code as a single transaction, the code in tb.execute() - * should not start new transaction or manipulate transactions with the - * PersistenceManager. - * - * @param tb transaction block with code to be executed - * @return Object with the result of tb.execute() - */ - public <T> T executeTransaction(TransactionBlock<T> tb) throws Exception { - try (Context context = transactionTimer.time(); - PersistenceManager pm = pmf.getPersistenceManager()) { - Transaction transaction = pm.currentTransaction(); - transaction.begin(); - try { - T result = tb.execute(pm); - transaction.commit(); - return result; - } catch (Exception e) { - // Count total failed transactions - failedTransactionsCount.inc(); - // Count specific exceptions - SentryMetrics.getInstance().getCounter(name(TransactionManager.class, - "exception", e.getClass().getSimpleName())).inc(); - // Re-throw the exception - throw e; - } finally { - if (transaction.isActive()) { - transaction.rollback(); - } - } - } - } - - /** - * Execute a list of TransactionBlock code as a single transaction. - * The code in tb.execute() should not start new transaction or - * manipulate transactions with the PersistenceManager. It returns - * the result of the last transaction block execution. - * - * @param tbs transaction blocks with code to be executed - * @return the result of the last result of tb.execute() - */ - private <T> T executeTransaction(Iterable<TransactionBlock<T>> tbs) throws Exception { - try (Context context = transactionTimer.time(); - PersistenceManager pm = pmf.getPersistenceManager()) { - Transaction transaction = pm.currentTransaction(); - transaction.begin(); - try { - T result = null; - for (TransactionBlock<T> tb : tbs) { - result = tb.execute(pm); - } - transaction.commit(); - return result; - } catch (Exception e) { - // Count total failed transactions - failedTransactionsCount.inc(); - // Count specific exceptions - SentryMetrics.getInstance().getCounter(name(TransactionManager.class, - "exception", e.getClass().getSimpleName())).inc(); - // Re-throw the exception - throw e; - } finally { - if (transaction.isActive()) { - transaction.rollback(); - } - } - } - } - - /** - * Execute some code as a single transaction with retry mechanism. - * - * @param tb transaction block with code to execute - * @return Object with the result of tb.execute() - */ - @SuppressWarnings("squid:S00112") - public <T> T executeTransactionWithRetry(final TransactionBlock<T> tb) - throws Exception { - return new ExponentialBackoff().execute( - new Callable<T>() { - @Override - public T call() throws Exception { - return executeTransaction(tb); - } - } - ); - } - - /** - * Execute a list of TransactionBlock code as a single transaction. - * If any of the TransactionBlock fail, all the TransactionBlocks would - * retry. It returns the result of the last transaction block - * execution. - * - * @param tbs a list of transaction blocks with code to be executed. - */ - @SuppressWarnings("squid:S00112") - <T> void executeTransactionBlocksWithRetry(final Iterable<TransactionBlock<T>> tbs) - throws Exception { - new ExponentialBackoff().execute( - new Callable<T>() { - @Override - public T call() throws Exception { - return executeTransaction(tbs); - } - } - ); - } - - /** - * Implementation of exponential backoff with random fuzziness. - * On each iteration the backoff time is 1.5 the previous amount plus the - * random fuzziness factor which is up to half of the previous amount. - */ - private class ExponentialBackoff { - - @SuppressWarnings("squid:S00112") - <T> T execute(Callable<T> arg) throws Exception { - Exception ex = null; - long sleepTime = retryWaitTimeMills; - - for (int retryNum = 1; retryNum <= transactionRetryMax; retryNum++) { - try { - return arg.call(); - } catch (SentryUserException e) { - // throw the sentry exception without retry - LOGGER.warn("Transaction manager encountered non-retriable exception", e); - throw e; - } catch (Exception e) { - ex = e; - retryCount.inc(); - LOGGER.warn("Transaction execution encountered exception", e); - LOGGER.warn("Retrying transaction {}/{} times", - retryNum, transactionRetryMax); - // Introduce some randomness in the backoff time. - LOGGER.warn("Sleeping for {} milliseconds before retrying", sleepTime); - Thread.sleep(sleepTime); - int fuzz = random.nextInt((int)sleepTime / 2); - sleepTime *= 3; - sleepTime /= 2; - sleepTime += fuzz; - } - } - assert(ex != null); - String message = "The transaction has reached max retry number, " - + ex.getMessage(); - LOGGER.error(message, ex); - throw new Exception(message, ex); - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java deleted file mode 100644 index 992d8ab..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java +++ /dev/null @@ -1,524 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.service.thrift; - -import com.codahale.metrics.Counter; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.sentry.hdfs.PathsUpdate; -import org.apache.sentry.hdfs.SentryMalformedPathException; -import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; -import org.apache.sentry.api.service.thrift.SentryMetrics; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Deque; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.Map; -import java.util.HashMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import static com.codahale.metrics.MetricRegistry.name; - -/** - * Manage fetching full snapshot from HMS. - * Snapshot is represented as a map from the hive object name to - * the set of paths for this object. - * The hive object name is either the Hive database name or - * Hive database name joined with Hive table name as {@code dbName.tableName}. - * All table partitions are stored under the table object. - * <p> - * Once {@link FullUpdateInitializer}, the {@link FullUpdateInitializer#getFullHMSSnapshot()} - * method should be called to get the initial update. - * <p> - * It is important to close the {@link FullUpdateInitializer} object to prevent resource - * leaks. - * <p> - * The usual way of using {@link FullUpdateInitializer} is - * <pre> - * {@code - * try (FullUpdateInitializer updateInitializer = - * new FullUpdateInitializer(clientFactory, authzConf)) { - * Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot(); - * return pathsUpdate; - * } - */ -public final class FullUpdateInitializer implements AutoCloseable { - - /* - * Implementation note. - * - * The snapshot is obtained using an executor. We follow the map/reduce model. - * Each executor thread (mapper) obtains and returns a partial snapshot which are then - * reduced to a single combined snapshot by getFullHMSSnapshot(). - * - * Synchronization between the getFullHMSSnapshot() and executors is done using the - * 'results' queue. The queue holds the futures for each scheduled task. - * It is initially populated by getFullHMSSnapshot and each task may add new future - * results to it. Only getFullHMSSnapshot() removes entries from the results queue. - * This guarantees that once the results queue is empty there are no pending jobs. - * - * Since there are no other data sharing, the implementation is safe without - * any other synchronization. It is not thread-safe for concurrent calls - * to getFullHMSSnapshot(). - * - */ - - - private static final String FULL_UPDATE_INITIALIZER_THREAD_NAME = "hms-fetch-%d"; - private final ExecutorService threadPool; - private final int maxPartitionsPerCall; - private final int maxTablesPerCall; - private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>(); - private final int maxRetries; - private final int waitDurationMillis; - - private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class); - - private static final ObjectMapping emptyObjectMapping = - new ObjectMapping(Collections.<String, Set<String>>emptyMap()); - private final HiveConnectionFactory clientFactory; - - /** Total number of database objects */ - private final Counter databaseCount = SentryMetrics.getInstance() - .getCounter(name(FullUpdateInitializer.class, "total", "db")); - - /** Total number of table objects */ - private final Counter tableCount = SentryMetrics.getInstance() - .getCounter(name(FullUpdateInitializer.class, "total", "tables")); - - /** Total number of partition objects */ - private final Counter partitionCount = SentryMetrics.getInstance() - .getCounter(name(FullUpdateInitializer.class, "total", "partitions")); - - /** - * Extract path (not starting with "/") from the full URI - * @param uri - resource URI (usually with scheme) - * @return path if uri is valid or null - */ - static String pathFromURI(String uri) { - try { - return PathsUpdate.parsePath(uri); - } catch (SentryMalformedPathException e) { - LOGGER.warn(String.format("Ignoring invalid uri %s: %s", - uri, e.getReason())); - return null; - } - } - - /** - * Mapping of object to set of paths. - * Used to represent partial results from executor threads. Multiple - * ObjectMapping objects are combined in a single mapping - * to get the final result. - */ - private static final class ObjectMapping { - private final Map<String, Set<String>> objects; - - ObjectMapping(Map<String, Set<String>> objects) { - this.objects = objects; - } - - ObjectMapping(String authObject, String path) { - Set<String> values = Collections.singleton(safeIntern(path)); - objects = ImmutableMap.of(authObject, values); - } - - ObjectMapping(String authObject, Collection<String> paths) { - Set<String> values = new HashSet<>(paths); - objects = ImmutableMap.of(authObject, values); - } - - Map<String, Set<String>> getObjects() { - return objects; - } - } - - private static final class CallResult { - private final Exception failure; - private final boolean successStatus; - private final ObjectMapping objectMapping; - - CallResult(Exception ex) { - failure = ex; - successStatus = false; - objectMapping = emptyObjectMapping; - } - - CallResult(ObjectMapping objectMapping) { - failure = null; - successStatus = true; - this.objectMapping = objectMapping; - } - - boolean success() { - return successStatus; - } - - ObjectMapping getObjectMapping() { - return objectMapping; - } - - public Exception getFailure() { - return failure; - } - } - - private abstract class BaseTask implements Callable<CallResult> { - - /** - * Class represents retry strategy for BaseTask. - */ - private final class RetryStrategy { - private int retryStrategyMaxRetries = 0; - private final int retryStrategyWaitDurationMillis; - - private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) { - this.retryStrategyMaxRetries = retryStrategyMaxRetries; - - // Assign default wait duration if negative value is provided. - this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ? - retryStrategyWaitDurationMillis : 1000; - } - - @SuppressWarnings({"squid:S1141", "squid:S2142"}) - public CallResult exec() { - // Retry logic is happening inside callable/task to avoid - // synchronous waiting on getting the result. - // Retry the failure task until reach the max retry number. - // Wait configurable duration for next retry. - // - // Only thrift exceptions are retried. - // Other exceptions are propagated up the stack. - Exception exception = null; - try { - // We catch all exceptions except Thrift exceptions which are retried - for (int i = 0; i < retryStrategyMaxRetries; i++) { - //noinspection NestedTryStatement - try { - return new CallResult(doTask()); - } catch (TException ex) { - LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." + - " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " + - ex.toString(), ex); - exception = ex; - - try { - Thread.sleep(retryStrategyWaitDurationMillis); - } catch (InterruptedException ignored) { - // Skip the rest retries if get InterruptedException. - // And set the corresponding retries number. - LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1)); - break; - } - } - } - } catch (Exception ex) { - exception = ex; - } - LOGGER.error("Failed to execute task", exception); - // We will fail in the end, so we are shutting down the pool to prevent - // new tasks from being scheduled. - threadPool.shutdown(); - return new CallResult(exception); - } - } - - private final RetryStrategy retryStrategy; - - BaseTask() { - retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis); - } - - @Override - public CallResult call() throws Exception { - return retryStrategy.exec(); - } - - abstract ObjectMapping doTask() throws Exception; - } - - private class PartitionTask extends BaseTask { - private final String dbName; - private final String tblName; - private final String authName; - private final List<String> partNames; - - PartitionTask(String dbName, String tblName, String authName, - List<String> partNames) { - this.dbName = safeIntern(dbName); - this.tblName = safeIntern(tblName); - this.authName = safeIntern(authName); - this.partNames = partNames; - } - - @Override - ObjectMapping doTask() throws Exception { - List<Partition> tblParts; - HMSClient c = null; - try (HMSClient client = clientFactory.connect()) { - c = client; - tblParts = client.getClient().getPartitionsByNames(dbName, tblName, partNames); - } catch (Exception e) { - if (c != null) { - c.invalidate(); - } - throw e; - } - - LOGGER.debug("Fetched partitions for db = {}, table = {}", - dbName, tblName); - - Collection<String> partitionNames = new ArrayList<>(tblParts.size()); - for (Partition part : tblParts) { - String partPath = pathFromURI(part.getSd().getLocation()); - if (partPath != null) { - partitionNames.add(partPath.intern()); - } - } - return new ObjectMapping(authName, partitionNames); - } - } - - private class TableTask extends BaseTask { - private final String dbName; - private final List<String> tableNames; - - TableTask(Database db, List<String> tableNames) { - dbName = safeIntern(db.getName()); - this.tableNames = tableNames; - } - - @Override - @SuppressWarnings({"squid:S2629", "squid:S135"}) - ObjectMapping doTask() throws Exception { - HMSClient c = null; - try (HMSClient client = clientFactory.connect()) { - c = client; - List<Table> tables = client.getClient().getTableObjectsByName(dbName, tableNames); - - LOGGER.debug("Fetching tables for db = {}, tables = {}", dbName, tableNames); - - Map<String, Set<String>> objectMapping = new HashMap<>(tables.size()); - for (Table tbl : tables) { - // Table names are case insensitive - if (!tbl.getDbName().equalsIgnoreCase(dbName)) { - // Inconsistency in HMS data - LOGGER.warn(String.format("DB name %s for table %s does not match %s", - tbl.getDbName(), tbl.getTableName(), dbName)); - continue; - } - - String tableName = safeIntern(tbl.getTableName().toLowerCase()); - String authzObject = (dbName + "." + tableName).intern(); - List<String> tblPartNames = - client.getClient().listPartitionNames(dbName, tableName, (short) -1); - // Count total number of partitions - partitionCount.inc(tblPartNames.size()); - for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) { - List<String> partsToFetch = tblPartNames.subList(i, - Math.min(i + maxPartitionsPerCall, tblPartNames.size())); - Callable<CallResult> partTask = new PartitionTask(dbName, - tableName, authzObject, partsToFetch); - results.add(threadPool.submit(partTask)); - } - String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation())); - if (tblPath == null) { - continue; - } - Set<String> paths = objectMapping.get(authzObject); - if (paths == null) { - paths = new HashSet<>(1); - objectMapping.put(authzObject, paths); - } - paths.add(tblPath); - } - return new ObjectMapping(Collections.unmodifiableMap(objectMapping)); - } catch (Exception e) { - if (c != null) { - c.invalidate(); - } - throw e; - } - } - } - - private class DbTask extends BaseTask { - - private final String dbName; - - DbTask(String dbName) { - //Database names are case insensitive - this.dbName = safeIntern(dbName.toLowerCase()); - databaseCount.inc(); - } - - @Override - ObjectMapping doTask() throws Exception { - HMSClient c = null; - try (HMSClient client = clientFactory.connect()) { - c = client; - Database db = client.getClient().getDatabase(dbName); - if (!dbName.equalsIgnoreCase(db.getName())) { - LOGGER.warn("Database name {} does not match {}", db.getName(), dbName); - return emptyObjectMapping; - } - List<String> allTblStr = client.getClient().getAllTables(dbName); - // Count total number of tables - tableCount.inc(allTblStr.size()); - for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) { - List<String> tablesToFetch = allTblStr.subList(i, - Math.min(i + maxTablesPerCall, allTblStr.size())); - Callable<CallResult> tableTask = new TableTask(db, tablesToFetch); - results.add(threadPool.submit(tableTask)); - } - String dbPath = safeIntern(pathFromURI(db.getLocationUri())); - return (dbPath != null) ? new ObjectMapping(dbName, dbPath) : - emptyObjectMapping; - } catch (Exception e) { - if (c != null) { - c.invalidate(); - } - throw e; - } - } - } - - FullUpdateInitializer(HiveConnectionFactory clientFactory, Configuration conf) { - this.clientFactory = clientFactory; - maxPartitionsPerCall = conf.getInt( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT); - maxTablesPerCall = conf.getInt( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT); - maxRetries = conf.getInt( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM, - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT); - waitDurationMillis = conf.getInt( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS, - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT); - - ThreadFactory fullUpdateInitThreadFactory = new ThreadFactoryBuilder() - .setNameFormat(FULL_UPDATE_INITIALIZER_THREAD_NAME) - .setDaemon(false) - .build(); - threadPool = Executors.newFixedThreadPool(conf.getInt( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT), - fullUpdateInitThreadFactory); - } - - /** - * Get Full HMS snapshot. - * @return Full snapshot of HMS objects. - * @throws TException if Thrift error occured - * @throws ExecutionException if there was a scheduling error - * @throws InterruptedException if processing was interrupted - */ - @SuppressWarnings("squid:S00112") - Map<String, Collection<String>> getFullHMSSnapshot() throws Exception { - // Get list of all HMS databases - List<String> allDbStr; - HMSClient c = null; - try (HMSClient client = clientFactory.connect()) { - c = client; - allDbStr = client.getClient().getAllDatabases(); - } catch (Exception e) { - if (c != null) { - c.invalidate(); - } - throw e; - } - - // Schedule async task for each database responsible for fetching per-database - // objects. - for (String dbName : allDbStr) { - results.add(threadPool.submit(new DbTask(dbName))); - } - - // Resulting full snapshot - Map<String, Collection<String>> fullSnapshot = new HashMap<>(); - - // As async tasks complete, merge their results into full snapshot. - while (!results.isEmpty()) { - // This is the only thread that takes elements off the results list - all other threads - // only add to it. Once the list is empty it can't become non-empty - // This means that if we check that results is non-empty we can safely call pop() and - // know that the result of poll() is not null. - Future<CallResult> result = results.pop(); - // Wait for the task to complete - CallResult callResult = result.get(); - // Fail if we got errors - if (!callResult.success()) { - throw callResult.getFailure(); - } - // Merge values into fullUpdate - Map<String, Set<String>> objectMapping = - callResult.getObjectMapping().getObjects(); - for (Map.Entry<String, Set<String>> entry: objectMapping.entrySet()) { - String key = entry.getKey(); - Set<String> val = entry.getValue(); - Set<String> existingSet = (Set<String>)fullSnapshot.get(key); - if (existingSet == null) { - fullSnapshot.put(key, val); - continue; - } - existingSet.addAll(val); - } - } - return fullSnapshot; - } - - @Override - public void close() { - threadPool.shutdownNow(); - try { - threadPool.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException ignored) { - LOGGER.warn("Interrupted shutdown"); - Thread.currentThread().interrupt(); - } - } - - /** - * Intern a string but only if it is not null - * @param arg String to be interned, may be null - * @return interned string or null - */ - static String safeIntern(String arg) { - return (arg != null) ? arg.intern() : null; - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java deleted file mode 100644 index c30d982..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java +++ /dev/null @@ -1,606 +0,0 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -package org.apache.sentry.service.thrift; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; -import org.apache.hadoop.hive.metastore.messaging.EventMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * Apply newer events to the full update. - * - * <p>The process of obtaining ful snapshot from HMS is not atomic. - * While we read information from HMS it may change - some new objects can be created, - * or some can be removed or modified. This class is used to reconsile changes to - * the full snapshot. - */ -final class FullUpdateModifier { - private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateModifier.class); - - // Prevent creation of class instances - private FullUpdateModifier() { - } - - /** - * Take a full snapshot and apply an MS event to it. - * - * <p>We pass serializer as a parameter to simplify testing. - * - * @param image Full snapshot - * @param event HMS notificatin event - * @param deserializer Message deserializer - - * should produce Sentry JSON serializer type messages. - */ - // NOTE: we pass deserializer here instead of using built-in one to simplify testing. - // Tests use mock serializers and thus we do not have to construct proper events. - static void applyEvent(Map<String, Collection<String>> image, NotificationEvent event, - MessageDeserializer deserializer) { - EventMessage.EventType eventType = - EventMessage.EventType.valueOf(event.getEventType()); - - switch (eventType) { - case CREATE_DATABASE: - createDatabase(image, event, deserializer); - break; - case DROP_DATABASE: - dropDatabase(image, event, deserializer); - break; - case CREATE_TABLE: - createTable(image, event, deserializer); - break; - case DROP_TABLE: - dropTable(image, event, deserializer); - break; - case ALTER_TABLE: - alterTable(image, event, deserializer); - break; - case ADD_PARTITION: - addPartition(image, event, deserializer); - break; - case DROP_PARTITION: - dropPartition(image, event, deserializer); - break; - case ALTER_PARTITION: - alterPartition(image, event, deserializer); - break; - default: - LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(), - event.getEventType()); - break; - } - } - - /** - * Add mapping from the new database name to location {dbname: {location}}. - */ - private static void createDatabase(Map<String, Collection<String>> image, NotificationEvent event, - MessageDeserializer deserializer) { - SentryJSONCreateDatabaseMessage message = - (SentryJSONCreateDatabaseMessage) deserializer - .getCreateDatabaseMessage(event.getMessage()); - - String dbName = message.getDB(); - if ((dbName == null) || dbName.isEmpty()) { - LOGGER.error("Create database event is missing database name"); - return; - } - dbName = dbName.toLowerCase(); - - String location = message.getLocation(); - if ((location == null) || location.isEmpty()) { - LOGGER.error("Create database event is missing database location"); - return; - } - - String path = FullUpdateInitializer.pathFromURI(location); - if (path == null) { - return; - } - - // Add new database if it doesn't exist yet - if (!image.containsKey(dbName)) { - LOGGER.debug("create database {} with location {}", dbName, location); - image.put(dbName.intern(), Collections.singleton(path)); - } else { - // Sanity check the information and print warnings if database exists but - // with a different location - Set<String> oldLocations = (Set<String>)image.get(dbName); - LOGGER.debug("database {} already exists, ignored", dbName); - if (!oldLocations.contains(location)) { - LOGGER.warn("database {} exists but location is different from {}", dbName, location); - } - } - } - - /** - * Remove a mapping from database name and remove all mappings which look like dbName.tableName - * where dbName matches database name. - */ - private static void dropDatabase(Map<String, Collection<String>> image, NotificationEvent event, - MessageDeserializer deserializer) { - SentryJSONDropDatabaseMessage message = - (SentryJSONDropDatabaseMessage) deserializer.getDropDatabaseMessage(event.getMessage()); - - String dbName = message.getDB(); - if ((dbName == null) || dbName.isEmpty()) { - LOGGER.error("Drop database event is missing database name"); - return; - } - dbName = dbName.toLowerCase(); - String location = message.getLocation(); - if ((location == null) || location.isEmpty()) { - LOGGER.error("Drop database event is missing database location"); - return; - } - - String path = FullUpdateInitializer.pathFromURI(location); - if (path == null) { - return; - } - - // If the database is alreday deleted, we have nothing to do - Set<String> locations = (Set<String>)image.get(dbName); - if (locations == null) { - LOGGER.debug("database {} is already deleted", dbName); - return; - } - - if (!locations.contains(path)) { - LOGGER.warn("Database {} location does not match {}", dbName, path); - return; - } - - LOGGER.debug("drop database {} with location {}", dbName, location); - - // Drop information about the database - image.remove(dbName); - - String dbPrefix = dbName + "."; - - // Remove all objects for this database - for (Iterator<Map.Entry<String, Collection<String>>> it = image.entrySet().iterator(); - it.hasNext(); ) { - Map.Entry<String, Collection<String>> entry = it.next(); - String key = entry.getKey(); - if (key.startsWith(dbPrefix)) { - LOGGER.debug("Removing {}", key); - it.remove(); - } - } - } - - /** - * Add mapping for dbName.tableName. - */ - private static void createTable(Map<String, Collection<String>> image, NotificationEvent event, - MessageDeserializer deserializer) { - SentryJSONCreateTableMessage message = (SentryJSONCreateTableMessage) deserializer - .getCreateTableMessage(event.getMessage()); - - String dbName = message.getDB(); - if ((dbName == null) || dbName.isEmpty()) { - LOGGER.error("Create table event is missing database name"); - return; - } - String tableName = message.getTable(); - if ((tableName == null) || tableName.isEmpty()) { - LOGGER.error("Create table event is missing table name"); - return; - } - - String location = message.getLocation(); - if ((location == null) || location.isEmpty()) { - LOGGER.error("Create table event is missing table location"); - return; - } - - String path = FullUpdateInitializer.pathFromURI(location); - if (path == null) { - return; - } - - String authName = dbName.toLowerCase() + "." + tableName.toLowerCase(); - // Add new table if it doesn't exist yet - if (!image.containsKey(authName)) { - LOGGER.debug("create table {} with location {}", authName, location); - Set<String> locations = new HashSet<>(1); - locations.add(path); - image.put(authName.intern(), locations); - } else { - // Sanity check the information and print warnings if table exists but - // with a different location - Set<String> oldLocations = (Set<String>)image.get(authName); - LOGGER.debug("Table {} already exists, ignored", authName); - if (!oldLocations.contains(location)) { - LOGGER.warn("Table {} exists but location is different from {}", authName, location); - } - } - } - - /** - * Drop mapping from dbName.tableName - */ - private static void dropTable(Map<String, Collection<String>> image, NotificationEvent event, - MessageDeserializer deserializer) { - SentryJSONDropTableMessage message = (SentryJSONDropTableMessage) deserializer - .getDropTableMessage(event.getMessage()); - - String dbName = message.getDB(); - if ((dbName == null) || dbName.isEmpty()) { - LOGGER.error("Drop table event is missing database name"); - return; - } - String tableName = message.getTable(); - if ((tableName == null) || tableName.isEmpty()) { - LOGGER.error("Drop table event is missing table name"); - return; - } - - String location = message.getLocation(); - if ((location == null) || location.isEmpty()) { - LOGGER.error("Drop table event is missing table location"); - return; - } - - String path = FullUpdateInitializer.pathFromURI(location); - if (path == null) { - return; - } - - String authName = dbName.toLowerCase() + "." + tableName.toLowerCase(); - Set<String> locations = (Set<String>)image.get(authName); - if (locations != null && locations.contains(path)) { - LOGGER.debug("Removing {}", authName); - image.remove(authName); - } else { - LOGGER.warn("can't find matching table {} with location {}", authName, location); - } - } - - /** - * ALTER TABLE is a complicated function that can alter multiple things. - * - * <p>We take care iof the following cases: - * <ul> - * <li>Change database name. This is the most complicated one. - * We need to change the actual database name and change all mappings - * that look like "dbName.tableName" to the new dbName</li> - * <li>Change table name</li> - * <li>Change location</li> - * </ul> - * - */ - private static void alterTable(Map<String, Collection<String>> image, NotificationEvent event, - MessageDeserializer deserializer) { - SentryJSONAlterTableMessage message = - (SentryJSONAlterTableMessage) deserializer.getAlterTableMessage(event.getMessage()); - String prevDbName = message.getDB(); - if ((prevDbName == null) || prevDbName.isEmpty()) { - LOGGER.error("Alter table event is missing old database name"); - return; - } - prevDbName = prevDbName.toLowerCase(); - String prevTableName = message.getTable(); - if ((prevTableName == null) || prevTableName.isEmpty()) { - LOGGER.error("Alter table event is missing old table name"); - return; - } - prevTableName = prevTableName.toLowerCase(); - - String newDbName = event.getDbName(); - if ((newDbName == null) || newDbName.isEmpty()) { - LOGGER.error("Alter table event is missing new database name"); - return; - } - newDbName = newDbName.toLowerCase(); - - String newTableName = event.getTableName(); - if ((newTableName == null) || newTableName.isEmpty()) { - LOGGER.error("Alter table event is missing new table name"); - return; - } - newTableName = newTableName.toLowerCase(); - - String prevLocation = message.getOldLocation(); - if ((prevLocation == null) || prevLocation.isEmpty()) { - LOGGER.error("Alter table event is missing old location"); - return; - } - String prevPath = FullUpdateInitializer.pathFromURI(prevLocation); - if (prevPath == null) { - return; - } - - String newLocation = message.getNewLocation(); - if ((newLocation == null) || newLocation.isEmpty()) { - LOGGER.error("Alter table event is missing new location"); - return; - } - String newPath = FullUpdateInitializer.pathFromURI(newLocation); - if (newPath == null) { - return; - } - - String prevAuthName = prevDbName + "." + prevTableName; - String newAuthName = newDbName + "." + newTableName; - - if (!prevDbName.equals(newDbName)) { - // Database name change - LOGGER.debug("Changing database name: {} -> {}", prevDbName, newDbName); - Set<String> locations = (Set<String>)image.get(prevDbName); - if (locations != null) { - // Rename database if it is not renamed yet - if (!image.containsKey(newDbName)) { - image.put(newDbName, locations); - image.remove(prevDbName); - // Walk through all tables and rename DB part of the AUTH name - // AUTH name is "dbName.TableName" so we need to replace dbName with the new name - String prevDbPrefix = prevDbName + "."; - String newDbPrefix = newDbName + "."; - renamePrefixKeys(image, prevDbPrefix, newDbPrefix); - } else { - LOGGER.warn("database {} rename: found existing database {}", prevDbName, newDbName); - } - } else { - LOGGER.debug("database {} not found", prevDbName); - } - } - - if (!prevAuthName.equals(newAuthName)) { - // Either the database name or table name changed, rename objects - Set<String> locations = (Set<String>)image.get(prevAuthName); - if (locations != null) { - // Rename if it is not renamed yet - if (!image.containsKey(newAuthName)) { - LOGGER.debug("rename {} -> {}", prevAuthName, newAuthName); - image.put(newAuthName, locations); - image.remove(prevAuthName); - } else { - LOGGER.warn("auth {} rename: found existing object {}", prevAuthName, newAuthName); - } - } else { - LOGGER.debug("auth {} not found", prevAuthName); - } - } - - if (!prevPath.equals(newPath)) { - LOGGER.debug("Location change: {} -> {}", prevPath, newPath); - // Location change - Set<String> locations = (Set<String>) image.get(newAuthName); - if (locations != null && locations.contains(prevPath) && !locations.contains(newPath)) { - locations.remove(prevPath); - locations.add(newPath); - } else { - LOGGER.warn("can not process location change for {}", newAuthName); - LOGGER.warn("old locatio = {}, new location = {}", prevPath, newPath); - } - } - } - - /** - * Add partition just adds a new location to the existing table. - */ - private static void addPartition(Map<String, Collection<String>> image, NotificationEvent event, - MessageDeserializer deserializer) { - SentryJSONAddPartitionMessage message = - (SentryJSONAddPartitionMessage) deserializer.getAddPartitionMessage(event.getMessage()); - - String dbName = message.getDB(); - if ((dbName == null) || dbName.isEmpty()) { - LOGGER.error("Add partition event is missing database name"); - return; - } - String tableName = message.getTable(); - if ((tableName == null) || tableName.isEmpty()) { - LOGGER.error("Add partition event for {} is missing table name", dbName); - return; - } - - String authName = dbName.toLowerCase() + "." + tableName.toLowerCase(); - - List<String> locations = message.getLocations(); - if (locations == null || locations.isEmpty()) { - LOGGER.error("Add partition event for {} is missing partition locations", authName); - return; - } - - Set<String> oldLocations = (Set<String>) image.get(authName); - if (oldLocations == null) { - LOGGER.warn("Add partition for {}: missing table locations",authName); - return; - } - - // Add each partition - for (String location: locations) { - String path = FullUpdateInitializer.pathFromURI(location); - if (path != null) { - LOGGER.debug("Adding partition {}:{}", authName, path); - oldLocations.add(path); - } - } - } - - /** - * Drop partition removes location from the existing table. - */ - private static void dropPartition(Map<String, Collection<String>> image, NotificationEvent event, - MessageDeserializer deserializer) { - SentryJSONDropPartitionMessage message = - (SentryJSONDropPartitionMessage) deserializer - .getDropPartitionMessage(event.getMessage()); - String dbName = message.getDB(); - if ((dbName == null) || dbName.isEmpty()) { - LOGGER.error("Drop partition event is missing database name"); - return; - } - String tableName = message.getTable(); - if ((tableName == null) || tableName.isEmpty()) { - LOGGER.error("Drop partition event for {} is missing table name", dbName); - return; - } - - String authName = dbName.toLowerCase() + "." + tableName.toLowerCase(); - - List<String> locations = message.getLocations(); - if (locations == null || locations.isEmpty()) { - LOGGER.error("Drop partition event for {} is missing partition locations", authName); - return; - } - - Set<String> oldLocations = (Set<String>) image.get(authName); - if (oldLocations == null) { - LOGGER.warn("Add partition for {}: missing table locations",authName); - return; - } - - // Drop each partition - for (String location: locations) { - String path = FullUpdateInitializer.pathFromURI(location); - if (path != null) { - oldLocations.remove(path); - } - } - } - - private static void alterPartition(Map<String, Collection<String>> image, NotificationEvent event, - MessageDeserializer deserializer) { - SentryJSONAlterPartitionMessage message = - (SentryJSONAlterPartitionMessage) deserializer - .getAlterPartitionMessage(event.getMessage()); - - String dbName = message.getDB(); - if ((dbName == null) || dbName.isEmpty()) { - LOGGER.error("Alter partition event is missing database name"); - return; - } - String tableName = message.getTable(); - if ((tableName == null) || tableName.isEmpty()) { - LOGGER.error("Alter partition event for {} is missing table name", dbName); - return; - } - - String authName = dbName.toLowerCase() + "." + tableName.toLowerCase(); - - String prevLocation = message.getOldLocation(); - if (prevLocation == null || prevLocation.isEmpty()) { - LOGGER.error("Alter partition event for {} is missing old location", authName); - } - - String prevPath = FullUpdateInitializer.pathFromURI(prevLocation); - if (prevPath == null) { - return; - } - - String newLocation = message.getNewLocation(); - if (newLocation == null || newLocation.isEmpty()) { - LOGGER.error("Alter partition event for {} is missing new location", authName); - } - - String newPath = FullUpdateInitializer.pathFromURI(newLocation); - if (newPath == null) { - return; - } - - if (prevPath.equals(newPath)) { - LOGGER.warn("Alter partition event for {} has the same old and new path {}", - authName, prevPath); - return; - } - - Set<String> locations = (Set<String>) image.get(authName); - if (locations == null) { - LOGGER.warn("Missing partition locations for {}", authName); - return; - } - - // Rename partition - if (locations.remove(prevPath)) { - LOGGER.debug("Renaming {} to {}", prevPath, newPath); - locations.add(newPath); - } - } - - /** - * Walk through the map and rename all instances of oldKey to newKey. - */ - @VisibleForTesting - protected static void renamePrefixKeys(Map<String, Collection<String>> image, - String oldKey, String newKey) { - // The trick is that we can't just iterate through the map, remove old values and - // insert new values. While we can remove old values with iterators, - // we can't insert new ones while we walk. So we collect the keys to be added in - // a new map and merge them in the end. - Map<String, Set<String>> replacement = new HashMap<>(); - - for (Iterator<Map.Entry<String, Collection<String>>> it = image.entrySet().iterator(); - it.hasNext(); ) { - Map.Entry<String, Collection<String>> entry = it.next(); - String key = entry.getKey(); - if (key.startsWith(oldKey)) { - String updatedKey = key.replaceAll("^" + oldKey + "(.*)", newKey + "$1"); - if (!image.containsKey(updatedKey)) { - LOGGER.debug("Rename {} to {}", key, updatedKey); - replacement.put(updatedKey, (Set<String>) entry.getValue()); - it.remove(); - } else { - LOGGER.warn("skipping key {} - already present", updatedKey); - } - } - } - - mergeMaps(image, replacement); - } - - /** - * Merge replacement values into the original map but only if they are not - * already there. - * - * @param m1 source map - * @param m2 map with replacement values - */ - private static void mergeMaps(Map<String, Collection<String>> m1, Map<String, Set<String>> m2) { - // Merge replacement values into the original map but only if they are not - // already there - for (Map.Entry<String, Set<String>> entry : m2.entrySet()) { - if (!m1.containsKey(entry.getKey())) { - m1.put(entry.getKey(), entry.getValue()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java deleted file mode 100644 index d2d85d3..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.service.thrift; - -import java.util.Arrays; -import java.util.List; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.AuthorizeCallback; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.sentry.core.common.exception.ConnectionDeniedException; -import org.apache.sentry.service.common.ServiceConstants.ServerConfig; - -public class GSSCallback extends SaslRpcServer.SaslGssCallbackHandler { - - private final Configuration conf; - public GSSCallback(Configuration conf) { - super(); - this.conf = conf; - } - - boolean comparePrincipals(String principal1, String principal2) { - String[] principalParts1 = SaslRpcServer.splitKerberosName(principal1); - String[] principalParts2 = SaslRpcServer.splitKerberosName(principal2); - if (principalParts1.length == 0 || principalParts2.length == 0) { - return false; - } - if (principalParts1.length == principalParts2.length) { - for (int i=0; i < principalParts1.length; i++) { - if (!principalParts1[i].equals(principalParts2[i])) { - return false; - } - } - return true; - } else { - return false; - } - } - - boolean allowConnect(String principal) { - String allowedPrincipals = conf.get(ServerConfig.ALLOW_CONNECT); - if (allowedPrincipals == null) { - return false; - } - String principalShortName = getShortName(principal); - List<String> items = Arrays.asList(allowedPrincipals.split("\\s*,\\s*")); - for (String item : items) { - if (comparePrincipals(item, principalShortName)) { - return true; - } - } - return false; - } - - private String getShortName(String principal) { - String parts[] = SaslRpcServer.splitKerberosName(principal); - return parts[0]; - } - - @Override - public void handle(Callback[] callbacks) - throws UnsupportedCallbackException, ConnectionDeniedException { - AuthorizeCallback ac = null; - for (Callback callback : callbacks) { - if (callback instanceof AuthorizeCallback) { - ac = (AuthorizeCallback) callback; - } else { - throw new UnsupportedCallbackException(callback, - "Unrecognized SASL GSSAPI Callback"); - } - } - if (ac != null) { - String authid = ac.getAuthenticationID(); - String authzid = ac.getAuthorizationID(); - - if (allowConnect(authid)) { - if (authid.equals(authzid)) { - ac.setAuthorized(true); - } else { - ac.setAuthorized(false); - } - if (ac.isAuthorized()) { - ac.setAuthorizedID(authzid); - } - } else { - throw new ConnectionDeniedException(ac, - "Connection to sentry service denied due to lack of client credentials", - authid); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSClient.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSClient.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSClient.java deleted file mode 100644 index 7831430..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSClient.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.service.thrift; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; - -/** - * AutoCloseable wrapper around HiveMetaStoreClient. - * It is only used to provide try-with-resource semantics for - * {@link HiveMetaStoreClient}. - */ -public class HMSClient implements AutoCloseable { - private final HiveMetaStoreClient client; - private boolean valid; - - public HMSClient(HiveMetaStoreClient client) { - this.client = Preconditions.checkNotNull(client); - valid = true; - } - - public HiveMetaStoreClient getClient() { - return client; - } - - public void invalidate() { - if (valid) { - client.close(); - valid = false; - } - } - - @Override - public void close() { - if (valid) { - client.close(); - valid = false; - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java deleted file mode 100644 index 74268f7..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.sentry.service.thrift; - -/** - * States for the HMSFollower - */ -public enum HMSFollowerState implements SentryState { - /** - * If the HMSFollower has been started or not. - */ - STARTED, - - /** - * If the HMSFollower is connected to the HMS - */ - CONNECTED; - - /** - * The component name this state is for. - */ - public static final String COMPONENT = "HMSFollower"; - - /** - * {@inheritDoc} - */ - @Override - public long getValue() { - return 1 << this.ordinal(); - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java deleted file mode 100644 index 62542c3..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.service.thrift; - -import org.apache.hadoop.hive.metastore.api.MetaException; - -import java.io.IOException; - -public interface HiveConnectionFactory extends AutoCloseable { - /** - * Open a new connection to HMS. - * - * @return connection to HMS. - * @throws IOException if connection establishement failed. - * @throws InterruptedException if connection establishment was interrupted. - * @throws MetaException if connection establishement failed. - */ - HMSClient connect() throws IOException, InterruptedException, MetaException; -} http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java deleted file mode 100644 index 93cc34f..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - <p> - http://www.apache.org/licenses/LICENSE-2.0 - <p> - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -package org.apache.sentry.service.thrift; - -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.sentry.hdfs.UniquePathsUpdate; -import org.apache.sentry.provider.db.service.persistent.SentryStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Helper class used to fetch Hive MetaStore notifications. - */ -public final class HiveNotificationFetcher implements AutoCloseable { - private static final Logger LOGGER = LoggerFactory.getLogger(HiveNotificationFetcher.class); - - private final SentryStore sentryStore; - private final HiveConnectionFactory hmsConnectionFactory; - private HiveMetaStoreClient hmsClient; - - /* The following cache and last filtered ID help us to avoid making less calls to the DB */ - private long lastIdFiltered = 0; - private Set<String> cache = new HashSet<>(); - - public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory) { - this.sentryStore = sentryStore; - this.hmsConnectionFactory = hmsConnectionFactory; - } - - /** - * Fetch new HMS notifications appeared since a specified event ID. The returned list may - * include notifications with the same specified ID if they were not seen by Sentry. - * - * @param lastEventId The event ID to use to request notifications. - * @return A list of newer notifications unseen by Sentry. - * @throws Exception If an error occurs on the HMS communication. - */ - public List<NotificationEvent> fetchNotifications(long lastEventId) throws Exception { - return fetchNotifications(lastEventId, Integer.MAX_VALUE); - } - - /** - * Fetch new HMS notifications appeared since a specified event ID. The returned list may - * include notifications with the same specified ID if they were not seen by Sentry. - * - * @param lastEventId The event ID to use to request notifications. - * @param maxEvents The maximum number of events to fetch. - * @return A list of newer notifications unseen by Sentry. - * @throws Exception If an error occurs on the HMS communication. - */ - List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception { - NotificationFilter filter = null; - - /* - * HMS may bring duplicated events that were committed later than the previous request. To bring - * those newer duplicated events, we request new notifications from the last seen ID - 1. - * - * A current problem is that we could miss duplicates committed much more later, but because - * HMS does not guarantee the order of those, then it is safer to avoid processing them. - * - * TODO: We can avoid doing this once HIVE-16886 is fixed. - */ - if (lastEventId > 0) { - filter = createNotificationFilterFor(lastEventId); - lastEventId--; - } - - LOGGER.debug("Requesting HMS notifications since ID = {}", lastEventId); - - NotificationEventResponse response; - try { - response = getHmsClient().getNextNotification(lastEventId, maxEvents, filter); - } catch (Exception e) { - close(); - throw e; - } - - if (response != null && response.isSetEvents()) { - LOGGER.debug("Fetched {} new HMS notification(s)", response.getEventsSize()); - return response.getEvents(); - } - - return Collections.emptyList(); - } - - /** - * Returns a HMS notification filter for a specific notification ID. HMS notifications may - * have duplicated IDs, so the filter uses a SHA-1 hash to check for a unique notification. - * - * @param id the notification ID to filter - * @return the HMS notification filter - */ - private NotificationFilter createNotificationFilterFor(final long id) { - /* - * A SHA-1 hex value that keeps unique notifications processed is persisted on the Sentry DB. - * To keep unnecessary calls to the DB, we use a cache that keeps seen hashes of the - * specified ID. If a new filter ID is used, then we clean up the cache. - */ - - if (lastIdFiltered != id) { - lastIdFiltered = id; - cache.clear(); - } - - return new NotificationFilter() { - @Override - public boolean accept(NotificationEvent notificationEvent) { - if (notificationEvent.getEventId() == id) { - String hash = UniquePathsUpdate.sha1(notificationEvent); - - try { - if (cache.contains(hash) || sentryStore.isNotificationProcessed(hash)) { - cache.add(hash); - - LOGGER.debug("Ignoring HMS notification already processed: ID = {}", id); - return false; - } - } catch (Exception e) { - LOGGER.error("An error occurred while checking if notification {} is already " - + "processed: {}", id, e.getMessage()); - - // We cannot throw an exception on this filter, so we return false assuming this - // notification is already processed - return false; - } - } - - return true; - } - }; - } - - /** - * Gets the HMS client connection object. - * If will create a new connection if no connection object exists. - * - * @return The HMS client used to communication with the Hive MetaStore. - * @throws Exception If it cannot connect to the HMS service. - */ - private HiveMetaStoreClient getHmsClient() throws Exception { - if (hmsClient == null) { - try { - hmsClient = hmsConnectionFactory.connect().getClient(); - } catch (Exception e) { - LOGGER.error("Fail to connect to the HMS service: {}", e.getMessage()); - throw e; - } - } - - return hmsClient; - } - - /** - * @return the latest notification Id logged by the HMS - * @throws Exception when an error occurs when talking to the HMS client - */ - public long getCurrentNotificationId() throws Exception { - CurrentNotificationEventId eventId; - try { - eventId = getHmsClient().getCurrentNotificationEventId(); - } catch (Exception e) { - close(); - throw e; - } - - if (eventId != null && eventId.isSetEventId()) { - return eventId.getEventId(); - } - - return SentryStore.EMPTY_NOTIFICATION_ID; - } - - /* AutoCloseable implementations */ - - @Override - public void close() { - try { - if (hmsClient != null) { - hmsClient.close(); - } - - cache.clear(); - } finally { - hmsClient = null; - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java deleted file mode 100644 index 31e58fd..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.service.thrift; - -import com.google.common.base.Preconditions; - -import java.io.File; -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import javax.security.auth.login.LoginException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SaslRpcServer; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; - -import org.apache.sentry.service.common.ServiceConstants.ServerConfig; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Factory used to generate Hive connections. - * Supports insecure and Kerberos connections. - * For Kerberos connections starts the ticket renewal thread and sets - * up Kerberos credentials. - */ -public final class HiveSimpleConnectionFactory implements HiveConnectionFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(HiveSimpleConnectionFactory.class); - - /** Sentty configuration */ - private final Configuration conf; - /** Hive configuration */ - private final HiveConf hiveConf; - private final boolean insecure; - private SentryKerberosContext kerberosContext = null; - - public HiveSimpleConnectionFactory(Configuration sentryConf, HiveConf hiveConf) { - this.conf = sentryConf; - this.hiveConf = hiveConf; - insecure = !ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase( - sentryConf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE).trim()); - } - - /** - * Initialize the Factory. - * For insecure connections there is nothing to initialize. - * For Kerberos connections sets up ticket renewal thread. - * @throws IOException - * @throws LoginException - */ - public void init() throws IOException, LoginException { - if (insecure) { - LOGGER.info("Using insecure connection to HMS"); - return; - } - - int port = conf.getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT); - String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL), - "%s is required", ServerConfig.PRINCIPAL); - String principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr( - conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT), - port).getAddress()); - LOGGER.debug("Opening kerberos connection to HMS using kerberos principal {}", principal); - String[] principalParts = SaslRpcServer.splitKerberosName(principal); - Preconditions.checkArgument(principalParts.length == 3, - "Kerberos principal %s should have 3 parts", principal); - String keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB), - "Configuration is missing required %s paraeter", ServerConfig.KEY_TAB); - File keytabFile = new File(keytab); - Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(), - "Keytab %s does not exist or is not readable", keytab); - // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal. - kerberosContext = new SentryKerberosContext(principal, keytab, false); - UserGroupInformation.setConfiguration(conf); - LOGGER.info("Using secure connection to HMS"); - } - - /** - * Connect to HMS in unsecure mode or in Kerberos mode according to config. - * - * @return HMS connection - * @throws IOException if could not establish connection - * @throws InterruptedException if connection was interrupted - * @throws MetaException if other errors happened - */ - public HMSClient connect() throws IOException, InterruptedException, MetaException { - if (insecure) { - return new HMSClient(new HiveMetaStoreClient(hiveConf)); - } - UserGroupInformation clientUGI = - UserGroupInformation.getUGIFromSubject(kerberosContext.getSubject()); - return new HMSClient(clientUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() { - @Override - public HiveMetaStoreClient run() throws MetaException { - return new HiveMetaStoreClient(hiveConf); - } - })); - } - - @Override - public void close() throws Exception { - if (kerberosContext != null) { - kerberosContext.shutDown(); - kerberosContext = null; - } - } -}
