Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign c0ddd6121 -> e3f0a9f87
SENTRY-1371: Rework Sentry start up and Hive state fetch (Hao Hao, Reviewed by: Sravya Tirukkovalur) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/e3f0a9f8 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/e3f0a9f8 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/e3f0a9f8 Branch: refs/heads/sentry-ha-redesign Commit: e3f0a9f87ca16723a8c1fba4a6fe19adf28c01c9 Parents: c0ddd61 Author: hahao <[email protected]> Authored: Tue Aug 23 11:01:47 2016 -0700 Committer: hahao <[email protected]> Committed: Tue Aug 23 11:01:47 2016 -0700 ---------------------------------------------------------------------- sentry-hdfs/sentry-hdfs-common/pom.xml | 6 + .../sentry/hdfs/FullUpdateInitializer.java | 335 +++++++++++++++++++ .../sentry/hdfs/TestFullUpdateInitializer.java | 170 ++++++++++ sentry-provider/sentry-provider-db/pom.xml | 14 + .../sentry/service/thrift/HMSFollower.java | 75 ++++- .../sentry/service/thrift/SentryService.java | 4 +- 6 files changed, 587 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-hdfs/sentry-hdfs-common/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/pom.xml b/sentry-hdfs/sentry-hdfs-common/pom.xml index 1c73aaa..7e607e5 100644 --- a/sentry-hdfs/sentry-hdfs-common/pom.xml +++ b/sentry-hdfs/sentry-hdfs-common/pom.xml @@ -74,6 +74,12 @@ limitations under the License. <artifactId>apacheds-jdbm1</artifactId> <version>2.0.0-M2</version> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> <build> <sourceDirectory>${basedir}/src/main/java</sourceDirectory> http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java new file mode 100644 index 0000000..a1f970b --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.sentry.hdfs.service.thrift.TPathChanges; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * FullUpdateInitializer is for fetching hive full update, + * the <hiveObj, paths> mapping. Multiple tasks will be + * running concurrently, and throw exception or inform + * sync incomplete paths update based on user configurable + * after retry configurable times. + */ +public class FullUpdateInitializer implements Closeable { + + private final ExecutorService threadPool; + private HiveMetaStoreClient client; + private final int maxPartitionsPerCall; + private final int maxTablesPerCall; + private final List<Future<CallResult>> results = new ArrayList<Future<CallResult>>(); + private final AtomicInteger taskCounter = new AtomicInteger(0); + private final int maxRetries; + private final int waitDurationMillis; + private final boolean failOnRetry; + + private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class); + + final static class CallResult { + final private Exception failure; + final private boolean successStatus; + + CallResult(Exception ex, boolean successStatus) { + failure = ex; + this.successStatus = successStatus; + } + + public boolean getSuccessStatus() { + return successStatus; + } + + public Exception getFailure() { + return failure; + } + } + + abstract class BaseTask implements Callable<CallResult> { + + /** + * Class represents retry strategy for BaseTask. + */ + private final class RetryStrategy { + private int retryStrategyMaxRetries = 0; + private int retryStrategyWaitDurationMillis; + private int retries; + private Exception exception; + + private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) { + this.retryStrategyMaxRetries = retryStrategyMaxRetries; + retries = 0; + + // Assign default wait duration if negative value is provided. + if (retryStrategyWaitDurationMillis > 0) { + this.retryStrategyWaitDurationMillis = retryStrategyWaitDurationMillis; + } else { + this.retryStrategyWaitDurationMillis = 1000; + } + } + + public CallResult exec() { + + // Retry logic is happening inside callable/task to avoid + // synchronous waiting on getting the result. + // Retry the failure task until reach the max retry number. + // Wait configurable duration for next retry. + for (int i = 0; i < retryStrategyMaxRetries; i++) { + try { + doTask(); + + // Task succeeds, reset the exception and return + // the successful flag. + exception = null; + return new CallResult(exception, true); + } catch (Exception ex) { + LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." + + " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " + ex.toString(), ex); + exception = ex; + + try { + Thread.sleep(retryStrategyWaitDurationMillis); + } catch (InterruptedException exception) { + // Skip the rest retries if get InterruptedException. + // And set the corresponding retries number. + retries = i; + i = retryStrategyMaxRetries; + } + } + + retries = i; + } + + // Task fails, return the failure flag. + LOGGER.error("Task did not complete successfully after " + retries + + " tries. Exception got: " + exception.toString()); + return new CallResult(exception, false); + } + } + + private RetryStrategy retryStrategy; + + BaseTask() { + taskCounter.incrementAndGet(); + retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis); + } + + @Override + public CallResult call() throws Exception { + CallResult callResult = retryStrategy.exec(); + taskCounter.decrementAndGet(); + return callResult; + } + + abstract void doTask() throws Exception; + } + + class PartitionTask extends BaseTask { + private final String dbName; + private final String tblName; + private final List<String> partNames; + private final TPathChanges tblPathChange; + + PartitionTask(String dbName, String tblName, List<String> partNames, + TPathChanges tblPathChange) { + super(); + this.dbName = dbName; + this.tblName = tblName; + this.partNames = partNames; + this.tblPathChange = tblPathChange; + } + + @Override + public void doTask() throws Exception { + List<Partition> tblParts = client.getPartitionsByNames(dbName, tblName, partNames); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("#### Fetching partitions " + + "[" + dbName + "." + tblName + "]" + "[" + partNames + "]"); + } + for (Partition part : tblParts) { + List<String> partPath = PathsUpdate.parsePath(part.getSd() + .getLocation()); + if (partPath != null) { + synchronized (tblPathChange) { + tblPathChange.addToAddPaths(partPath); + } + } + } + } + } + + class TableTask extends BaseTask { + private final Database db; + private final List<String> tableNames; + private final PathsUpdate update; + + TableTask(Database db, List<String> tableNames, PathsUpdate update) { + super(); + this.db = db; + this.tableNames = tableNames; + this.update = update; + } + + @Override + public void doTask() throws Exception { + List<Table> tables = client.getTableObjectsByName(db.getName(), tableNames); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("#### Fetching tables [" + db.getName() + "][" + + tableNames + "]"); + } + for (Table tbl : tables) { + TPathChanges tblPathChange; + // Table names are case insensitive + String tableName = tbl.getTableName().toLowerCase(); + synchronized (update) { + Preconditions.checkArgument(tbl.getDbName().equalsIgnoreCase(db.getName())); + tblPathChange = update.newPathChange(db.getName() + "." + tableName); + } + if (tbl.getSd().getLocation() != null) { + List<String> tblPath = + PathsUpdate.parsePath(tbl.getSd().getLocation()); + if (tblPath != null) { + tblPathChange.addToAddPaths(tblPath); + } + List<String> tblPartNames = client.listPartitionNames(db.getName(), tableName, (short) -1); + for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) { + List<String> partsToFetch = + tblPartNames.subList(i, Math.min( + i + maxPartitionsPerCall, tblPartNames.size())); + Callable<CallResult> partTask = + new PartitionTask(db.getName(), tableName, + partsToFetch, tblPathChange); + synchronized (results) { + results.add(threadPool.submit(partTask)); + } + } + } + } + } + } + + class DbTask extends BaseTask { + + private final PathsUpdate update; + private final String dbName; + + DbTask(PathsUpdate update, String dbName) { + super(); + this.update = update; + //Database names are case insensitive + this.dbName = dbName.toLowerCase(); + } + + @Override + public void doTask() throws Exception { + Database db = client.getDatabase(dbName); + List<String> dbPath = PathsUpdate.parsePath(db.getLocationUri()); + if (dbPath != null) { + synchronized (update) { + Preconditions.checkArgument(dbName.equalsIgnoreCase(db.getName())); + update.newPathChange(dbName).addToAddPaths(dbPath); + } + } + List<String> allTblStr = client.getAllTables(dbName); + for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) { + List<String> tablesToFetch = + allTblStr.subList(i, Math.min( + i + maxTablesPerCall, allTblStr.size())); + Callable<CallResult> tableTask = + new TableTask(db, tablesToFetch, update); + synchronized (results) { + results.add(threadPool.submit(tableTask)); + } + } + } + } + + public FullUpdateInitializer(HiveMetaStoreClient client, Configuration conf) { + this.client = client; + this.maxPartitionsPerCall = conf.getInt( + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT); + this.maxTablesPerCall = conf.getInt( + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT); + threadPool = Executors.newFixedThreadPool(conf.getInt( + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT)); + maxRetries = conf.getInt( + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM, + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT); + waitDurationMillis = conf.getInt( + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS, + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT); + failOnRetry = conf.getBoolean( + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE, + ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT); + } + + public UpdateableAuthzPaths createInitialUpdate() throws Exception { + UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new + String[]{"/"}); + PathsUpdate tempUpdate = new PathsUpdate(-1, false); + List<String> allDbStr = client.getAllDatabases(); + for (String dbName : allDbStr) { + Callable<CallResult> dbTask = new DbTask(tempUpdate, dbName); + results.add(threadPool.submit(dbTask)); + } + + while (taskCounter.get() > 0) { + Thread.sleep(1000); + // Wait until no more tasks remain + } + + for (Future<CallResult> result : results) { + CallResult callResult = result.get(); + + // Fail the HMS startup if tasks are not all successful and + // fail on partial updates flag is set in the config. + if (!callResult.getSuccessStatus() && failOnRetry) { + throw new RuntimeException(callResult.getFailure()); + } + } + + authzPaths.updatePartial(Lists.newArrayList(tempUpdate), new ReentrantReadWriteLock()); + return authzPaths; + } + + + @Override + public void close() throws IOException { + if (threadPool != null) { + threadPool.shutdownNow(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java new file mode 100644 index 0000000..0bb6f66 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; + +public class TestFullUpdateInitializer { + + @Test + public void testInitializer() throws Exception { + + Database db1 = Mockito.mock(Database.class); + Mockito.when(db1.getName()).thenReturn("db1"); + Mockito.when(db1.getLocationUri()).thenReturn("hdfs:///db1"); + Database db2 = Mockito.mock(Database.class); + Mockito.when(db2.getName()).thenReturn("db2"); + Mockito.when(db2.getLocationUri()).thenReturn("hdfs:///db2"); + Database db3 = Mockito.mock(Database.class); + Mockito.when(db3.getName()).thenReturn("db3"); + Mockito.when(db3.getLocationUri()).thenReturn("hdfs:///db3"); + + Table tab21 = Mockito.mock(Table.class); + Mockito.when(tab21.getDbName()).thenReturn("db2"); + Mockito.when(tab21.getTableName()).thenReturn("tab21"); + StorageDescriptor sd21 = Mockito.mock(StorageDescriptor.class); + Mockito.when(sd21.getLocation()).thenReturn("hdfs:///db2/tab21"); + Mockito.when(tab21.getSd()).thenReturn(sd21); + + Table tab31 = Mockito.mock(Table.class); + Mockito.when(tab31.getDbName()).thenReturn("db3"); + Mockito.when(tab31.getTableName()).thenReturn("tab31"); + StorageDescriptor sd31 = Mockito.mock(StorageDescriptor.class); + Mockito.when(sd31.getLocation()).thenReturn("hdfs:///db3/tab31"); + Mockito.when(tab31.getSd()).thenReturn(sd31); + + Partition part311 = Mockito.mock(Partition.class); + StorageDescriptor sd311 = Mockito.mock(StorageDescriptor.class); + Mockito.when(sd311.getLocation()).thenReturn("hdfs:///db3/tab31/part311"); + Mockito.when(part311.getSd()).thenReturn(sd311); + + Partition part312 = Mockito.mock(Partition.class); + StorageDescriptor sd312 = Mockito.mock(StorageDescriptor.class); + Mockito.when(sd312.getLocation()).thenReturn("hdfs:///db3/tab31/part312"); + Mockito.when(part312.getSd()).thenReturn(sd312); + + HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class); + Mockito.when(client.getAllDatabases()).thenReturn(Lists + .newArrayList("db1", "db2", "db3")); + Mockito.when(client.getDatabase("db1")).thenReturn(db1); + Mockito.when(client.getAllTables("db1")).thenReturn(new ArrayList<String>()); + + Mockito.when(client.getDatabase("db2")).thenReturn(db2); + Mockito.when(client.getAllTables("db2")).thenReturn(Lists.newArrayList("tab21")); + Mockito.when(client.getTableObjectsByName("db2", Lists.newArrayList("tab21"))) + .thenReturn(Lists.newArrayList(tab21)); + Mockito.when(client.listPartitionNames("db2", "tab21", (short) -1)) + .thenReturn(new ArrayList<String>()); + + Mockito.when(client.getDatabase("db3")).thenReturn(db3); + Mockito.when(client.getAllTables("db3")).thenReturn(Lists + .newArrayList("tab31")); + Mockito.when(client.getTableObjectsByName("db3", Lists.newArrayList("tab31"))) + .thenReturn(Lists.newArrayList(tab31)); + Mockito.when(client.listPartitionNames("db3", "tab31", (short) -1)) + .thenReturn(Lists.newArrayList("part311", "part312")); + + Mockito.when(client.getPartitionsByNames("db3", "tab31", Lists.newArrayList("part311"))) + .thenReturn(Lists.newArrayList(part311)); + Mockito.when(client.getPartitionsByNames("db3", "tab31", Lists.newArrayList("part312"))) + .thenReturn(Lists.newArrayList(part312)); + + Configuration conf = new Configuration(); + conf.setInt(ServiceConstants.ServerConfig + .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1); + conf.setInt(ServiceConstants.ServerConfig + .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1); + conf.setInt(ServiceConstants.ServerConfig + .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 1); + + FullUpdateInitializer cacheInitializer = new + FullUpdateInitializer(client, conf); + UpdateableAuthzPaths update = cacheInitializer.createInitialUpdate(); + + Assert.assertEquals(new HashSet<String>(Arrays.asList("db1")), update.findAuthzObjectExactMatches(new + String[]{"db1"})); + Assert.assertEquals(new HashSet<String>(Arrays.asList("db2")), update.findAuthzObjectExactMatches(new + String[]{"db2"})); + Assert.assertEquals(new HashSet<String>(Arrays.asList("db2.tab21")), update.findAuthzObjectExactMatches(new + String[]{"db2", "tab21"})); + Assert.assertEquals(new HashSet<String>(Arrays.asList("db3")), update.findAuthzObjectExactMatches(new + String[]{"db3"})); + Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new + String[]{"db3", "tab31"})); + Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new + String[]{"db3", "tab31", "part311"})); + Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")), update.findAuthzObjectExactMatches(new + String[]{"db3", "tab31", "part312"})); + cacheInitializer.close(); + + } + + // Make sure exceptions in initializer parallel tasks are propagated well + @Test + public void testExceptionInTask() throws Exception { + //Set up mocks: db1.tb1, with tb1 returning a wrong dbname (db2) + Database db1 = Mockito.mock(Database.class); + Mockito.when(db1.getName()).thenReturn("db1"); + Mockito.when(db1.getLocationUri()).thenReturn("hdfs:///db1"); + + Table tab1 = Mockito.mock(Table.class); + //Return a wrong db name, so that this triggers an exception + Mockito.when(tab1.getDbName()).thenReturn("db2"); + Mockito.when(tab1.getTableName()).thenReturn("tab1"); + + HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class); + Mockito.when(client.getAllDatabases()).thenReturn(Lists.newArrayList("db1")); + Mockito.when(client.getDatabase("db1")).thenReturn(db1); + Mockito.when(client.getTableObjectsByName("db1", + Lists.newArrayList("tab1"))) + .thenReturn(Lists.newArrayList(tab1)); + Mockito.when(client.getAllTables("db1")).thenReturn(Lists + .newArrayList("tab1")); + + Configuration conf = new Configuration(); + conf.setInt(ServiceConstants.ServerConfig + .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1); + conf.setInt(ServiceConstants.ServerConfig + .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1); + conf.setInt(ServiceConstants.ServerConfig + .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 1); + conf.setInt(ServiceConstants.ServerConfig + .SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM, 2); + + try { + FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(client, conf); + cacheInitializer.createInitialUpdate(); + Assert.fail("Expected cacheInitializer to fail"); + } catch (Exception e) { + Assert.assertTrue(e instanceof RuntimeException); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-provider/sentry-provider-db/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/pom.xml b/sentry-provider/sentry-provider-db/pom.xml index 1699286..2451f34 100644 --- a/sentry-provider/sentry-provider-db/pom.xml +++ b/sentry-provider/sentry-provider-db/pom.xml @@ -117,6 +117,20 @@ limitations under the License. <version>1.8.0-SNAPSHOT</version> </dependency> <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-hdfs-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-binding-hive-conf</artifactId> + <version>1.8.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-binding-hive-follower</artifactId> + <version>1.8.0-SNAPSHOT</version> + </dependency> + <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-shims</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java index 4430471..894fcc9 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java @@ -29,6 +29,9 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hive.hcatalog.messaging.HCatEventMessage; import org.apache.sentry.binding.hive.conf.HiveAuthzConf; import org.apache.sentry.core.common.exception.*; +import org.apache.sentry.hdfs.UpdateableAuthzPaths; +import org.apache.sentry.hdfs.FullUpdateInitializer; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; import org.apache.thrift.TException; @@ -43,6 +46,7 @@ import java.io.IOException; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE; import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE; @@ -67,8 +71,10 @@ public class HMSFollower implements Runnable { private String hiveInstance; final static int maxRetriesForLogin = 3; final static int maxRetriesForConnection = 3; + private volatile UpdateableAuthzPaths authzPaths; + private AtomicBoolean fullUpdateComplete; - HMSFollower(Configuration conf) throws SentryNoSuchObjectException, + HMSFollower(Configuration conf, AtomicBoolean fullUpdateComplete) throws SentryNoSuchObjectException, SentryAccessDeniedException, SentrySiteConfigurationException, IOException { //TODO: Handle any possible exceptions or throw specific exceptions LOGGER.info("HMSFollower is being initialized"); authzConf = conf; @@ -79,6 +85,7 @@ public class HMSFollower implements Runnable { } //TODO: Initialize currentEventID from Sentry db currentEventID = 0; + this.fullUpdateComplete = fullUpdateComplete; } @VisibleForTesting @@ -195,27 +202,63 @@ public class HMSFollower implements Runnable { } } if (needFullUpdate()) { - //TODO: Handle - } else { - try { - NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null); - if (response.isSetEvents()) { - LOGGER.info(String.format("CurrentEventID = %s. Processing %s events", - currentEventID, response.getEvents().size())); - processNotificationEvents(response.getEvents()); + // TODO: read currentEventID from Sentry DB + // This guarantee events before failover but did not applied can be fetch later. + fetchFullUpdate(); + } + + try { + NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null); + if (response.isSetEvents()) { + LOGGER.info(String.format("CurrentEventID = %s. Processing %s events", + currentEventID, response.getEvents().size())); + processNotificationEvents(response.getEvents()); + } + } catch (TException e) { + LOGGER.error("ThriftException occured fetching Notification entries, will try"); + e.printStackTrace(); + } catch (SentryInvalidInputException|SentryInvalidHMSEventException e) { + throw new RuntimeException(e); + } + } + + /** + * Block the sentry service until it starts up, signal main thread + * the full update fetch process is done. + */ + private void fetchFullUpdate() { + fullUpdateComplete.getAndSet(false); + + FullUpdateInitializer updateInitializer = null; + try { + updateInitializer = new FullUpdateInitializer(client, authzConf); + HMSFollower.this.authzPaths = updateInitializer.createInitialUpdate(); + // TODO: notify HDFS plugin + LOGGER.info("#### Hive full update initialization complete !!"); + } catch (Exception e) { + LOGGER.error("#### Could not create hive full update !!", e); + } finally { + if (updateInitializer != null) { + try { + updateInitializer.close(); + } catch (Exception e) { + LOGGER.info("#### Exception while closing updateInitializer !!", e); } - } catch (TException e) { - LOGGER.error("ThriftException occured fetching Notification entries, will try"); - e.printStackTrace(); - } catch (SentryInvalidInputException|SentryInvalidHMSEventException e) { - throw new RuntimeException(e); } + + fullUpdateComplete.getAndSet(true); } } private boolean needFullUpdate() { - //TODO Implement - return false; + // Currently fullUpdateComplete is indicator that server is starting up + // and in request of a full update. + // TODO: set fullUpdateComplete based on notification id stored in SentryDB. + if (!fullUpdateComplete.get()) { + return true; + } else { + return false; + } } private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) { http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index ddcb90c..7497719 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.EventListener; import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import javax.security.auth.Subject; @@ -94,6 +95,7 @@ public class SentryService implements Callable { private long maxMessageSize; private final boolean isHA; private final Activator act; + private AtomicBoolean fullUpdateComplete = new AtomicBoolean(false); SentryMetrics sentryMetrics; public SentryService(Configuration conf) throws Exception { @@ -157,7 +159,7 @@ public class SentryService implements Callable { //TODO: Enable only if Hive is using Sentry? try { hmsFollowerExecutor = Executors.newScheduledThreadPool(1); - hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf), 60000, 500, TimeUnit.MILLISECONDS); + hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf, fullUpdateComplete), 60000, 500, TimeUnit.MILLISECONDS); }catch(Exception e) { //TODO: Handle LOGGER.error("Could not start HMSFollower");
