Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign c0ddd6121 -> e3f0a9f87


SENTRY-1371: Rework Sentry start up and Hive state fetch (Hao Hao, Reviewed by: Sravya Tirukkovalur)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/e3f0a9f8
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/e3f0a9f8
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/e3f0a9f8

Branch: refs/heads/sentry-ha-redesign
Commit: e3f0a9f87ca16723a8c1fba4a6fe19adf28c01c9
Parents: c0ddd61
Author: hahao <[email protected]>
Authored: Tue Aug 23 11:01:47 2016 -0700
Committer: hahao <[email protected]>
Committed: Tue Aug 23 11:01:47 2016 -0700

----------------------------------------------------------------------
 sentry-hdfs/sentry-hdfs-common/pom.xml          |   6 +
 .../sentry/hdfs/FullUpdateInitializer.java      | 335 +++++++++++++++++++
 .../sentry/hdfs/TestFullUpdateInitializer.java  | 170 ++++++++++
 sentry-provider/sentry-provider-db/pom.xml      |  14 +
 .../sentry/service/thrift/HMSFollower.java      |  75 ++++-
 .../sentry/service/thrift/SentryService.java    |   4 +-
 6 files changed, 587 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-hdfs/sentry-hdfs-common/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/pom.xml b/sentry-hdfs/sentry-hdfs-common/pom.xml
index 1c73aaa..7e607e5 100644
--- a/sentry-hdfs/sentry-hdfs-common/pom.xml
+++ b/sentry-hdfs/sentry-hdfs-common/pom.xml
@@ -74,6 +74,12 @@ limitations under the License.
       <artifactId>apacheds-jdbm1</artifactId>
       <version>2.0.0-M2</version>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
   <build>
     <sourceDirectory>${basedir}/src/main/java</sourceDirectory>

http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
new file mode 100644
index 0000000..a1f970b
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.sentry.hdfs.service.thrift.TPathChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * FullUpdateInitializer fetches a full snapshot of Hive metadata,
+ * i.e. the <hiveObj, paths> mapping, from the Hive Metastore.
+ * Multiple tasks run concurrently; each task is retried a configurable
+ * number of times, after which, depending on configuration, the
+ * initializer either throws an exception or reports an incomplete
+ * paths update.
+ */
+public class FullUpdateInitializer implements Closeable {
+
+  private final ExecutorService threadPool;
+  private HiveMetaStoreClient client;
+  private final int maxPartitionsPerCall;
+  private final int maxTablesPerCall;
+  private final List<Future<CallResult>> results = new ArrayList<Future<CallResult>>();
+  private final AtomicInteger taskCounter = new AtomicInteger(0);
+  private final int maxRetries;
+  private final int waitDurationMillis;
+  private final boolean failOnRetry;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class);
+
+  final static class CallResult {
+    private final Exception failure;
+    private final boolean successStatus;
+
+    CallResult(Exception ex, boolean successStatus) {
+      failure = ex;
+      this.successStatus = successStatus;
+    }
+
+    public boolean getSuccessStatus() {
+      return successStatus;
+    }
+
+    public Exception getFailure() {
+      return failure;
+    }
+  }
+
+  abstract class BaseTask implements Callable<CallResult> {
+
+    /**
+     *  Represents the retry strategy for a BaseTask.
+     */
+    private final class RetryStrategy {
+      private int retryStrategyMaxRetries = 0;
+      private int retryStrategyWaitDurationMillis;
+      private int retries;
+      private Exception exception;
+
+      private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) {
+        this.retryStrategyMaxRetries = retryStrategyMaxRetries;
+        retries = 0;
+
+        // Use the default wait duration if a non-positive value is provided.
+        if (retryStrategyWaitDurationMillis > 0) {
+          this.retryStrategyWaitDurationMillis = retryStrategyWaitDurationMillis;
+        } else {
+          this.retryStrategyWaitDurationMillis = 1000;
+        }
+      }
+
+      public CallResult exec()  {
+
+        // Retry logic runs inside the callable/task itself to avoid
+        // blocking synchronously while waiting for the result.
+        // A failed task is retried until the maximum retry count is
+        // reached, waiting a configurable duration between attempts.
+        for (int i = 0; i < retryStrategyMaxRetries; i++) {
+          try {
+            doTask();
+
+            // The task succeeded; clear any recorded exception and
+            // return a successful result.
+            exception = null;
+            return new CallResult(exception, true);
+          } catch (Exception ex) {
+            LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." 
+
+            " Sleeping for " + retryStrategyWaitDurationMillis + " ms. 
Exception: " + ex.toString(), ex);
+            exception = ex;
+
+            try {
+              Thread.sleep(retryStrategyWaitDurationMillis);
+            } catch (InterruptedException exception) {
+              // On InterruptedException, skip the remaining retries
+              // and record the number of attempts made so far.
+              retries = i;
+              i = retryStrategyMaxRetries;
+            }
+          }
+
+          retries = i;
+        }
+
+        // All retries failed; return a failure result.
+        LOGGER.error("Task did not complete successfully after " + (retries + 1)
+        + " attempts. Last exception: " + exception.toString());
+        return new CallResult(exception, false);
+      }
+    }
+
+    private RetryStrategy retryStrategy;
+
+    BaseTask() {
+      taskCounter.incrementAndGet();
+      retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis);
+    }
+
+    @Override
+    public CallResult call() throws Exception {
+      CallResult callResult = retryStrategy.exec();
+      taskCounter.decrementAndGet();
+      return callResult;
+    }
+
+    abstract void doTask() throws Exception;
+  }
+
+  class PartitionTask extends BaseTask {
+    private final String dbName;
+    private final String tblName;
+    private final List<String> partNames;
+    private final TPathChanges tblPathChange;
+
+    PartitionTask(String dbName, String tblName, List<String> partNames,
+    TPathChanges tblPathChange) {
+      super();
+      this.dbName = dbName;
+      this.tblName = tblName;
+      this.partNames = partNames;
+      this.tblPathChange = tblPathChange;
+    }
+
+    @Override
+    public void doTask() throws Exception {
+      List<Partition> tblParts = client.getPartitionsByNames(dbName, tblName, partNames);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("#### Fetching partitions " +
+        "[" + dbName + "." + tblName + "]" + "[" + partNames + "]");
+      }
+      for (Partition part : tblParts) {
+        List<String> partPath = PathsUpdate.parsePath(part.getSd()
+        .getLocation());
+        if (partPath != null) {
+          synchronized (tblPathChange) {
+            tblPathChange.addToAddPaths(partPath);
+          }
+        }
+      }
+    }
+  }
+
+  class TableTask extends BaseTask {
+    private final Database db;
+    private final List<String> tableNames;
+    private final PathsUpdate update;
+
+    TableTask(Database db, List<String> tableNames, PathsUpdate update) {
+      super();
+      this.db = db;
+      this.tableNames = tableNames;
+      this.update = update;
+    }
+
+    @Override
+    public void doTask() throws Exception {
+      List<Table> tables = client.getTableObjectsByName(db.getName(), tableNames);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("#### Fetching tables [" + db.getName() + "][" +
+        tableNames + "]");
+      }
+      for (Table tbl : tables) {
+        TPathChanges tblPathChange;
+        // Table names are case insensitive
+        String tableName = tbl.getTableName().toLowerCase();
+        synchronized (update) {
+          Preconditions.checkArgument(tbl.getDbName().equalsIgnoreCase(db.getName()));
+          tblPathChange = update.newPathChange(db.getName() + "." + tableName);
+        }
+        if (tbl.getSd().getLocation() != null) {
+          List<String> tblPath =
+          PathsUpdate.parsePath(tbl.getSd().getLocation());
+          if (tblPath != null) {
+            tblPathChange.addToAddPaths(tblPath);
+          }
+          List<String> tblPartNames = client.listPartitionNames(db.getName(), tableName, (short) -1);
+          for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) {
+            List<String> partsToFetch =
+            tblPartNames.subList(i, Math.min(
+            i + maxPartitionsPerCall, tblPartNames.size()));
+            Callable<CallResult> partTask =
+            new PartitionTask(db.getName(), tableName,
+            partsToFetch, tblPathChange);
+            synchronized (results) {
+              results.add(threadPool.submit(partTask));
+            }
+          }
+        }
+      }
+    }
+  }
+
+  class DbTask extends BaseTask {
+
+    private final PathsUpdate update;
+    private final String dbName;
+
+    DbTask(PathsUpdate update, String dbName) {
+      super();
+      this.update = update;
+      //Database names are case insensitive
+      this.dbName = dbName.toLowerCase();
+    }
+
+    @Override
+    public void doTask() throws Exception {
+      Database db = client.getDatabase(dbName);
+      List<String> dbPath = PathsUpdate.parsePath(db.getLocationUri());
+      if (dbPath != null) {
+        synchronized (update) {
+          Preconditions.checkArgument(dbName.equalsIgnoreCase(db.getName()));
+          update.newPathChange(dbName).addToAddPaths(dbPath);
+        }
+      }
+      List<String> allTblStr = client.getAllTables(dbName);
+      for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) {
+        List<String> tablesToFetch =
+        allTblStr.subList(i, Math.min(
+        i + maxTablesPerCall, allTblStr.size()));
+        Callable<CallResult> tableTask =
+        new TableTask(db, tablesToFetch, update);
+        synchronized (results) {
+          results.add(threadPool.submit(tableTask));
+        }
+      }
+    }
+  }
+
+  public FullUpdateInitializer(HiveMetaStoreClient client, Configuration conf) {
+    this.client = client;
+    this.maxPartitionsPerCall = conf.getInt(
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
+    this.maxTablesPerCall = conf.getInt(
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
+    threadPool = Executors.newFixedThreadPool(conf.getInt(
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT));
+    maxRetries = conf.getInt(
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
+    waitDurationMillis = conf.getInt(
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
+    failOnRetry = conf.getBoolean(
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE,
+        ServiceConstants.ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE_DEFAULT);
+  }
+
+  public UpdateableAuthzPaths createInitialUpdate() throws Exception {
+    UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(new String[]{"/"});
+    PathsUpdate tempUpdate = new PathsUpdate(-1, false);
+    List<String> allDbStr = client.getAllDatabases();
+    for (String dbName : allDbStr) {
+      Callable<CallResult> dbTask = new DbTask(tempUpdate, dbName);
+      results.add(threadPool.submit(dbTask));
+    }
+
+    // Wait until no tasks remain in flight; tasks may spawn further
+    // tasks, so poll the shared counter rather than the futures list.
+    while (taskCounter.get() > 0) {
+      Thread.sleep(1000);
+    }
+
+    for (Future<CallResult> result : results) {
+      CallResult callResult = result.get();
+
+      // Fail the start-up if any task was unsuccessful and the
+      // fail-on-partial-update flag is set in the config.
+      if (!callResult.getSuccessStatus() && failOnRetry) {
+        throw new RuntimeException(callResult.getFailure());
+      }
+    }
+
+    authzPaths.updatePartial(Lists.newArrayList(tempUpdate), new ReentrantReadWriteLock());
+    return authzPaths;
+  }
+
+
+  @Override
+  public void close() throws IOException {
+    if (threadPool != null) {
+      threadPool.shutdownNow();
+    }
+  }
+}
\ No newline at end of file
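
A minimal sketch (not part of the patch) of how FullUpdateInitializer is meant
to be driven; only the constructor, createInitialUpdate() and close() calls are
taken from the code above, the surrounding method is illustrative:

  // Sketch: build the initial <hiveObj, paths> image from an already
  // connected HiveMetaStoreClient, ensuring the internal thread pool
  // is shut down afterwards.
  static UpdateableAuthzPaths buildInitialImage(HiveMetaStoreClient client,
      Configuration conf) throws Exception {
    FullUpdateInitializer initializer = new FullUpdateInitializer(client, conf);
    try {
      // Fetches all databases, tables and partitions in parallel.
      return initializer.createInitialUpdate();
    } finally {
      initializer.close(); // shuts down the thread pool
    }
  }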

http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
new file mode 100644
index 0000000..0bb6f66
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+
+public class TestFullUpdateInitializer {
+
+  @Test
+  public void testInitializer() throws Exception {
+
+    Database db1 = Mockito.mock(Database.class);
+    Mockito.when(db1.getName()).thenReturn("db1");
+    Mockito.when(db1.getLocationUri()).thenReturn("hdfs:///db1");
+    Database db2 = Mockito.mock(Database.class);
+    Mockito.when(db2.getName()).thenReturn("db2");
+    Mockito.when(db2.getLocationUri()).thenReturn("hdfs:///db2");
+    Database db3 = Mockito.mock(Database.class);
+    Mockito.when(db3.getName()).thenReturn("db3");
+    Mockito.when(db3.getLocationUri()).thenReturn("hdfs:///db3");
+
+    Table tab21 = Mockito.mock(Table.class);
+    Mockito.when(tab21.getDbName()).thenReturn("db2");
+    Mockito.when(tab21.getTableName()).thenReturn("tab21");
+    StorageDescriptor sd21 = Mockito.mock(StorageDescriptor.class);
+    Mockito.when(sd21.getLocation()).thenReturn("hdfs:///db2/tab21");
+    Mockito.when(tab21.getSd()).thenReturn(sd21);
+
+    Table tab31 = Mockito.mock(Table.class);
+    Mockito.when(tab31.getDbName()).thenReturn("db3");
+    Mockito.when(tab31.getTableName()).thenReturn("tab31");
+    StorageDescriptor sd31 = Mockito.mock(StorageDescriptor.class);
+    Mockito.when(sd31.getLocation()).thenReturn("hdfs:///db3/tab31");
+    Mockito.when(tab31.getSd()).thenReturn(sd31);
+
+    Partition part311 = Mockito.mock(Partition.class);
+    StorageDescriptor sd311 = Mockito.mock(StorageDescriptor.class);
+    Mockito.when(sd311.getLocation()).thenReturn("hdfs:///db3/tab31/part311");
+    Mockito.when(part311.getSd()).thenReturn(sd311);
+
+    Partition part312 = Mockito.mock(Partition.class);
+    StorageDescriptor sd312 = Mockito.mock(StorageDescriptor.class);
+    Mockito.when(sd312.getLocation()).thenReturn("hdfs:///db3/tab31/part312");
+    Mockito.when(part312.getSd()).thenReturn(sd312);
+
+    HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.when(client.getAllDatabases()).thenReturn(Lists
+    .newArrayList("db1", "db2", "db3"));
+    Mockito.when(client.getDatabase("db1")).thenReturn(db1);
+    Mockito.when(client.getAllTables("db1")).thenReturn(new 
ArrayList<String>());
+
+    Mockito.when(client.getDatabase("db2")).thenReturn(db2);
+    Mockito.when(client.getAllTables("db2")).thenReturn(Lists.newArrayList("tab21"));
+    Mockito.when(client.getTableObjectsByName("db2", 
Lists.newArrayList("tab21")))
+    .thenReturn(Lists.newArrayList(tab21));
+    Mockito.when(client.listPartitionNames("db2", "tab21", (short) -1))
+    .thenReturn(new ArrayList<String>());
+
+    Mockito.when(client.getDatabase("db3")).thenReturn(db3);
+    Mockito.when(client.getAllTables("db3")).thenReturn(Lists
+    .newArrayList("tab31"));
+    Mockito.when(client.getTableObjectsByName("db3", 
Lists.newArrayList("tab31")))
+    .thenReturn(Lists.newArrayList(tab31));
+    Mockito.when(client.listPartitionNames("db3", "tab31", (short) -1))
+    .thenReturn(Lists.newArrayList("part311", "part312"));
+
+    Mockito.when(client.getPartitionsByNames("db3", "tab31", Lists.newArrayList("part311")))
+    .thenReturn(Lists.newArrayList(part311));
+    Mockito.when(client.getPartitionsByNames("db3", "tab31", Lists.newArrayList("part312")))
+    .thenReturn(Lists.newArrayList(part312));
+
+    Configuration conf = new Configuration();
+    conf.setInt(ServiceConstants.ServerConfig
+    .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1);
+    conf.setInt(ServiceConstants.ServerConfig
+    .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1);
+    conf.setInt(ServiceConstants.ServerConfig
+    .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 1);
+
+    FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(client, conf);
+    UpdateableAuthzPaths update = cacheInitializer.createInitialUpdate();
+
+    Assert.assertEquals(new HashSet<String>(Arrays.asList("db1")),
+    update.findAuthzObjectExactMatches(new String[]{"db1"}));
+    Assert.assertEquals(new HashSet<String>(Arrays.asList("db2")),
+    update.findAuthzObjectExactMatches(new String[]{"db2"}));
+    Assert.assertEquals(new HashSet<String>(Arrays.asList("db2.tab21")),
+    update.findAuthzObjectExactMatches(new String[]{"db2", "tab21"}));
+    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3")),
+    update.findAuthzObjectExactMatches(new String[]{"db3"}));
+    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")),
+    update.findAuthzObjectExactMatches(new String[]{"db3", "tab31"}));
+    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")),
+    update.findAuthzObjectExactMatches(new String[]{"db3", "tab31", "part311"}));
+    Assert.assertEquals(new HashSet<String>(Arrays.asList("db3.tab31")),
+    update.findAuthzObjectExactMatches(new String[]{"db3", "tab31", "part312"}));
+    cacheInitializer.close();
+
+  }
+
+  // Make sure exceptions in initializer parallel tasks are propagated well
+  @Test
+  public void testExceptionInTask() throws Exception {
+    //Set up mocks: db1.tab1, with tab1 returning a wrong dbName (db2)
+    Database db1 = Mockito.mock(Database.class);
+    Mockito.when(db1.getName()).thenReturn("db1");
+    Mockito.when(db1.getLocationUri()).thenReturn("hdfs:///db1");
+
+    Table tab1 = Mockito.mock(Table.class);
+    //Return a wrong db name, so that this triggers an exception
+    Mockito.when(tab1.getDbName()).thenReturn("db2");
+    Mockito.when(tab1.getTableName()).thenReturn("tab1");
+
+    HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.when(client.getAllDatabases()).thenReturn(Lists.newArrayList("db1"));
+    Mockito.when(client.getDatabase("db1")).thenReturn(db1);
+    Mockito.when(client.getTableObjectsByName("db1",
+    Lists.newArrayList("tab1")))
+    .thenReturn(Lists.newArrayList(tab1));
+    Mockito.when(client.getAllTables("db1")).thenReturn(Lists
+    .newArrayList("tab1"));
+
+    Configuration conf = new Configuration();
+    conf.setInt(ServiceConstants.ServerConfig
+    .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1);
+    conf.setInt(ServiceConstants.ServerConfig
+    .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1);
+    conf.setInt(ServiceConstants.ServerConfig
+    .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 1);
+    conf.setInt(ServiceConstants.ServerConfig
+    .SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM, 2);
+
+    try {
+      FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(client, conf);
+      cacheInitializer.createInitialUpdate();
+      Assert.fail("Expected cacheInitializer to fail");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof RuntimeException);
+    }
+  }
+}
\ No newline at end of file
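
The retry and fail-on-partial-update behaviour exercised by testExceptionInTask
is configuration driven. A hedged sketch of the relevant knobs (the constant
names come from this patch; the values here are purely illustrative, and the
test above relies on the fail-on-partial-update default being true):

  Configuration conf = new Configuration();
  // Retry each failed fetch task up to 3 times, waiting 500 ms in between.
  conf.setInt(ServiceConstants.ServerConfig
      .SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM, 3);
  conf.setInt(ServiceConstants.ServerConfig
      .SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS, 500);
  // If true, createInitialUpdate() throws a RuntimeException when any task
  // still fails after all retries; if false, a partial update is tolerated.
  conf.setBoolean(ServiceConstants.ServerConfig
      .SENTRY_HDFS_SYNC_METASTORE_CACHE_FAIL_ON_PARTIAL_UPDATE, true);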

http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-provider/sentry-provider-db/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/pom.xml b/sentry-provider/sentry-provider-db/pom.xml
index 1699286..2451f34 100644
--- a/sentry-provider/sentry-provider-db/pom.xml
+++ b/sentry-provider/sentry-provider-db/pom.xml
@@ -117,6 +117,20 @@ limitations under the License.
       <version>1.8.0-SNAPSHOT</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-hdfs-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-binding-hive-conf</artifactId>
+      <version>1.8.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-binding-hive-follower</artifactId>
+      <version>1.8.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-shims</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 4430471..894fcc9 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hive.hcatalog.messaging.HCatEventMessage;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
 import org.apache.sentry.core.common.exception.*;
+import org.apache.sentry.hdfs.UpdateableAuthzPaths;
+import org.apache.sentry.hdfs.FullUpdateInitializer;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
 import org.apache.thrift.TException;
@@ -43,6 +46,7 @@ import java.io.IOException;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
 import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
@@ -67,8 +71,10 @@ public class HMSFollower implements Runnable {
   private String hiveInstance;
   final static int maxRetriesForLogin = 3;
   final static int maxRetriesForConnection = 3;
+  private volatile UpdateableAuthzPaths authzPaths;
+  private AtomicBoolean fullUpdateComplete;
 
-  HMSFollower(Configuration conf) throws SentryNoSuchObjectException,
+  HMSFollower(Configuration conf, AtomicBoolean fullUpdateComplete) throws SentryNoSuchObjectException,
      SentryAccessDeniedException, SentrySiteConfigurationException, IOException { //TODO: Handle any possible exceptions or throw specific exceptions
     LOGGER.info("HMSFollower is being initialized");
     authzConf = conf;
@@ -79,6 +85,7 @@ public class HMSFollower implements Runnable {
     }
     //TODO: Initialize currentEventID from Sentry db
     currentEventID = 0;
+    this.fullUpdateComplete = fullUpdateComplete;
   }
 
   @VisibleForTesting
@@ -195,27 +202,63 @@ public class HMSFollower implements Runnable {
       }
     }
     if (needFullUpdate()) {
-      //TODO: Handle
-    } else {
-      try {
-        NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null);
-        if (response.isSetEvents()) {
-          LOGGER.info(String.format("CurrentEventID = %s. Processing %s events",
-              currentEventID, response.getEvents().size()));
-          processNotificationEvents(response.getEvents());
+      // TODO: read currentEventID from Sentry DB.
+      // This guarantees that events which arrived before a failover,
+      // but were not yet applied, can still be fetched later.
+      fetchFullUpdate();
+    }
+
+    try {
+      NotificationEventResponse response = client.getNextNotification(currentEventID, Integer.MAX_VALUE, null);
+      if (response.isSetEvents()) {
+        LOGGER.info(String.format("CurrentEventID = %s. Processing %s events",
+            currentEventID, response.getEvents().size()));
+        processNotificationEvents(response.getEvents());
+      }
+    } catch (TException e) {
+      // Log the failure; the next scheduled run will retry.
+      LOGGER.error("ThriftException occurred fetching notification entries, will retry", e);
+    } catch (SentryInvalidInputException|SentryInvalidHMSEventException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Fetches the full Hive metadata snapshot, blocking the Sentry service
+   * start-up while it runs, and signals the main thread via
+   * fullUpdateComplete once the fetch is done.
+   */
+  private void fetchFullUpdate() {
+    fullUpdateComplete.getAndSet(false);
+
+    FullUpdateInitializer updateInitializer = null;
+    try {
+      updateInitializer = new FullUpdateInitializer(client, authzConf);
+      HMSFollower.this.authzPaths = updateInitializer.createInitialUpdate();
+      // TODO: notify HDFS plugin
+      LOGGER.info("#### Hive full update initialization complete !!");
+    } catch (Exception e) {
+      LOGGER.error("#### Could not create hive full update !!", e);
+    } finally {
+      if (updateInitializer != null) {
+        try {
+          updateInitializer.close();
+        } catch (Exception e) {
+          LOGGER.info("#### Exception while closing updateInitializer !!", e);
         }
-      } catch (TException e) {
-        LOGGER.error("ThriftException occured fetching Notification entries, 
will try");
-        e.printStackTrace();
-      } catch (SentryInvalidInputException|SentryInvalidHMSEventException e) {
-        throw new RuntimeException(e);
       }
+
+      fullUpdateComplete.getAndSet(true);
     }
   }
 
   private boolean needFullUpdate() {
-    //TODO Implement
-    return false;
+    // Currently fullUpdateComplete indicates that the server is starting
+    // up and therefore needs a full update.
+    // TODO: set fullUpdateComplete based on the notification id stored in SentryDB.
+    return !fullUpdateComplete.get();
   }
 
   private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) {
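
The start-up handshake added here reduces to a single AtomicBoolean shared
between SentryService and HMSFollower. A condensed sketch (not the literal
patch code) of the flow:

  import java.util.concurrent.atomic.AtomicBoolean;

  class StartupHandshakeSketch {
    // Owned by SentryService, handed to HMSFollower; false until the
    // first full-update attempt finishes.
    private final AtomicBoolean fullUpdateComplete = new AtomicBoolean(false);

    // Mirrors HMSFollower.needFullUpdate().
    boolean needFullUpdate() {
      return !fullUpdateComplete.get();
    }

    // Mirrors the shape of HMSFollower.fetchFullUpdate(): the flag is
    // raised in the finally block even if the fetch fails, so later
    // scheduled runs fall through to plain notification polling.
    void fetchFullUpdate() {
      fullUpdateComplete.getAndSet(false);
      try {
        // ... build the initial image via FullUpdateInitializer ...
      } finally {
        fullUpdateComplete.getAndSet(true);
      }
    }
  }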

http://git-wip-us.apache.org/repos/asf/sentry/blob/e3f0a9f8/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index ddcb90c..7497719 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.EventListener;
 import java.util.List;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.security.auth.Subject;
 
@@ -94,6 +95,7 @@ public class SentryService implements Callable {
   private long maxMessageSize;
   private final boolean isHA;
   private final Activator act;
+  private AtomicBoolean fullUpdateComplete = new AtomicBoolean(false);
   SentryMetrics sentryMetrics;
 
   public SentryService(Configuration conf) throws Exception {
@@ -157,7 +159,7 @@ public class SentryService implements Callable {
     //TODO: Enable only if Hive is using Sentry?
     try {
       hmsFollowerExecutor = Executors.newScheduledThreadPool(1);
-      hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf), 60000, 500, TimeUnit.MILLISECONDS);
+      hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf, fullUpdateComplete), 60000, 500, TimeUnit.MILLISECONDS);
     }catch(Exception e) {
       //TODO: Handle
       LOGGER.error("Could not start HMSFollower");
