http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java new file mode 100644 index 0000000..ce76a46 --- /dev/null +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStoreSchemaInfo.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.provider.db.service.persistent; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.sentry.core.common.exception.SentryUserException; + +public class SentryStoreSchemaInfo { + private static final String SQL_FILE_EXTENSION = ".sql"; + private static final String UPGRADE_FILE_PREFIX = "upgrade-"; + private static final String INIT_FILE_PREFIX = "sentry-"; + private static final String VERSION_UPGRADE_LIST = "upgrade.order"; + private final String dbType; + private final String sentrySchemaVersions[]; + private final String sentryScriptDir; + + private static final String SENTRY_VERSION = "2.1.0"; + + public SentryStoreSchemaInfo(String sentryScriptDir, String dbType) + throws SentryUserException { + this.sentryScriptDir = sentryScriptDir; + this.dbType = dbType; + // load upgrade order for the given dbType + List<String> upgradeOrderList = new ArrayList<String>(); + String upgradeListFile = getSentryStoreScriptDir() + File.separator + + VERSION_UPGRADE_LIST + "." 
+ dbType; + try (BufferedReader bfReader = new BufferedReader(new FileReader(upgradeListFile))) { + String currSchemaVersion; + while ((currSchemaVersion = bfReader.readLine()) != null) { + upgradeOrderList.add(currSchemaVersion.trim()); + } + } catch (FileNotFoundException e) { + throw new SentryUserException("File " + upgradeListFile + " not found ", e); + } catch (IOException e) { + throw new SentryUserException("Error reading " + upgradeListFile, e); + } + sentrySchemaVersions = upgradeOrderList.toArray(new String[0]); + } + + public String getSentrySchemaVersion() { + return SENTRY_VERSION; + } + + public List<String> getUpgradeScripts(String fromSchemaVer) + throws SentryUserException { + List<String> upgradeScriptList = new ArrayList<String>(); + + // check if we are already at current schema level + if (getSentryVersion().equals(fromSchemaVer)) { + return upgradeScriptList; + } + + // Find the list of scripts to execute for this upgrade + int firstScript = sentrySchemaVersions.length; + for (int i = 0; i < sentrySchemaVersions.length; i++) { + String fromVersion = sentrySchemaVersions[i].split("-to-")[0]; + if (fromVersion.equals(fromSchemaVer)) { + firstScript = i; + break; + } + } + if (firstScript == sentrySchemaVersions.length) { + throw new SentryUserException("Unknown version specified for upgrade " + + fromSchemaVer + " Metastore schema may be too old or newer"); + } + + for (int i = firstScript; i < sentrySchemaVersions.length; i++) { + String scriptFile = generateUpgradeFileName(sentrySchemaVersions[i]); + upgradeScriptList.add(scriptFile); + } + return upgradeScriptList; + } + + /*** + * Get the name of the script to initialize the schema for given version + * + * @param toVersion + * Target version. If it's null, then the current server version is + * used + * @return + * @throws SentryUserException + */ + public String generateInitFileName(String toVersion) + throws SentryUserException { + String version = toVersion; + if (version == null) { + version = getSentryVersion(); + } + String initScriptName = INIT_FILE_PREFIX + dbType + "-" + version + + SQL_FILE_EXTENSION; + // check if the file exists + if (!(new File(getSentryStoreScriptDir() + File.separatorChar + + initScriptName).exists())) { + throw new SentryUserException( + "Unknown version specified for initialization: " + version); + } + return initScriptName; + } + + /** + * Find the directory of sentry store scripts + * + * @return + */ + public String getSentryStoreScriptDir() { + return sentryScriptDir; + } + + // format the upgrade script name eg upgrade-x-y-dbType.sql + private String generateUpgradeFileName(String fileVersion) { + return INIT_FILE_PREFIX + UPGRADE_FILE_PREFIX + dbType + "-" + + fileVersion + SQL_FILE_EXTENSION; + } + + // Current hive version, in majorVersion.minorVersion.changeVersion format + // TODO: store the version using the build script + public static String getSentryVersion() { + return SENTRY_VERSION; + } +}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java new file mode 100644 index 0000000..6ad52a3 --- /dev/null +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionBlock.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.provider.db.service.persistent; + +import javax.jdo.PersistenceManager; + +/** + * TransactionBlock wraps the code that is executed inside a single + * transaction. The {@link #execute(PersistenceManager)} method returns the + * result of the transaction. + */ +@FunctionalInterface +public interface TransactionBlock<T> { + /** + * Execute some code as a single transaction. The code should not start a new + * transaction or manipulate transactions with the PersistenceManager. + * + * @param pm PersistenceManager for the current transaction + * @return Object with the result of execute() + * @throws Exception if the transaction code fails + */ + T execute(PersistenceManager pm) throws Exception; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java new file mode 100644 index 0000000..ba6e845 --- /dev/null +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.provider.db.service.persistent; + +import com.codahale.metrics.Counter; +import static com.codahale.metrics.MetricRegistry.name; +import com.codahale.metrics.Timer; + +import com.codahale.metrics.Timer.Context; +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.exception.SentryUserException; +import org.apache.sentry.service.common.ServiceConstants.ServerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jdo.PersistenceManager; +import javax.jdo.PersistenceManagerFactory; +import javax.jdo.Transaction; + +import org.apache.sentry.api.service.thrift.SentryMetrics; + +import java.util.Random; +import java.util.concurrent.Callable; + +/** + * TransactionManager executes database transactions and retries them when they + * fail with unexpected exceptions. <em>SentryUserExceptions</em>, e.g. + * <em>SentryNoSuchObjectException</em> or <em>SentryAlreadyExistsException</em>, + * are not retried; they are simply rethrown.<p> + * + * The purpose of the class is to separate all transaction housekeeping (opening + * transaction, rolling back failed transactions) from the actual transaction + * business logic.<p> + * + * TransactionManager creates an instance of PersistenceManager for each + * transaction.<p> + * + * TransactionManager exposes several metrics: + * <ul> + * <li>Timer metric for all transactions</li> + * <li>Counter for failed transactions</li> + * <li>Counter for each exception thrown by transaction</li> + * </ul> + */ +@SuppressWarnings("NestedTryStatement") +public final class TransactionManager { + + private static final Logger LOGGER = + LoggerFactory.getLogger(TransactionManager.class); + + /** Random number generator for exponential backoff */ + private static final Random random = new Random(); + + private final PersistenceManagerFactory pmf; + + // Maximum number of retries per call + private final int transactionRetryMax; + + // Delay (in milliseconds) between retries + private final int retryWaitTimeMills; + + /** Name for metrics */ + private static final String TRANSACTIONS = "transactions"; + + // Transaction timer measures time distribution for all transactions + private final Timer transactionTimer = + SentryMetrics.getInstance(). + getTimer(name(TransactionManager.class, + TRANSACTIONS)); + + // Counter for failed transactions + private final Counter failedTransactionsCount = + SentryMetrics.getInstance().
+ getCounter(name(TransactionManager.class, + TRANSACTIONS, "failed")); + + private final Counter retryCount = + SentryMetrics.getInstance().getCounter(name(TransactionManager.class, + TRANSACTIONS, "retry")); + + TransactionManager(PersistenceManagerFactory pmf, Configuration conf) { + this.pmf = pmf; + transactionRetryMax = conf.getInt( + ServerConfig.SENTRY_STORE_TRANSACTION_RETRY, + ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_DEFAULT); + retryWaitTimeMills = conf.getInt( + ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS, + ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS_DEFAULT); + } + + + /** + * Execute some code as a single transaction. The code in tb.execute() + * should not start a new transaction or manipulate transactions with the + * PersistenceManager. + * + * @param tb transaction block with code to be executed + * @return Object with the result of tb.execute() + */ + public <T> T executeTransaction(TransactionBlock<T> tb) throws Exception { + try (Context context = transactionTimer.time(); + PersistenceManager pm = pmf.getPersistenceManager()) { + Transaction transaction = pm.currentTransaction(); + transaction.begin(); + try { + T result = tb.execute(pm); + transaction.commit(); + return result; + } catch (Exception e) { + // Count total failed transactions + failedTransactionsCount.inc(); + // Count specific exceptions + SentryMetrics.getInstance().getCounter(name(TransactionManager.class, + "exception", e.getClass().getSimpleName())).inc(); + // Re-throw the exception + throw e; + } finally { + if (transaction.isActive()) { + transaction.rollback(); + } + } + } + } + + /** + * Execute a list of TransactionBlock code as a single transaction. + * The code in tb.execute() should not start a new transaction or + * manipulate transactions with the PersistenceManager. It returns + * the result of the last transaction block execution. + * + * @param tbs transaction blocks with code to be executed + * @return the result of the last tb.execute() + */ + private <T> T executeTransaction(Iterable<TransactionBlock<T>> tbs) throws Exception { + try (Context context = transactionTimer.time(); + PersistenceManager pm = pmf.getPersistenceManager()) { + Transaction transaction = pm.currentTransaction(); + transaction.begin(); + try { + T result = null; + for (TransactionBlock<T> tb : tbs) { + result = tb.execute(pm); + } + transaction.commit(); + return result; + } catch (Exception e) { + // Count total failed transactions + failedTransactionsCount.inc(); + // Count specific exceptions + SentryMetrics.getInstance().getCounter(name(TransactionManager.class, + "exception", e.getClass().getSimpleName())).inc(); + // Re-throw the exception + throw e; + } finally { + if (transaction.isActive()) { + transaction.rollback(); + } + } + } + } + + /** + * Execute some code as a single transaction with retry mechanism. + * + * @param tb transaction block with code to execute + * @return Object with the result of tb.execute() + */ + @SuppressWarnings("squid:S00112") + public <T> T executeTransactionWithRetry(final TransactionBlock<T> tb) + throws Exception { + return new ExponentialBackoff().execute( + new Callable<T>() { + @Override + public T call() throws Exception { + return executeTransaction(tb); + } + } + ); + } + + /** + * Execute a list of TransactionBlock code as a single transaction. + * If any of the TransactionBlocks fails, all of the TransactionBlocks are + * retried. It returns the result of the last transaction block + * execution.
+ * + * @param tbs a list of transaction blocks with code to be executed. + */ + @SuppressWarnings("squid:S00112") + <T> void executeTransactionBlocksWithRetry(final Iterable<TransactionBlock<T>> tbs) + throws Exception { + new ExponentialBackoff().execute( + new Callable<T>() { + @Override + public T call() throws Exception { + return executeTransaction(tbs); + } + } + ); + } + + /** + * Implementation of exponential backoff with random fuzziness. + * On each iteration the backoff time is 1.5 times the previous amount, plus a + * random fuzziness factor of up to half of the previous amount. + */ + private class ExponentialBackoff { + + @SuppressWarnings("squid:S00112") + <T> T execute(Callable<T> arg) throws Exception { + Exception ex = null; + long sleepTime = retryWaitTimeMills; + + for (int retryNum = 1; retryNum <= transactionRetryMax; retryNum++) { + try { + return arg.call(); + } catch (SentryUserException e) { + // throw the sentry exception without retry + LOGGER.warn("Transaction manager encountered non-retriable exception", e); + throw e; + } catch (Exception e) { + ex = e; + retryCount.inc(); + LOGGER.warn("Transaction execution encountered exception", e); + LOGGER.warn("Retrying transaction {} of {}", + retryNum, transactionRetryMax); + // Introduce some randomness in the backoff time. + LOGGER.warn("Sleeping for {} milliseconds before retrying", sleepTime); + Thread.sleep(sleepTime); + int fuzz = random.nextInt((int)sleepTime / 2); + sleepTime *= 3; + sleepTime /= 2; + sleepTime += fuzz; + } + } + assert(ex != null); + String message = "The transaction has reached the max number of retries, " + + ex.getMessage(); + LOGGER.error(message, ex); + throw new Exception(message, ex); + } + } +}
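Because TransactionBlock is a @FunctionalInterface, callers can hand TransactionManager a lambda. A minimal sketch of the intended call pattern follows; the JDO properties resource, the model class queried, and placing the sketch in this package (the constructor is package-private) are all assumptions for illustration, not part of this patch.

    package org.apache.sentry.provider.db.service.persistent;

    import javax.jdo.JDOHelper;
    import javax.jdo.PersistenceManagerFactory;
    import org.apache.hadoop.conf.Configuration;

    public class TransactionManagerSketch {
      public static void main(String[] args) throws Exception {
        // Assumed DataNucleus/JDO configuration resource; not part of this patch.
        PersistenceManagerFactory pmf =
            JDOHelper.getPersistenceManagerFactory("sentry-jdo.properties");
        TransactionManager tm = new TransactionManager(pmf, new Configuration());

        // The block runs inside a single transaction and is retried with
        // exponential backoff on unexpected failures; SentryUserException aborts
        // immediately. The block must not begin or end transactions itself.
        Long roles = tm.executeTransactionWithRetry(pm ->
            (Long) pm.newQuery("SELECT count(this) FROM "
                + "org.apache.sentry.provider.db.service.model.MSentryRole").execute());
        System.out.println("role count: " + roles);
      }
    }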
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java new file mode 100644 index 0000000..992d8ab --- /dev/null +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java @@ -0,0 +1,524 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.service.thrift; + +import com.codahale.metrics.Counter; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.sentry.hdfs.PathsUpdate; +import org.apache.sentry.hdfs.SentryMalformedPathException; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; +import org.apache.sentry.api.service.thrift.SentryMetrics; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import static com.codahale.metrics.MetricRegistry.name; + +/** + * Manage fetching the full snapshot from HMS. + * Snapshot is represented as a map from the hive object name to + * the set of paths for this object. + * The hive object name is either the Hive database name or + * Hive database name joined with Hive table name as {@code dbName.tableName}. + * All table partitions are stored under the table object. + * <p> + * Once a {@link FullUpdateInitializer} is created, the {@link FullUpdateInitializer#getFullHMSSnapshot()} + * method should be called to get the initial update. + * <p> + * It is important to close the {@link FullUpdateInitializer} object to prevent resource + * leaks. + * <p> + * The usual way of using {@link FullUpdateInitializer} is + * <pre> + * {@code + * try (FullUpdateInitializer updateInitializer = + * new FullUpdateInitializer(clientFactory, authzConf)) { + * Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot(); + * return pathsUpdate; + * } + * } + * </pre> + */ +public final class FullUpdateInitializer implements AutoCloseable { + + /* + * Implementation note. + * + * The snapshot is obtained using an executor. We follow the map/reduce model. + * Each executor thread (mapper) obtains and returns a partial snapshot; the partial + * snapshots are then reduced to a single combined snapshot by getFullHMSSnapshot(). + * + * Synchronization between the getFullHMSSnapshot() and executors is done using the + * 'results' queue. The queue holds the futures for each scheduled task. + * It is initially populated by getFullHMSSnapshot and each task may add new future + * results to it. Only getFullHMSSnapshot() removes entries from the results queue. + * This guarantees that once the results queue is empty there are no pending jobs. + * + * Since there is no other data sharing, the implementation is safe without + * any other synchronization. It is not thread-safe for concurrent calls + * to getFullHMSSnapshot().
+ * + */ + + + private static final String FULL_UPDATE_INITIALIZER_THREAD_NAME = "hms-fetch-%d"; + private final ExecutorService threadPool; + private final int maxPartitionsPerCall; + private final int maxTablesPerCall; + private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>(); + private final int maxRetries; + private final int waitDurationMillis; + + private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class); + + private static final ObjectMapping emptyObjectMapping = + new ObjectMapping(Collections.<String, Set<String>>emptyMap()); + private final HiveConnectionFactory clientFactory; + + /** Total number of database objects */ + private final Counter databaseCount = SentryMetrics.getInstance() + .getCounter(name(FullUpdateInitializer.class, "total", "db")); + + /** Total number of table objects */ + private final Counter tableCount = SentryMetrics.getInstance() + .getCounter(name(FullUpdateInitializer.class, "total", "tables")); + + /** Total number of partition objects */ + private final Counter partitionCount = SentryMetrics.getInstance() + .getCounter(name(FullUpdateInitializer.class, "total", "partitions")); + + /** + * Extract path (not starting with "/") from the full URI + * @param uri - resource URI (usually with scheme) + * @return path if uri is valid or null + */ + static String pathFromURI(String uri) { + try { + return PathsUpdate.parsePath(uri); + } catch (SentryMalformedPathException e) { + LOGGER.warn(String.format("Ignoring invalid uri %s: %s", + uri, e.getReason())); + return null; + } + } + + /** + * Mapping of object to set of paths. + * Used to represent partial results from executor threads. Multiple + * ObjectMapping objects are combined in a single mapping + * to get the final result. + */ + private static final class ObjectMapping { + private final Map<String, Set<String>> objects; + + ObjectMapping(Map<String, Set<String>> objects) { + this.objects = objects; + } + + ObjectMapping(String authObject, String path) { + Set<String> values = Collections.singleton(safeIntern(path)); + objects = ImmutableMap.of(authObject, values); + } + + ObjectMapping(String authObject, Collection<String> paths) { + Set<String> values = new HashSet<>(paths); + objects = ImmutableMap.of(authObject, values); + } + + Map<String, Set<String>> getObjects() { + return objects; + } + } + + private static final class CallResult { + private final Exception failure; + private final boolean successStatus; + private final ObjectMapping objectMapping; + + CallResult(Exception ex) { + failure = ex; + successStatus = false; + objectMapping = emptyObjectMapping; + } + + CallResult(ObjectMapping objectMapping) { + failure = null; + successStatus = true; + this.objectMapping = objectMapping; + } + + boolean success() { + return successStatus; + } + + ObjectMapping getObjectMapping() { + return objectMapping; + } + + public Exception getFailure() { + return failure; + } + } + + private abstract class BaseTask implements Callable<CallResult> { + + /** + * Class represents retry strategy for BaseTask. + */ + private final class RetryStrategy { + private int retryStrategyMaxRetries = 0; + private final int retryStrategyWaitDurationMillis; + + private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) { + this.retryStrategyMaxRetries = retryStrategyMaxRetries; + + // Assign default wait duration if negative value is provided. + this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ? 
+ retryStrategyWaitDurationMillis : 1000; + } + + @SuppressWarnings({"squid:S1141", "squid:S2142"}) + public CallResult exec() { + // Retry logic is happening inside callable/task to avoid + // synchronous waiting on getting the result. + // Retry the failed task until the max retry count is reached. + // Wait a configurable duration between retries. + // + // Only thrift exceptions are retried. + // Other exceptions are propagated up the stack. + Exception exception = null; + try { + // We catch all exceptions except Thrift exceptions which are retried + for (int i = 0; i < retryStrategyMaxRetries; i++) { + //noinspection NestedTryStatement + try { + return new CallResult(doTask()); + } catch (TException ex) { + LOGGER.debug("Failed to execute task on attempt " + (i + 1) + "." + + " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " + + ex.toString(), ex); + exception = ex; + + try { + Thread.sleep(retryStrategyWaitDurationMillis); + } catch (InterruptedException ignored) { + // Skip the remaining retries if we get an InterruptedException. + LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1)); + break; + } + } + } + } catch (Exception ex) { + exception = ex; + } + LOGGER.error("Failed to execute task", exception); + // We will fail in the end, so we are shutting down the pool to prevent + // new tasks from being scheduled. + threadPool.shutdown(); + return new CallResult(exception); + } + } + + private final RetryStrategy retryStrategy; + + BaseTask() { + retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis); + } + + @Override + public CallResult call() throws Exception { + return retryStrategy.exec(); + } + + abstract ObjectMapping doTask() throws Exception; + } + + private class PartitionTask extends BaseTask { + private final String dbName; + private final String tblName; + private final String authName; + private final List<String> partNames; + + PartitionTask(String dbName, String tblName, String authName, + List<String> partNames) { + this.dbName = safeIntern(dbName); + this.tblName = safeIntern(tblName); + this.authName = safeIntern(authName); + this.partNames = partNames; + } + + @Override + ObjectMapping doTask() throws Exception { + List<Partition> tblParts; + HMSClient c = null; + try (HMSClient client = clientFactory.connect()) { + c = client; + tblParts = client.getClient().getPartitionsByNames(dbName, tblName, partNames); + } catch (Exception e) { + if (c != null) { + c.invalidate(); + } + throw e; + } + + LOGGER.debug("Fetched partitions for db = {}, table = {}", + dbName, tblName); + + Collection<String> partitionNames = new ArrayList<>(tblParts.size()); + for (Partition part : tblParts) { + String partPath = pathFromURI(part.getSd().getLocation()); + if (partPath != null) { + partitionNames.add(partPath.intern()); + } + } + return new ObjectMapping(authName, partitionNames); + } + } + + private class TableTask extends BaseTask { + private final String dbName; + private final List<String> tableNames; + + TableTask(Database db, List<String> tableNames) { + dbName = safeIntern(db.getName()); + this.tableNames = tableNames; + } + + @Override + @SuppressWarnings({"squid:S2629", "squid:S135"}) + ObjectMapping doTask() throws Exception { + HMSClient c = null; + try (HMSClient client = clientFactory.connect()) { + c = client; + List<Table> tables = client.getClient().getTableObjectsByName(dbName, tableNames); + + LOGGER.debug("Fetched tables for db = {}, tables = {}", dbName, tableNames); + +
Map<String, Set<String>> objectMapping = new HashMap<>(tables.size()); + for (Table tbl : tables) { + // Table names are case insensitive + if (!tbl.getDbName().equalsIgnoreCase(dbName)) { + // Inconsistency in HMS data + LOGGER.warn(String.format("DB name %s for table %s does not match %s", + tbl.getDbName(), tbl.getTableName(), dbName)); + continue; + } + + String tableName = safeIntern(tbl.getTableName().toLowerCase()); + String authzObject = (dbName + "." + tableName).intern(); + List<String> tblPartNames = + client.getClient().listPartitionNames(dbName, tableName, (short) -1); + // Count total number of partitions + partitionCount.inc(tblPartNames.size()); + for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) { + List<String> partsToFetch = tblPartNames.subList(i, + Math.min(i + maxPartitionsPerCall, tblPartNames.size())); + Callable<CallResult> partTask = new PartitionTask(dbName, + tableName, authzObject, partsToFetch); + results.add(threadPool.submit(partTask)); + } + String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation())); + if (tblPath == null) { + continue; + } + Set<String> paths = objectMapping.get(authzObject); + if (paths == null) { + paths = new HashSet<>(1); + objectMapping.put(authzObject, paths); + } + paths.add(tblPath); + } + return new ObjectMapping(Collections.unmodifiableMap(objectMapping)); + } catch (Exception e) { + if (c != null) { + c.invalidate(); + } + throw e; + } + } + } + + private class DbTask extends BaseTask { + + private final String dbName; + + DbTask(String dbName) { + //Database names are case insensitive + this.dbName = safeIntern(dbName.toLowerCase()); + databaseCount.inc(); + } + + @Override + ObjectMapping doTask() throws Exception { + HMSClient c = null; + try (HMSClient client = clientFactory.connect()) { + c = client; + Database db = client.getClient().getDatabase(dbName); + if (!dbName.equalsIgnoreCase(db.getName())) { + LOGGER.warn("Database name {} does not match {}", db.getName(), dbName); + return emptyObjectMapping; + } + List<String> allTblStr = client.getClient().getAllTables(dbName); + // Count total number of tables + tableCount.inc(allTblStr.size()); + for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) { + List<String> tablesToFetch = allTblStr.subList(i, + Math.min(i + maxTablesPerCall, allTblStr.size())); + Callable<CallResult> tableTask = new TableTask(db, tablesToFetch); + results.add(threadPool.submit(tableTask)); + } + String dbPath = safeIntern(pathFromURI(db.getLocationUri())); + return (dbPath != null) ? 
new ObjectMapping(dbName, dbPath) : + emptyObjectMapping; + } catch (Exception e) { + if (c != null) { + c.invalidate(); + } + throw e; + } + } + } + + FullUpdateInitializer(HiveConnectionFactory clientFactory, Configuration conf) { + this.clientFactory = clientFactory; + maxPartitionsPerCall = conf.getInt( + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT); + maxTablesPerCall = conf.getInt( + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT); + maxRetries = conf.getInt( + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT); + waitDurationMillis = conf.getInt( + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT); + + ThreadFactory fullUpdateInitThreadFactory = new ThreadFactoryBuilder() + .setNameFormat(FULL_UPDATE_INITIALIZER_THREAD_NAME) + .setDaemon(false) + .build(); + threadPool = Executors.newFixedThreadPool(conf.getInt( + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, + ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT), + fullUpdateInitThreadFactory); + } + + /** + * Get the full HMS snapshot. + * @return Full snapshot of HMS objects. + * @throws TException if a Thrift error occurred + * @throws ExecutionException if there was a scheduling error + * @throws InterruptedException if processing was interrupted + */ + @SuppressWarnings("squid:S00112") + Map<String, Collection<String>> getFullHMSSnapshot() throws Exception { + // Get list of all HMS databases + List<String> allDbStr; + HMSClient c = null; + try (HMSClient client = clientFactory.connect()) { + c = client; + allDbStr = client.getClient().getAllDatabases(); + } catch (Exception e) { + if (c != null) { + c.invalidate(); + } + throw e; + } + + // Schedule async task for each database responsible for fetching per-database + // objects. + for (String dbName : allDbStr) { + results.add(threadPool.submit(new DbTask(dbName))); + } + + // Resulting full snapshot + Map<String, Collection<String>> fullSnapshot = new HashMap<>(); + + // As async tasks complete, merge their results into full snapshot. + while (!results.isEmpty()) { + // This is the only thread that takes elements off the results list - all other threads + // only add to it. Once the list is empty it can't become non-empty again. + // This means that if we check that results is non-empty we can safely call pop() and + // know that its result is not null.
+ Future<CallResult> result = results.pop(); + // Wait for the task to complete + CallResult callResult = result.get(); + // Fail if we got errors + if (!callResult.success()) { + throw callResult.getFailure(); + } + // Merge values into fullSnapshot + Map<String, Set<String>> objectMapping = + callResult.getObjectMapping().getObjects(); + for (Map.Entry<String, Set<String>> entry: objectMapping.entrySet()) { + String key = entry.getKey(); + Set<String> val = entry.getValue(); + Set<String> existingSet = (Set<String>)fullSnapshot.get(key); + if (existingSet == null) { + fullSnapshot.put(key, val); + continue; + } + existingSet.addAll(val); + } + } + return fullSnapshot; + } + + @Override + public void close() { + threadPool.shutdownNow(); + try { + threadPool.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + LOGGER.warn("Interrupted shutdown"); + Thread.currentThread().interrupt(); + } + } + + /** + * Intern a string but only if it is not null + * @param arg String to be interned, may be null + * @return interned string or null + */ + static String safeIntern(String arg) { + return (arg != null) ? arg.intern() : null; + } +}
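A usage sketch consistent with the class javadoc above. How the HiveConnectionFactory is obtained is an assumption left out here (in the server it is supplied by the HMS follower machinery); the try-with-resources block guarantees the internal thread pool is shut down.

    package org.apache.sentry.service.thrift;

    import java.util.Collection;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;

    public class SnapshotFetchSketch {
      public static Map<String, Collection<String>> fetch(
          HiveConnectionFactory clientFactory, Configuration conf) throws Exception {
        try (FullUpdateInitializer initializer =
                 new FullUpdateInitializer(clientFactory, conf)) {
          // Keys are "dbName" or "dbName.tableName"; values are sets of paths.
          return initializer.getFullHMSSnapshot();
        }
      }
    }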
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java new file mode 100644 index 0000000..c30d982 --- /dev/null +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateModifier.java @@ -0,0 +1,606 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.sentry.service.thrift; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Apply newer events to the full update. + * + * <p>The process of obtaining a full snapshot from HMS is not atomic. + * While we read information from HMS it may change - some new objects can be created, + * or some can be removed or modified. This class is used to reconcile changes to + * the full snapshot. + */ +final class FullUpdateModifier { + private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateModifier.class); + + // Prevent creation of class instances + private FullUpdateModifier() { + } + + /** + * Take a full snapshot and apply an HMS event to it. + * + * <p>We pass the deserializer as a parameter to simplify testing. + * + * @param image Full snapshot + * @param event HMS notification event + * @param deserializer Message deserializer - + * should produce Sentry JSON serializer type messages. + */ + // NOTE: we pass deserializer here instead of using a built-in one to simplify testing. + // Tests use mock serializers and thus we do not have to construct proper events. + static void applyEvent(Map<String, Collection<String>> image, NotificationEvent event, + MessageDeserializer deserializer) { + EventMessage.EventType eventType = + EventMessage.EventType.valueOf(event.getEventType()); + + switch (eventType) { + case CREATE_DATABASE: + createDatabase(image, event, deserializer); + break; + case DROP_DATABASE: + dropDatabase(image, event, deserializer); + break; + case CREATE_TABLE: + createTable(image, event, deserializer); + break; + case DROP_TABLE: + dropTable(image, event, deserializer); + break; + case ALTER_TABLE: + alterTable(image, event, deserializer); + break; + case ADD_PARTITION: + addPartition(image, event, deserializer); + break; + case DROP_PARTITION: + dropPartition(image, event, deserializer); + break; + case ALTER_PARTITION: + alterPartition(image, event, deserializer); + break; + default: + LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(), + event.getEventType()); + break; + } + } + + /** + * Add mapping from the new database name to location {dbname: {location}}.
+ */ + private static void createDatabase(Map<String, Collection<String>> image, NotificationEvent event, + MessageDeserializer deserializer) { + SentryJSONCreateDatabaseMessage message = + (SentryJSONCreateDatabaseMessage) deserializer + .getCreateDatabaseMessage(event.getMessage()); + + String dbName = message.getDB(); + if ((dbName == null) || dbName.isEmpty()) { + LOGGER.error("Create database event is missing database name"); + return; + } + dbName = dbName.toLowerCase(); + + String location = message.getLocation(); + if ((location == null) || location.isEmpty()) { + LOGGER.error("Create database event is missing database location"); + return; + } + + String path = FullUpdateInitializer.pathFromURI(location); + if (path == null) { + return; + } + + // Add new database if it doesn't exist yet + if (!image.containsKey(dbName)) { + LOGGER.debug("create database {} with location {}", dbName, location); + image.put(dbName.intern(), Collections.singleton(path)); + } else { + // Sanity check the information and print warnings if the database exists but + // with a different location + Set<String> oldLocations = (Set<String>)image.get(dbName); + LOGGER.debug("database {} already exists, ignored", dbName); + if (!oldLocations.contains(location)) { + LOGGER.warn("database {} exists but location is different from {}", dbName, location); + } + } + } + + /** + * Remove the mapping for the database name and remove all mappings of the form + * dbName.tableName where dbName matches the database name. + */ + private static void dropDatabase(Map<String, Collection<String>> image, NotificationEvent event, + MessageDeserializer deserializer) { + SentryJSONDropDatabaseMessage message = + (SentryJSONDropDatabaseMessage) deserializer.getDropDatabaseMessage(event.getMessage()); + + String dbName = message.getDB(); + if ((dbName == null) || dbName.isEmpty()) { + LOGGER.error("Drop database event is missing database name"); + return; + } + dbName = dbName.toLowerCase(); + String location = message.getLocation(); + if ((location == null) || location.isEmpty()) { + LOGGER.error("Drop database event is missing database location"); + return; + } + + String path = FullUpdateInitializer.pathFromURI(location); + if (path == null) { + return; + } + + // If the database is already deleted, we have nothing to do + Set<String> locations = (Set<String>)image.get(dbName); + if (locations == null) { + LOGGER.debug("database {} is already deleted", dbName); + return; + } + + if (!locations.contains(path)) { + LOGGER.warn("Database {} location does not match {}", dbName, path); + return; + } + + LOGGER.debug("drop database {} with location {}", dbName, location); + + // Drop information about the database + image.remove(dbName); + + String dbPrefix = dbName + "."; + + // Remove all objects for this database + for (Iterator<Map.Entry<String, Collection<String>>> it = image.entrySet().iterator(); + it.hasNext(); ) { + Map.Entry<String, Collection<String>> entry = it.next(); + String key = entry.getKey(); + if (key.startsWith(dbPrefix)) { + LOGGER.debug("Removing {}", key); + it.remove(); + } + } + } + + /** + * Add mapping for dbName.tableName.
+ */ + private static void createTable(Map<String, Collection<String>> image, NotificationEvent event, + MessageDeserializer deserializer) { + SentryJSONCreateTableMessage message = (SentryJSONCreateTableMessage) deserializer + .getCreateTableMessage(event.getMessage()); + + String dbName = message.getDB(); + if ((dbName == null) || dbName.isEmpty()) { + LOGGER.error("Create table event is missing database name"); + return; + } + String tableName = message.getTable(); + if ((tableName == null) || tableName.isEmpty()) { + LOGGER.error("Create table event is missing table name"); + return; + } + + String location = message.getLocation(); + if ((location == null) || location.isEmpty()) { + LOGGER.error("Create table event is missing table location"); + return; + } + + String path = FullUpdateInitializer.pathFromURI(location); + if (path == null) { + return; + } + + String authName = dbName.toLowerCase() + "." + tableName.toLowerCase(); + // Add new table if it doesn't exist yet + if (!image.containsKey(authName)) { + LOGGER.debug("create table {} with location {}", authName, location); + Set<String> locations = new HashSet<>(1); + locations.add(path); + image.put(authName.intern(), locations); + } else { + // Sanity check the information and print warnings if the table exists but + // with a different location + Set<String> oldLocations = (Set<String>)image.get(authName); + LOGGER.debug("Table {} already exists, ignored", authName); + if (!oldLocations.contains(location)) { + LOGGER.warn("Table {} exists but location is different from {}", authName, location); + } + } + } + + /** + * Drop the mapping for dbName.tableName + */ + private static void dropTable(Map<String, Collection<String>> image, NotificationEvent event, + MessageDeserializer deserializer) { + SentryJSONDropTableMessage message = (SentryJSONDropTableMessage) deserializer + .getDropTableMessage(event.getMessage()); + + String dbName = message.getDB(); + if ((dbName == null) || dbName.isEmpty()) { + LOGGER.error("Drop table event is missing database name"); + return; + } + String tableName = message.getTable(); + if ((tableName == null) || tableName.isEmpty()) { + LOGGER.error("Drop table event is missing table name"); + return; + } + + String location = message.getLocation(); + if ((location == null) || location.isEmpty()) { + LOGGER.error("Drop table event is missing table location"); + return; + } + + String path = FullUpdateInitializer.pathFromURI(location); + if (path == null) { + return; + } + + String authName = dbName.toLowerCase() + "." + tableName.toLowerCase(); + Set<String> locations = (Set<String>)image.get(authName); + if (locations != null && locations.contains(path)) { + LOGGER.debug("Removing {}", authName); + image.remove(authName); + } else { + LOGGER.warn("can't find matching table {} with location {}", authName, location); + } + } + + /** + * ALTER TABLE is a complicated function that can alter multiple things. + * + * <p>We take care of the following cases: + * <ul> + * <li>Change database name. This is the most complicated one.
+ * We need to change the actual database name and change all mappings + * that look like "dbName.tableName" to the new dbName</li> + * <li>Change table name</li> + * <li>Change location</li> + * </ul> + * + */ + private static void alterTable(Map<String, Collection<String>> image, NotificationEvent event, + MessageDeserializer deserializer) { + SentryJSONAlterTableMessage message = + (SentryJSONAlterTableMessage) deserializer.getAlterTableMessage(event.getMessage()); + String prevDbName = message.getDB(); + if ((prevDbName == null) || prevDbName.isEmpty()) { + LOGGER.error("Alter table event is missing old database name"); + return; + } + prevDbName = prevDbName.toLowerCase(); + String prevTableName = message.getTable(); + if ((prevTableName == null) || prevTableName.isEmpty()) { + LOGGER.error("Alter table event is missing old table name"); + return; + } + prevTableName = prevTableName.toLowerCase(); + + String newDbName = event.getDbName(); + if ((newDbName == null) || newDbName.isEmpty()) { + LOGGER.error("Alter table event is missing new database name"); + return; + } + newDbName = newDbName.toLowerCase(); + + String newTableName = event.getTableName(); + if ((newTableName == null) || newTableName.isEmpty()) { + LOGGER.error("Alter table event is missing new table name"); + return; + } + newTableName = newTableName.toLowerCase(); + + String prevLocation = message.getOldLocation(); + if ((prevLocation == null) || prevLocation.isEmpty()) { + LOGGER.error("Alter table event is missing old location"); + return; + } + String prevPath = FullUpdateInitializer.pathFromURI(prevLocation); + if (prevPath == null) { + return; + } + + String newLocation = message.getNewLocation(); + if ((newLocation == null) || newLocation.isEmpty()) { + LOGGER.error("Alter table event is missing new location"); + return; + } + String newPath = FullUpdateInitializer.pathFromURI(newLocation); + if (newPath == null) { + return; + } + + String prevAuthName = prevDbName + "." + prevTableName; + String newAuthName = newDbName + "." 
+ newTableName; + + if (!prevDbName.equals(newDbName)) { + // Database name change + LOGGER.debug("Changing database name: {} -> {}", prevDbName, newDbName); + Set<String> locations = (Set<String>)image.get(prevDbName); + if (locations != null) { + // Rename database if it is not renamed yet + if (!image.containsKey(newDbName)) { + image.put(newDbName, locations); + image.remove(prevDbName); + // Walk through all tables and rename DB part of the AUTH name + // AUTH name is "dbName.TableName" so we need to replace dbName with the new name + String prevDbPrefix = prevDbName + "."; + String newDbPrefix = newDbName + "."; + renamePrefixKeys(image, prevDbPrefix, newDbPrefix); + } else { + LOGGER.warn("database {} rename: found existing database {}", prevDbName, newDbName); + } + } else { + LOGGER.debug("database {} not found", prevDbName); + } + } + + if (!prevAuthName.equals(newAuthName)) { + // Either the database name or table name changed, rename objects + Set<String> locations = (Set<String>)image.get(prevAuthName); + if (locations != null) { + // Rename if it is not renamed yet + if (!image.containsKey(newAuthName)) { + LOGGER.debug("rename {} -> {}", prevAuthName, newAuthName); + image.put(newAuthName, locations); + image.remove(prevAuthName); + } else { + LOGGER.warn("auth {} rename: found existing object {}", prevAuthName, newAuthName); + } + } else { + LOGGER.debug("auth {} not found", prevAuthName); + } + } + + if (!prevPath.equals(newPath)) { + LOGGER.debug("Location change: {} -> {}", prevPath, newPath); + // Location change + Set<String> locations = (Set<String>) image.get(newAuthName); + if (locations != null && locations.contains(prevPath) && !locations.contains(newPath)) { + locations.remove(prevPath); + locations.add(newPath); + } else { + LOGGER.warn("cannot process location change for {}", newAuthName); + LOGGER.warn("old location = {}, new location = {}", prevPath, newPath); + } + } + } + + /** + * Add partition just adds a new location to the existing table. + */ + private static void addPartition(Map<String, Collection<String>> image, NotificationEvent event, + MessageDeserializer deserializer) { + SentryJSONAddPartitionMessage message = + (SentryJSONAddPartitionMessage) deserializer.getAddPartitionMessage(event.getMessage()); + + String dbName = message.getDB(); + if ((dbName == null) || dbName.isEmpty()) { + LOGGER.error("Add partition event is missing database name"); + return; + } + String tableName = message.getTable(); + if ((tableName == null) || tableName.isEmpty()) { + LOGGER.error("Add partition event for {} is missing table name", dbName); + return; + } + + String authName = dbName.toLowerCase() + "." + tableName.toLowerCase(); + + List<String> locations = message.getLocations(); + if (locations == null || locations.isEmpty()) { + LOGGER.error("Add partition event for {} is missing partition locations", authName); + return; + } + + Set<String> oldLocations = (Set<String>) image.get(authName); + if (oldLocations == null) { + LOGGER.warn("Add partition for {}: missing table locations", authName); + return; + } + + // Add each partition + for (String location: locations) { + String path = FullUpdateInitializer.pathFromURI(location); + if (path != null) { + LOGGER.debug("Adding partition {}:{}", authName, path); + oldLocations.add(path); + } + } + } + + /** + * Drop partition removes a location from the existing table.
+ */ + private static void dropPartition(Map<String, Collection<String>> image, NotificationEvent event, + MessageDeserializer deserializer) { + SentryJSONDropPartitionMessage message = + (SentryJSONDropPartitionMessage) deserializer + .getDropPartitionMessage(event.getMessage()); + String dbName = message.getDB(); + if ((dbName == null) || dbName.isEmpty()) { + LOGGER.error("Drop partition event is missing database name"); + return; + } + String tableName = message.getTable(); + if ((tableName == null) || tableName.isEmpty()) { + LOGGER.error("Drop partition event for {} is missing table name", dbName); + return; + } + + String authName = dbName.toLowerCase() + "." + tableName.toLowerCase(); + + List<String> locations = message.getLocations(); + if (locations == null || locations.isEmpty()) { + LOGGER.error("Drop partition event for {} is missing partition locations", authName); + return; + } + + Set<String> oldLocations = (Set<String>) image.get(authName); + if (oldLocations == null) { + LOGGER.warn("Drop partition for {}: missing table locations", authName); + return; + } + + // Drop each partition + for (String location: locations) { + String path = FullUpdateInitializer.pathFromURI(location); + if (path != null) { + oldLocations.remove(path); + } + } + } + + private static void alterPartition(Map<String, Collection<String>> image, NotificationEvent event, + MessageDeserializer deserializer) { + SentryJSONAlterPartitionMessage message = + (SentryJSONAlterPartitionMessage) deserializer + .getAlterPartitionMessage(event.getMessage()); + + String dbName = message.getDB(); + if ((dbName == null) || dbName.isEmpty()) { + LOGGER.error("Alter partition event is missing database name"); + return; + } + String tableName = message.getTable(); + if ((tableName == null) || tableName.isEmpty()) { + LOGGER.error("Alter partition event for {} is missing table name", dbName); + return; + } + + String authName = dbName.toLowerCase() + "." + tableName.toLowerCase(); + + String prevLocation = message.getOldLocation(); + if (prevLocation == null || prevLocation.isEmpty()) { + LOGGER.error("Alter partition event for {} is missing old location", authName); + return; + } + + String prevPath = FullUpdateInitializer.pathFromURI(prevLocation); + if (prevPath == null) { + return; + } + + String newLocation = message.getNewLocation(); + if (newLocation == null || newLocation.isEmpty()) { + LOGGER.error("Alter partition event for {} is missing new location", authName); + return; + } + + String newPath = FullUpdateInitializer.pathFromURI(newLocation); + if (newPath == null) { + return; + } + + if (prevPath.equals(newPath)) { + LOGGER.warn("Alter partition event for {} has the same old and new path {}", + authName, prevPath); + return; + } + + Set<String> locations = (Set<String>) image.get(authName); + if (locations == null) { + LOGGER.warn("Missing partition locations for {}", authName); + return; + } + + // Rename partition + if (locations.remove(prevPath)) { + LOGGER.debug("Renaming {} to {}", prevPath, newPath); + locations.add(newPath); + } + } + + /** + * Walk through the map and rename all keys that start with oldKey so they + * start with newKey instead. + */ + @VisibleForTesting + protected static void renamePrefixKeys(Map<String, Collection<String>> image, + String oldKey, String newKey) { + // The trick is that we can't just iterate through the map, remove old values and + // insert new values. While we can remove old values with iterators, + // we can't insert new ones while we walk.
So we collect the keys to be added in + // a new map and merge them in the end. + Map<String, Set<String>> replacement = new HashMap<>(); + + for (Iterator<Map.Entry<String, Collection<String>>> it = image.entrySet().iterator(); + it.hasNext(); ) { + Map.Entry<String, Collection<String>> entry = it.next(); + String key = entry.getKey(); + if (key.startsWith(oldKey)) { + String updatedKey = key.replaceAll("^" + oldKey + "(.*)", newKey + "$1"); + if (!image.containsKey(updatedKey)) { + LOGGER.debug("Rename {} to {}", key, updatedKey); + replacement.put(updatedKey, (Set<String>) entry.getValue()); + it.remove(); + } else { + LOGGER.warn("skipping key {} - already present", updatedKey); + } + } + } + + mergeMaps(image, replacement); + } + + /** + * Merge replacement values into the original map but only if they are not + * already there. + * + * @param m1 source map + * @param m2 map with replacement values + */ + private static void mergeMaps(Map<String, Collection<String>> m1, Map<String, Set<String>> m2) { + // Merge replacement values into the original map but only if they are not + // already there + for (Map.Entry<String, Set<String>> entry : m2.entrySet()) { + if (!m1.containsKey(entry.getKey())) { + m1.put(entry.getKey(), entry.getValue()); + } + } + } +}
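A test-style sketch of the snapshot shape FullUpdateModifier manipulates and the effect of renamePrefixKeys during a database rename. The paths and names are invented for illustration; the class sits in org.apache.sentry.service.thrift because FullUpdateModifier is package-private.

    package org.apache.sentry.service.thrift;

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;

    public class RenamePrefixKeysSketch {
      public static void main(String[] args) {
        Map<String, Collection<String>> image = new HashMap<>();
        // Snapshot shape: "db" and "db.table" keys mapped to sets of paths.
        image.put("db1", new HashSet<>(Arrays.asList("user/hive/warehouse/db1.db")));
        image.put("db1.tbl1",
            new HashSet<>(Arrays.asList("user/hive/warehouse/db1.db/tbl1")));

        // A database rename moves the db key itself, then every "db1." key.
        image.put("db2", image.remove("db1"));
        FullUpdateModifier.renamePrefixKeys(image, "db1.", "db2.");

        // Prints a map keyed by "db2" and "db2.tbl1"
        System.out.println(image);
      }
    }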
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
----------------------------------------------------------------------
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
new file mode 100644
index 0000000..d2d85d3
--- /dev/null
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/GSSCallback.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.service.thrift;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.sentry.core.common.exception.ConnectionDeniedException;
+import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
+
+public class GSSCallback extends SaslRpcServer.SaslGssCallbackHandler {
+
+  private final Configuration conf;
+
+  public GSSCallback(Configuration conf) {
+    super();
+    this.conf = conf;
+  }
+
+  boolean comparePrincipals(String principal1, String principal2) {
+    String[] principalParts1 = SaslRpcServer.splitKerberosName(principal1);
+    String[] principalParts2 = SaslRpcServer.splitKerberosName(principal2);
+    if (principalParts1.length == 0 || principalParts2.length == 0) {
+      return false;
+    }
+    return Arrays.equals(principalParts1, principalParts2);
+  }
+
+  boolean allowConnect(String principal) {
+    String allowedPrincipals = conf.get(ServerConfig.ALLOW_CONNECT);
+    if (allowedPrincipals == null) {
+      return false;
+    }
+    String principalShortName = getShortName(principal);
+    List<String> items = Arrays.asList(allowedPrincipals.split("\\s*,\\s*"));
+    for (String item : items) {
+      if (comparePrincipals(item, principalShortName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private String getShortName(String principal) {
+    String[] parts = SaslRpcServer.splitKerberosName(principal);
+    return parts[0];
+  }
+
+  @Override
+  public void handle(Callback[] callbacks)
+      throws UnsupportedCallbackException, ConnectionDeniedException {
+    AuthorizeCallback ac = null;
+    for (Callback callback : callbacks) {
+      if (callback instanceof AuthorizeCallback) {
+        ac = (AuthorizeCallback) callback;
+      } else {
+        throw new UnsupportedCallbackException(callback,
+            "Unrecognized SASL GSSAPI Callback");
+      }
+    }
+    if (ac != null) {
+      String authid = ac.getAuthenticationID();
+      String authzid = ac.getAuthorizationID();
+
+      if (allowConnect(authid)) {
+        ac.setAuthorized(authid.equals(authzid));
+        if (ac.isAuthorized()) {
+          ac.setAuthorizedID(authzid);
+        }
+      } else {
+        throw new ConnectionDeniedException(ac,
+            "Connection to sentry service denied due to lack of client credentials",
+            authid);
+      }
+    }
+  }
+}
\ No newline at end of file
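A hedged sketch of the admission check above, as it might be driven from a test in the same package (the allowed-principal list and the client principal are invented):

    // The allow list is a comma-separated set of short principal names.
    Configuration conf = new Configuration();
    conf.set(ServerConfig.ALLOW_CONNECT, "hive, impala");

    GSSCallback callback = new GSSCallback(conf);

    // "hive/host1.example.com@EXAMPLE.COM" is shortened to "hive" by
    // getShortName() and then compared part-by-part against each allowed
    // entry, so this client would be admitted.
    boolean allowed = callback.allowConnect("hive/host1.example.com@EXAMPLE.COM");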
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
----------------------------------------------------------------------
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSClient.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
new file mode 100644
index 0000000..7831430
--- /dev/null
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+
+/**
+ * AutoCloseable wrapper around HiveMetaStoreClient.
+ * It is only used to provide try-with-resources semantics for
+ * {@link HiveMetaStoreClient}.
+ */
+public class HMSClient implements AutoCloseable {
+  private final HiveMetaStoreClient client;
+  private boolean valid;
+
+  public HMSClient(HiveMetaStoreClient client) {
+    this.client = Preconditions.checkNotNull(client);
+    valid = true;
+  }
+
+  public HiveMetaStoreClient getClient() {
+    return client;
+  }
+
+  public void invalidate() {
+    // Same effect as close(); kept as a named operation for callers that
+    // discard a broken connection rather than finish with a healthy one.
+    close();
+  }
+
+  @Override
+  public void close() {
+    if (valid) {
+      client.close();
+      valid = false;
+    }
+  }
+}
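For context, a minimal sketch of the try-with-resources usage the wrapper enables (the HiveConf contents and the broad catch are assumptions of this example):

    HiveConf hiveConf = new HiveConf();
    // try-with-resources guarantees the wrapped client is closed exactly once.
    try (HMSClient wrapper = new HMSClient(new HiveMetaStoreClient(hiveConf))) {
      wrapper.getClient().getAllDatabases();
    } catch (Exception e) {
      // MetaException/TException from construction or the RPC would surface here
    }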
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
----------------------------------------------------------------------
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
new file mode 100644
index 0000000..74268f7
--- /dev/null
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HMSFollowerState.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.sentry.service.thrift;
+
+/**
+ * States for the HMSFollower.
+ */
+public enum HMSFollowerState implements SentryState {
+  /**
+   * Whether the HMSFollower has been started.
+   */
+  STARTED,
+
+  /**
+   * Whether the HMSFollower is connected to the HMS.
+   */
+  CONNECTED;
+
+  /**
+   * The component name this state is for.
+   */
+  public static final String COMPONENT = "HMSFollower";
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long getValue() {
+    return 1 << this.ordinal();
+  }
+}
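Each state occupies its own bit, so a set of follower states can be packed into a single long. A small sketch of the arithmetic; the combined-mask usage is illustrative, not taken from this commit:

    long started = HMSFollowerState.STARTED.getValue();     // 1 << 0 == 1
    long connected = HMSFollowerState.CONNECTED.getValue(); // 1 << 1 == 2
    long stateMask = started | connected;                   // 3: both bits set

    boolean isConnected =
        (stateMask & HMSFollowerState.CONNECTED.getValue()) != 0; // true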
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
new file mode 100644
index 0000000..62542c3
--- /dev/null
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+import java.io.IOException;
+
+public interface HiveConnectionFactory extends AutoCloseable {
+  /**
+   * Open a new connection to HMS.
+   *
+   * @return connection to HMS.
+   * @throws IOException if connection establishment failed.
+   * @throws InterruptedException if connection establishment was interrupted.
+   * @throws MetaException if HMS reported an error while connecting.
+   */
+  HMSClient connect() throws IOException, InterruptedException, MetaException;
+}
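A hypothetical in-memory implementation, useful mainly in tests, shows how little the contract requires (the class name and wiring are invented for this sketch):

    /** Test stub: always hands out a wrapper around one pre-built client. */
    class FixedHiveConnectionFactory implements HiveConnectionFactory {
      private final HiveMetaStoreClient client;

      FixedHiveConnectionFactory(HiveMetaStoreClient client) {
        this.client = client;
      }

      @Override
      public HMSClient connect() {
        return new HMSClient(client);
      }

      @Override
      public void close() {
        // nothing to release in this stub
      }
    }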
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
----------------------------------------------------------------------
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
new file mode 100644
index 0000000..93cc34f
--- /dev/null
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java
@@ -0,0 +1,211 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+  <p>
+  http://www.apache.org/licenses/LICENSE-2.0
+  <p>
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.sentry.hdfs.UniquePathsUpdate;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class used to fetch Hive MetaStore notifications.
+ */
+public final class HiveNotificationFetcher implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HiveNotificationFetcher.class);
+
+  private final SentryStore sentryStore;
+  private final HiveConnectionFactory hmsConnectionFactory;
+  private HiveMetaStoreClient hmsClient;
+
+  /* The following cache and last filtered ID help us avoid unnecessary calls to the DB */
+  private long lastIdFiltered = 0;
+  private Set<String> cache = new HashSet<>();
+
+  public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory) {
+    this.sentryStore = sentryStore;
+    this.hmsConnectionFactory = hmsConnectionFactory;
+  }
+
+  /**
+   * Fetch new HMS notifications that appeared after a specified event ID. The returned list may
+   * include notifications with the same specified ID if they were not seen by Sentry.
+   *
+   * @param lastEventId The event ID to use to request notifications.
+   * @return A list of newer notifications unseen by Sentry.
+   * @throws Exception If an error occurs on the HMS communication.
+   */
+  public List<NotificationEvent> fetchNotifications(long lastEventId) throws Exception {
+    return fetchNotifications(lastEventId, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Fetch new HMS notifications that appeared after a specified event ID. The returned list may
+   * include notifications with the same specified ID if they were not seen by Sentry.
+   *
+   * @param lastEventId The event ID to use to request notifications.
+   * @param maxEvents The maximum number of events to fetch.
+   * @return A list of newer notifications unseen by Sentry.
+   * @throws Exception If an error occurs on the HMS communication.
+   */
+  List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception {
+    NotificationFilter filter = null;
+
+    /*
+     * HMS may deliver duplicated events that were committed later than the previous request.
+     * To pick up those newer duplicates, we request notifications starting from the last
+     * seen ID - 1.
+     *
+     * A remaining problem is that we could still miss duplicates committed much later;
+     * because HMS does not guarantee the order of those, it is safer to avoid processing them.
+     *
+     * TODO: We can avoid doing this once HIVE-16886 is fixed.
+     */
+    if (lastEventId > 0) {
+      filter = createNotificationFilterFor(lastEventId);
+      lastEventId--;
+    }
+
+    LOGGER.debug("Requesting HMS notifications since ID = {}", lastEventId);
+
+    NotificationEventResponse response;
+    try {
+      response = getHmsClient().getNextNotification(lastEventId, maxEvents, filter);
+    } catch (Exception e) {
+      close();
+      throw e;
+    }
+
+    if (response != null && response.isSetEvents()) {
+      LOGGER.debug("Fetched {} new HMS notification(s)", response.getEventsSize());
+      return response.getEvents();
+    }
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Returns a HMS notification filter for a specific notification ID. HMS notifications may
+   * have duplicated IDs, so the filter uses a SHA-1 hash to check for a unique notification.
+   *
+   * @param id the notification ID to filter
+   * @return the HMS notification filter
+   */
+  private NotificationFilter createNotificationFilterFor(final long id) {
+    /*
+     * A SHA-1 hex value of each unique notification processed is persisted in the Sentry DB.
+     * To avoid unnecessary calls to the DB, we use a cache that keeps the seen hashes of the
+     * specified ID. If a new filter ID is used, then we clean up the cache.
+     */
+
+    if (lastIdFiltered != id) {
+      lastIdFiltered = id;
+      cache.clear();
+    }
+
+    return new NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent notificationEvent) {
+        if (notificationEvent.getEventId() == id) {
+          String hash = UniquePathsUpdate.sha1(notificationEvent);
+
+          try {
+            if (cache.contains(hash) || sentryStore.isNotificationProcessed(hash)) {
+              cache.add(hash);
+
+              LOGGER.debug("Ignoring HMS notification already processed: ID = {}", id);
+              return false;
+            }
+          } catch (Exception e) {
+            LOGGER.error("An error occurred while checking if notification {} is already "
+                + "processed: {}", id, e.getMessage());
+
+            // We cannot throw an exception on this filter, so we return false assuming this
+            // notification is already processed
+            return false;
+          }
+        }
+
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Gets the HMS client connection object.
+   * It will create a new connection if no connection object exists.
+   *
+   * @return The HMS client used to communicate with the Hive MetaStore.
+   * @throws Exception If it cannot connect to the HMS service.
+   */
+  private HiveMetaStoreClient getHmsClient() throws Exception {
+    if (hmsClient == null) {
+      try {
+        hmsClient = hmsConnectionFactory.connect().getClient();
+      } catch (Exception e) {
+        LOGGER.error("Failed to connect to the HMS service: {}", e.getMessage());
+        throw e;
+      }
+    }
+
+    return hmsClient;
+  }
+
+  /**
+   * @return the latest notification ID logged by the HMS
+   * @throws Exception when an error occurs when talking to the HMS client
+   */
+  public long getCurrentNotificationId() throws Exception {
+    CurrentNotificationEventId eventId;
+    try {
+      eventId = getHmsClient().getCurrentNotificationEventId();
+    } catch (Exception e) {
+      close();
+      throw e;
+    }
+
+    if (eventId != null && eventId.isSetEventId()) {
+      return eventId.getEventId();
+    }
+
+    return SentryStore.EMPTY_NOTIFICATION_ID;
+  }
+
+  /* AutoCloseable implementation */
+
+  @Override
+  public void close() {
+    try {
+      if (hmsClient != null) {
+        hmsClient.close();
+      }
+
+      cache.clear();
+    } finally {
+      hmsClient = null;
+    }
+  }
+}
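A hedged sketch of the fetch loop a caller such as the HMSFollower might run, inside a method declared to throw Exception; the sentryStore and connectionFactory wiring and the event handling are assumed:

    try (HiveNotificationFetcher fetcher =
             new HiveNotificationFetcher(sentryStore, connectionFactory)) {
      long lastProcessedId = 0L; // in practice, loaded from the Sentry store
      List<NotificationEvent> events = fetcher.fetchNotifications(lastProcessedId);
      for (NotificationEvent event : events) {
        // apply the event, then persist its ID and SHA-1 hash so the
        // filter above can skip it if HMS re-delivers the same ID
      }
    }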
http://git-wip-us.apache.org/repos/asf/sentry/blob/b97f5c7a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
new file mode 100644
index 0000000..31e58fd
--- /dev/null
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.sentry.service.common.ServiceConstants.ServerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory used to generate Hive connections.
+ * Supports insecure and Kerberos connections.
+ * For Kerberos connections it starts the ticket renewal thread and sets
+ * up Kerberos credentials.
+ */
+public final class HiveSimpleConnectionFactory implements HiveConnectionFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HiveSimpleConnectionFactory.class);
+
+  /** Sentry configuration */
+  private final Configuration conf;
+  /** Hive configuration */
+  private final HiveConf hiveConf;
+  private final boolean insecure;
+  private SentryKerberosContext kerberosContext = null;
+
+  public HiveSimpleConnectionFactory(Configuration sentryConf, HiveConf hiveConf) {
+    this.conf = sentryConf;
+    this.hiveConf = hiveConf;
+    insecure = !ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
+        sentryConf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE).trim());
+  }
+
+  /**
+   * Initialize the factory.
+   * For insecure connections there is nothing to initialize.
+   * For Kerberos connections this sets up the ticket renewal thread.
+   *
+   * @throws IOException
+   * @throws LoginException
+   */
+  public void init() throws IOException, LoginException {
+    if (insecure) {
+      LOGGER.info("Using insecure connection to HMS");
+      return;
+    }
+
+    int port = conf.getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);
+    String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL),
+        "%s is required", ServerConfig.PRINCIPAL);
+    String principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr(
+        conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT),
+        port).getAddress());
+    LOGGER.debug("Opening kerberos connection to HMS using kerberos principal {}", principal);
+    String[] principalParts = SaslRpcServer.splitKerberosName(principal);
+    Preconditions.checkArgument(principalParts.length == 3,
+        "Kerberos principal %s should have 3 parts", principal);
+    String keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB),
+        "Configuration is missing required %s parameter", ServerConfig.KEY_TAB);
+    File keytabFile = new File(keytab);
+    Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
+        "Keytab %s does not exist or is not readable", keytab);
+    // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal.
+    kerberosContext = new SentryKerberosContext(principal, keytab, false);
+    UserGroupInformation.setConfiguration(conf);
+    LOGGER.info("Using secure connection to HMS");
+  }
+
+  /**
+   * Connect to HMS in insecure mode or in Kerberos mode according to config.
+   *
+   * @return HMS connection
+   * @throws IOException if the connection could not be established
+   * @throws InterruptedException if the connection attempt was interrupted
+   * @throws MetaException if any other error happened
+   */
+  @Override
+  public HMSClient connect() throws IOException, InterruptedException, MetaException {
+    if (insecure) {
+      return new HMSClient(new HiveMetaStoreClient(hiveConf));
+    }
+    UserGroupInformation clientUGI =
+        UserGroupInformation.getUGIFromSubject(kerberosContext.getSubject());
+    return new HMSClient(clientUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
+      @Override
+      public HiveMetaStoreClient run() throws MetaException {
+        return new HiveMetaStoreClient(hiveConf);
+      }
+    }));
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (kerberosContext != null) {
+      kerberosContext.shutDown();
+      kerberosContext = null;
+    }
+  }
+}
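To tie the pieces together, a hedged usage sketch of the factory in insecure mode, run in a context that may throw Exception; the HiveConf contents are assumed:

    // Configure the factory for an insecure (non-Kerberos) HMS connection.
    Configuration sentryConf = new Configuration();
    sentryConf.set(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE);

    HiveSimpleConnectionFactory factory =
        new HiveSimpleConnectionFactory(sentryConf, new HiveConf());
    factory.init(); // no-op here; for Kerberos it would start ticket renewal

    try (HMSClient client = factory.connect()) {
      client.getClient().getAllDatabases();
    }
    factory.close();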
