http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore2.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore2.java 
b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore2.java
new file mode 100644
index 0000000..fa4e02a
--- /dev/null
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore2.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.model.MNotificationLog;
+import org.apache.hadoop.hive.metastore.model.MNotificationNextId;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hadoop.hive.metastore.TestOldSchema.dropAllStoreObjects;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+// Tests from TestObjectStore that can't be moved yet due to references to 
EventMessage.  Once
+// EventMessage has been moved this should be recombined with TestObjectStore.
+
+public class TestObjectStore2 {
+  private ObjectStore objectStore = null;
+
+  public static class MockPartitionExpressionProxy implements 
PartitionExpressionProxy {
+    @Override
+    public String convertExprToFilter(byte[] expr) throws MetaException {
+      return null;
+    }
+
+    @Override
+    public boolean filterPartitionsByExpr(List<FieldSchema> partColumns,
+                                          byte[] expr, String 
defaultPartitionName, List<String> partitionNames)
+        throws MetaException {
+      return false;
+    }
+
+    @Override
+    public FileMetadataExprType getMetadataType(String inputFormat) {
+      return null;
+    }
+
+    @Override
+    public SearchArgument createSarg(byte[] expr) {
+      return null;
+    }
+
+    @Override
+    public FileFormatProxy getFileFormatProxy(FileMetadataExprType type) {
+      return null;
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.EXPRESSION_PROXY_CLASS,
+        MockPartitionExpressionProxy.class.getName());
+
+    objectStore = new ObjectStore();
+    objectStore.setConf(conf);
+    dropAllStoreObjects(objectStore);
+  }
+
+  /**
+   * Test notification operations
+   */
+  // TODO MS-SPLIT uncomment once we move EventMessage over
+  @Test
+  public void testNotificationOps() throws InterruptedException {
+    final int NO_EVENT_ID = 0;
+    final int FIRST_EVENT_ID = 1;
+    final int SECOND_EVENT_ID = 2;
+
+    NotificationEvent event =
+        new NotificationEvent(0, 0, 
EventMessage.EventType.CREATE_DATABASE.toString(), "");
+    NotificationEventResponse eventResponse;
+    CurrentNotificationEventId eventId;
+
+    // Verify that there is no notifications available yet
+    eventId = objectStore.getCurrentNotificationEventId();
+    assertEquals(NO_EVENT_ID, eventId.getEventId());
+
+    // Verify that addNotificationEvent() updates the NotificationEvent with 
the new event ID
+    objectStore.addNotificationEvent(event);
+    assertEquals(FIRST_EVENT_ID, event.getEventId());
+    objectStore.addNotificationEvent(event);
+    assertEquals(SECOND_EVENT_ID, event.getEventId());
+
+    // Verify that objectStore fetches the latest notification event ID
+    eventId = objectStore.getCurrentNotificationEventId();
+    assertEquals(SECOND_EVENT_ID, eventId.getEventId());
+
+    // Verify that getNextNotification() returns all events
+    eventResponse = objectStore.getNextNotification(new 
NotificationEventRequest());
+    assertEquals(2, eventResponse.getEventsSize());
+    assertEquals(FIRST_EVENT_ID, 
eventResponse.getEvents().get(0).getEventId());
+    assertEquals(SECOND_EVENT_ID, 
eventResponse.getEvents().get(1).getEventId());
+
+    // Verify that getNextNotification(last) returns events after a specified 
event
+    eventResponse = objectStore.getNextNotification(new 
NotificationEventRequest(FIRST_EVENT_ID));
+    assertEquals(1, eventResponse.getEventsSize());
+    assertEquals(SECOND_EVENT_ID, 
eventResponse.getEvents().get(0).getEventId());
+
+    // Verify that getNextNotification(last) returns zero events if there are 
no more notifications available
+    eventResponse = objectStore.getNextNotification(new 
NotificationEventRequest(SECOND_EVENT_ID));
+    assertEquals(0, eventResponse.getEventsSize());
+
+    // Verify that cleanNotificationEvents() cleans up all old notifications
+    Thread.sleep(1);
+    objectStore.cleanNotificationEvents(1);
+    eventResponse = objectStore.getNextNotification(new 
NotificationEventRequest());
+    assertEquals(0, eventResponse.getEventsSize());
+  }
+
+  @Ignore(
+      "This test is here to allow testing with other databases like mysql / 
postgres etc\n"
+          + " with  user changes to the code. This cannot be run on apache 
derby because of\n"
+          + " 
https://db.apache.org/derby/docs/10.10/devguide/cdevconcepts842385.html";
+  )
+  @Test
+  public void testConcurrentAddNotifications() throws ExecutionException, 
InterruptedException {
+
+    final int NUM_THREADS = 10;
+    CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_THREADS,
+        () -> LoggerFactory.getLogger("test")
+            .debug(NUM_THREADS + " threads going to add notification"));
+
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.EXPRESSION_PROXY_CLASS,
+        MockPartitionExpressionProxy.class.getName());
+    /*
+       Below are the properties that need to be set based on which database 
this test is going to be run against
+     */
+
+//    conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, 
"com.mysql.jdbc.Driver");
+//    conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
+//        "jdbc:mysql://localhost:3306/metastore_db");
+//    conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "");
+//    conf.setVar(HiveConf.ConfVars.METASTOREPWD, "");
+
+    /*
+     We have to add this one manually because for tests the db is initialized 
via metastoreDirectSQL
+     and we don't run the schema creation sql that includes an insert for 
notification_sequence
+     which can be locked. The entry in notification_sequence happens via 
notification_event insertion.
+    */
+    objectStore.getPersistenceManager().newQuery(MNotificationLog.class, 
"eventType==''").execute();
+    objectStore.getPersistenceManager().newQuery(MNotificationNextId.class, 
"nextEventId==-1").execute();
+
+    objectStore.addNotificationEvent(
+        new NotificationEvent(0, 0,
+            EventMessage.EventType.CREATE_DATABASE.toString(),
+            "CREATE DATABASE DB initial"));
+
+    ExecutorService executorService = 
Executors.newFixedThreadPool(NUM_THREADS);
+    for (int i = 0; i < NUM_THREADS; i++) {
+      final int n = i;
+
+      executorService.execute(
+          () -> {
+            ObjectStore store = new ObjectStore();
+            store.setConf(conf);
+
+            String eventType = 
EventMessage.EventType.CREATE_DATABASE.toString();
+            NotificationEvent dbEvent =
+                new NotificationEvent(0, 0, eventType,
+                    "CREATE DATABASE DB" + n);
+            System.out.println("ADDING NOTIFICATION");
+
+            try {
+              cyclicBarrier.await();
+            } catch (InterruptedException | BrokenBarrierException e) {
+              throw new RuntimeException(e);
+            }
+            store.addNotificationEvent(dbEvent);
+            System.out.println("FINISH NOTIFICATION");
+          });
+    }
+    executorService.shutdown();
+    assertTrue(executorService.awaitTermination(15, TimeUnit.SECONDS));
+
+    // we have to set this up again as the underlying PMF keeps getting 
reinitialized with the original
+    // reference closed
+    ObjectStore store = new ObjectStore();
+    store.setConf(conf);
+
+    NotificationEventResponse eventResponse = store.getNextNotification(
+        new NotificationEventRequest());
+    assertEquals(NUM_THREADS + 1, eventResponse.getEventsSize());
+    long previousId = 0;
+    for (NotificationEvent event : eventResponse.getEvents()) {
+      assertTrue("previous:" + previousId + " current:" + event.getEventId(),
+          previousId < event.getEventId());
+      assertTrue(previousId + 1 == event.getEventId());
+      previousId = event.getEventId();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/test/org/apache/hadoop/hive/metastore/TestRawStoreProxy.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/TestRawStoreProxy.java 
b/metastore/src/test/org/apache/hadoop/hive/metastore/TestRawStoreProxy.java
deleted file mode 100644
index 68d65a8..0000000
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestRawStoreProxy.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.metastore;
-
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.junit.Test;
-
-public class TestRawStoreProxy {
-
-  static class TestStore extends ObjectStore {
-    @Override
-    public void setConf(Configuration conf) {
-      // noop
-    }
-
-    public void noopMethod() throws MetaException {
-      Deadline.checkTimeout();
-    }
-
-    public void exceptions() throws IllegalStateException, MetaException {
-      Deadline.checkTimeout();
-      throw new IllegalStateException("throwing an exception");
-    }
-  }
-
-  @Test
-  public void testExceptionDispatch() throws Throwable {
-    HiveConf hiveConf = new HiveConf();
-    hiveConf.setTimeVar(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, 10,
-        TimeUnit.MILLISECONDS);
-    RawStoreProxy rsp = new RawStoreProxy(hiveConf, hiveConf, TestStore.class, 
1);
-    try {
-      rsp.invoke(null, TestStore.class.getMethod("exceptions"), new Object[] 
{});
-      fail("an exception is expected");
-    } catch (IllegalStateException ise) {
-      // expected
-    }
-    Thread.sleep(20);
-    // this shouldn't throw an exception
-    rsp.invoke(null, TestStore.class.getMethod("noopMethod"), new Object[] {});
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
 
b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
deleted file mode 100644
index b6d2df5..0000000
--- 
a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
+++ /dev/null
@@ -1,901 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.cache;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.ObjectStore;
-import org.apache.hadoop.hive.metastore.TableType;
-import 
org.apache.hadoop.hive.metastore.TestObjectStore.MockPartitionExpressionProxy;
-import org.apache.hadoop.hive.metastore.api.AggrStats;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import 
org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
-import 
org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestCachedStore {
-
-  private ObjectStore objectStore;
-  private CachedStore cachedStore;
-  private SharedCache sharedCache;
-
-  @Before
-  public void setUp() throws Exception {
-    HiveConf conf = new HiveConf();
-    conf.setBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname, true);
-    conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS,
-        MockPartitionExpressionProxy.class.getName());
-    objectStore = new ObjectStore();
-    objectStore.setConf(conf);
-    cachedStore = new CachedStore();
-    cachedStore.setConf(conf);
-    // Stop the CachedStore cache update service. We'll start it explicitly to 
control the test
-    CachedStore.stopCacheUpdateService(1);
-    cachedStore.setInitializedForTest();
-
-    // Stop the CachedStore cache update service. We'll start it explicitly to 
control the test
-    CachedStore.stopCacheUpdateService(1);
-    sharedCache = new SharedCache();
-    sharedCache.getDatabaseCache().clear();
-    sharedCache.getTableCache().clear();
-    sharedCache.getPartitionCache().clear();
-    sharedCache.getSdCache().clear();
-    sharedCache.getPartitionColStatsCache().clear();
-  }
-
-  
/**********************************************************************************************
-   * Methods that test CachedStore
-   
*********************************************************************************************/
-
-  @Test
-  public void testDatabaseOps() throws Exception {
-    // Add a db via ObjectStore
-    String dbName = "testDatabaseOps";
-    String dbDescription = "testDatabaseOps";
-    String dbLocation = "file:/tmp";
-    Map<String, String> dbParams = new HashMap<String, String>();
-    String dbOwner = "user1";
-    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
-    db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
-    objectStore.createDatabase(db);
-    db = objectStore.getDatabase(dbName);
-    // Prewarm CachedStore
-    CachedStore.prewarm(objectStore);
-
-    // Read database via CachedStore
-    Database dbNew = cachedStore.getDatabase(dbName);
-    Assert.assertEquals(db, dbNew);
-
-    // Add another db via CachedStore
-    final String dbName1 = "testDatabaseOps1";
-    final String dbDescription1 = "testDatabaseOps1";
-    Database db1 = new Database(dbName1, dbDescription1, dbLocation, dbParams);
-    db1.setOwnerName(dbOwner);
-    db1.setOwnerType(PrincipalType.USER);
-    cachedStore.createDatabase(db1);
-    db1 = cachedStore.getDatabase(dbName1);
-
-    // Read db via ObjectStore
-    dbNew = objectStore.getDatabase(dbName1);
-    Assert.assertEquals(db1, dbNew);
-
-    // Alter the db via CachedStore (can only alter owner or parameters)
-    db = new Database(dbName, dbDescription, dbLocation, dbParams);
-    dbOwner = "user2";
-    db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
-    cachedStore.alterDatabase(dbName, db);
-    db = cachedStore.getDatabase(dbName);
-
-    // Read db via ObjectStore
-    dbNew = objectStore.getDatabase(dbName);
-    Assert.assertEquals(db, dbNew);
-
-    // Add another db via ObjectStore
-    final String dbName2 = "testDatabaseOps2";
-    final String dbDescription2 = "testDatabaseOps2";
-    Database db2 = new Database(dbName2, dbDescription2, dbLocation, dbParams);
-    db2.setOwnerName(dbOwner);
-    db2.setOwnerType(PrincipalType.USER);
-    objectStore.createDatabase(db2);
-    db2 = objectStore.getDatabase(dbName2);
-
-    // Alter db "testDatabaseOps" via ObjectStore
-    dbOwner = "user1";
-    db = new Database(dbName, dbDescription, dbLocation, dbParams);
-    db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
-    objectStore.alterDatabase(dbName, db);
-    db = objectStore.getDatabase(dbName);
-
-    // Drop db "testDatabaseOps1" via ObjectStore
-    objectStore.dropDatabase(dbName1);
-
-    // We update twice to accurately detect if cache is dirty or not
-    updateCache(cachedStore, 100, 500, 100);
-    updateCache(cachedStore, 100, 500, 100);
-
-    // Read the newly added db via CachedStore
-    dbNew = cachedStore.getDatabase(dbName2);
-    Assert.assertEquals(db2, dbNew);
-
-    // Read the altered db via CachedStore (altered user from "user2" to 
"user1")
-    dbNew = cachedStore.getDatabase(dbName);
-    Assert.assertEquals(db, dbNew);
-
-    // Try to read the dropped db after cache update
-    try {
-      dbNew = cachedStore.getDatabase(dbName1);
-      Assert.fail("The database: " + dbName1
-          + " should have been removed from the cache after running the update 
service");
-    } catch (NoSuchObjectException e) {
-      // Expected
-    }
-
-    // Clean up
-    objectStore.dropDatabase(dbName);
-    objectStore.dropDatabase(dbName2);
-  }
-
-  @Test
-  public void testTableOps() throws Exception {
-    // Add a db via ObjectStore
-    String dbName = "testTableOps";
-    String dbDescription = "testTableOps";
-    String dbLocation = "file:/tmp";
-    Map<String, String> dbParams = new HashMap<String, String>();
-    String dbOwner = "user1";
-    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
-    db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
-    objectStore.createDatabase(db);
-    db = objectStore.getDatabase(dbName);
-
-    // Add a table via ObjectStore
-    String tblName = "tbl";
-    String tblOwner = "user1";
-    String serdeLocation = "file:/tmp";
-    FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
-    FieldSchema col2 = new FieldSchema("col2", "string", "string column");
-    List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(col1);
-    cols.add(col2);
-    Map<String, String> serdeParams = new HashMap<String, String>();
-    Map<String, String> tblParams = new HashMap<String, String>();
-    SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new 
HashMap<String, String>());
-    StorageDescriptor sd =
-        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 
0, serdeInfo, null,
-            null, serdeParams);
-    sd.setStoredAsSubDirectories(false);
-    Table tbl =
-        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new 
ArrayList<FieldSchema>(), tblParams,
-            null, null, TableType.MANAGED_TABLE.toString());
-    objectStore.createTable(tbl);
-    tbl = objectStore.getTable(dbName, tblName);
-
-    // Prewarm CachedStore
-    CachedStore.prewarm(objectStore);
-
-    // Read database, table via CachedStore
-    Database dbNew = cachedStore.getDatabase(dbName);
-    Assert.assertEquals(db, dbNew);
-    Table tblNew = cachedStore.getTable(dbName, tblName);
-    Assert.assertEquals(tbl, tblNew);
-
-    // Add a new table via CachedStore
-    String tblName1 = "tbl1";
-    Table tbl1 =
-        new Table(tblName1, dbName, tblOwner, 0, 0, 0, sd, new 
ArrayList<FieldSchema>(), tblParams,
-            null, null, TableType.MANAGED_TABLE.toString());
-    cachedStore.createTable(tbl1);
-    tbl1 = cachedStore.getTable(dbName, tblName1);
-
-    // Read via object store
-    tblNew = objectStore.getTable(dbName, tblName1);
-    Assert.assertEquals(tbl1, tblNew);
-
-    // Add a new table via ObjectStore
-    String tblName2 = "tbl2";
-    Table tbl2 =
-        new Table(tblName2, dbName, tblOwner, 0, 0, 0, sd, new 
ArrayList<FieldSchema>(), tblParams,
-            null, null, TableType.MANAGED_TABLE.toString());
-    objectStore.createTable(tbl2);
-    tbl2 = objectStore.getTable(dbName, tblName2);
-
-    // Alter table "tbl" via ObjectStore
-    tblOwner = "user2";
-    tbl =
-        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new 
ArrayList<FieldSchema>(), tblParams,
-            null, null, TableType.MANAGED_TABLE.toString());
-    objectStore.alterTable(dbName, tblName, tbl);
-    tbl = objectStore.getTable(dbName, tblName);
-
-    // Drop table "tbl1" via ObjectStore
-    objectStore.dropTable(dbName, tblName1);
-
-    // We update twice to accurately detect if cache is dirty or not
-    updateCache(cachedStore, 100, 500, 100);
-    updateCache(cachedStore, 100, 500, 100);
-
-    // Read "tbl2" via CachedStore
-    tblNew = cachedStore.getTable(dbName, tblName2);
-    Assert.assertEquals(tbl2, tblNew);
-
-    // Read the altered "tbl" via CachedStore
-    tblNew = cachedStore.getTable(dbName, tblName);
-    Assert.assertEquals(tbl, tblNew);
-
-    // Try to read the dropped "tbl1" via CachedStore (should throw exception)
-    tblNew = cachedStore.getTable(dbName, tblName1);
-    Assert.assertNull(tblNew);
-
-    // Should return "tbl" and "tbl2"
-    List<String> tblNames = cachedStore.getTables(dbName, "*");
-    Assert.assertTrue(tblNames.contains(tblName));
-    Assert.assertTrue(!tblNames.contains(tblName1));
-    Assert.assertTrue(tblNames.contains(tblName2));
-
-    // Clean up
-    objectStore.dropTable(dbName, tblName);
-    objectStore.dropTable(dbName, tblName2);
-    objectStore.dropDatabase(dbName);
-  }
-
-  @Test
-  public void testPartitionOps() throws Exception {
-    // Add a db via ObjectStore
-    String dbName = "testPartitionOps";
-    String dbDescription = "testPartitionOps";
-    String dbLocation = "file:/tmp";
-    Map<String, String> dbParams = new HashMap<String, String>();
-    String dbOwner = "user1";
-    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
-    db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
-    objectStore.createDatabase(db);
-    db = objectStore.getDatabase(dbName);
-
-    // Add a table via ObjectStore
-    String tblName = "tbl";
-    String tblOwner = "user1";
-    String serdeLocation = "file:/tmp";
-    FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
-    FieldSchema col2 = new FieldSchema("col2", "string", "string column");
-    List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(col1);
-    cols.add(col2);
-    Map<String, String> serdeParams = new HashMap<String, String>();
-    Map<String, String> tblParams = new HashMap<String, String>();
-    SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd =
-        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 
0, serdeInfo, null,
-            null, serdeParams);
-    FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition 
column");
-    List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
-    ptnCols.add(ptnCol1);
-    Table tbl =
-        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, 
null, null,
-            TableType.MANAGED_TABLE.toString());
-    objectStore.createTable(tbl);
-    tbl = objectStore.getTable(dbName, tblName);
-    final String ptnColVal1 = "aaa";
-    Map<String, String> partParams = new HashMap<String, String>();
-    Partition ptn1 =
-        new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, sd, 
partParams);
-    objectStore.addPartition(ptn1);
-    ptn1 = objectStore.getPartition(dbName, tblName, 
Arrays.asList(ptnColVal1));
-    final String ptnColVal2 = "bbb";
-    Partition ptn2 =
-        new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, sd, 
partParams);
-    objectStore.addPartition(ptn2);
-    ptn2 = objectStore.getPartition(dbName, tblName, 
Arrays.asList(ptnColVal2));
-
-    // Prewarm CachedStore
-    CachedStore.prewarm(objectStore);
-
-    // Read database, table, partition via CachedStore
-    Database dbNew = cachedStore.getDatabase(dbName);
-    Assert.assertEquals(db, dbNew);
-    Table tblNew = cachedStore.getTable(dbName, tblName);
-    Assert.assertEquals(tbl, tblNew);
-    Partition newPtn1 = cachedStore.getPartition(dbName, tblName, 
Arrays.asList(ptnColVal1));
-    Assert.assertEquals(ptn1, newPtn1);
-    Partition newPtn2 = cachedStore.getPartition(dbName, tblName, 
Arrays.asList(ptnColVal2));
-    Assert.assertEquals(ptn2, newPtn2);
-
-    // Add a new partition via ObjectStore
-    final String ptnColVal3 = "ccc";
-    Partition ptn3 =
-        new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, sd, 
partParams);
-    objectStore.addPartition(ptn3);
-    ptn3 = objectStore.getPartition(dbName, tblName, 
Arrays.asList(ptnColVal3));
-
-    // Alter an existing partition ("aaa") via ObjectStore
-    final String ptnColVal1Alt = "aaaAlt";
-    Partition ptn1Atl =
-        new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, sd, 
partParams);
-    objectStore.alterPartition(dbName, tblName, Arrays.asList(ptnColVal1), 
ptn1Atl);
-    ptn1Atl = objectStore.getPartition(dbName, tblName, 
Arrays.asList(ptnColVal1Alt));
-
-    // Drop an existing partition ("bbb") via ObjectStore
-    objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal2));
-
-    // We update twice to accurately detect if cache is dirty or not
-    updateCache(cachedStore, 100, 500, 100);
-    updateCache(cachedStore, 100, 500, 100);
-
-    // Read the newly added partition via CachedStore
-    Partition newPtn = cachedStore.getPartition(dbName, tblName, 
Arrays.asList(ptnColVal3));
-    Assert.assertEquals(ptn3, newPtn);
-
-    // Read the altered partition via CachedStore
-    newPtn = cachedStore.getPartition(dbName, tblName, 
Arrays.asList(ptnColVal1Alt));
-    Assert.assertEquals(ptn1Atl, newPtn);
-
-    // Try to read the dropped partition via CachedStore
-    try {
-      newPtn = cachedStore.getPartition(dbName, tblName, 
Arrays.asList(ptnColVal2));
-      Assert.fail("The partition: " + ptnColVal2
-          + " should have been removed from the cache after running the update 
service");
-    } catch (NoSuchObjectException e) {
-      // Expected
-    }
-  }
-
-  //@Test
-  public void testTableColStatsOps() throws Exception {
-    // Add a db via ObjectStore
-    String dbName = "testTableColStatsOps";
-    String dbDescription = "testTableColStatsOps";
-    String dbLocation = "file:/tmp";
-    Map<String, String> dbParams = new HashMap<String, String>();
-    String dbOwner = "user1";
-    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
-    db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
-    objectStore.createDatabase(db);
-    db = objectStore.getDatabase(dbName);
-
-    // Add a table via ObjectStore
-    final String tblName = "tbl";
-    final String tblOwner = "user1";
-    final String serdeLocation = "file:/tmp";
-    final FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
-    // Stats values for col1
-    long col1LowVal = 5;
-    long col1HighVal = 500;
-    long col1Nulls = 10;
-    long col1DV = 20;
-    final  FieldSchema col2 = new FieldSchema("col2", "string", "string 
column");
-    // Stats values for col2
-    long col2MaxColLen = 100;
-    double col2AvgColLen = 45.5;
-    long col2Nulls = 5;
-    long col2DV = 40;
-    final FieldSchema col3 = new FieldSchema("col3", "boolean", "boolean 
column");
-    // Stats values for col3
-    long col3NumTrues = 100;
-    long col3NumFalses = 30;
-    long col3Nulls = 10;
-    final List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(col1);
-    cols.add(col2);
-    cols.add(col3);
-    Map<String, String> serdeParams = new HashMap<String, String>();
-    Map<String, String> tblParams = new HashMap<String, String>();
-    final SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd =
-        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 
0, serdeInfo, null,
-            null, serdeParams);
-    Table tbl =
-        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new 
ArrayList<FieldSchema>(), tblParams,
-            null, null, TableType.MANAGED_TABLE.toString());
-    objectStore.createTable(tbl);
-    tbl = objectStore.getTable(dbName, tblName);
-
-    // Add ColumnStatistics for tbl to metastore DB via ObjectStore
-    ColumnStatistics stats = new ColumnStatistics();
-    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, 
tblName);
-    List<ColumnStatisticsObj> colStatObjs = new 
ArrayList<ColumnStatisticsObj>();
-
-    // Col1
-    ColumnStatisticsData data1 = new ColumnStatisticsData();
-    ColumnStatisticsObj col1Stats = new ColumnStatisticsObj(col1.getName(), 
col1.getType(), data1);
-    LongColumnStatsDataInspector longStats = new 
LongColumnStatsDataInspector();
-    longStats.setLowValue(col1LowVal);
-    longStats.setHighValue(col1HighVal);
-    longStats.setNumNulls(col1Nulls);
-    longStats.setNumDVs(col1DV);
-    data1.setLongStats(longStats);
-    colStatObjs.add(col1Stats);
-
-    // Col2
-    ColumnStatisticsData data2 = new ColumnStatisticsData();
-    ColumnStatisticsObj col2Stats = new ColumnStatisticsObj(col2.getName(), 
col2.getType(), data2);
-    StringColumnStatsDataInspector stringStats = new 
StringColumnStatsDataInspector();
-    stringStats.setMaxColLen(col2MaxColLen);
-    stringStats.setAvgColLen(col2AvgColLen);
-    stringStats.setNumNulls(col2Nulls);
-    stringStats.setNumDVs(col2DV);
-    data2.setStringStats(stringStats);
-    colStatObjs.add(col2Stats);
-
-    // Col3
-    ColumnStatisticsData data3 = new ColumnStatisticsData();
-    ColumnStatisticsObj col3Stats = new ColumnStatisticsObj(col3.getName(), 
col3.getType(), data3);
-    BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
-    boolStats.setNumTrues(col3NumTrues);
-    boolStats.setNumFalses(col3NumFalses);
-    boolStats.setNumNulls(col3Nulls);
-    data3.setBooleanStats(boolStats);
-    colStatObjs.add(col3Stats);
-
-    stats.setStatsDesc(statsDesc);
-    stats.setStatsObj(colStatObjs);
-
-    // Save to DB
-    objectStore.updateTableColumnStatistics(stats);
-
-    // Prewarm CachedStore
-    CachedStore.prewarm(objectStore);
-
-    // Read table stats via CachedStore
-    ColumnStatistics newStats =
-        cachedStore.getTableColumnStatistics(dbName, tblName,
-            Arrays.asList(col1.getName(), col2.getName(), col3.getName()));
-    Assert.assertEquals(stats, newStats);
-  }
-
-  private void updateCache(CachedStore cachedStore, long frequency, long 
sleepTime,
-      long shutdownTimeout) throws InterruptedException {
-    // Set cache refresh period to 100 milliseconds
-    CachedStore.setCacheRefreshPeriod(100);
-    // Start the CachedStore update service
-    CachedStore.startCacheUpdateService(cachedStore.getConf());
-    // Sleep for 500 ms so that cache update is complete
-    Thread.sleep(500);
-    // Stop cache update service
-    CachedStore.stopCacheUpdateService(100);
-  }
-
-  
/**********************************************************************************************
-   * Methods that test SharedCache
-   
*********************************************************************************************/
-
-  @Test
-  public void testSharedStoreDb() {
-    Database db1 = new Database();
-    Database db2 = new Database();
-    Database db3 = new Database();
-    Database newDb1 = new Database();
-    newDb1.setName("db1");
-
-    sharedCache.addDatabaseToCache("db1", db1);
-    sharedCache.addDatabaseToCache("db2", db2);
-    sharedCache.addDatabaseToCache("db3", db3);
-
-    Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3);
-
-    sharedCache.alterDatabaseInCache("db1", newDb1);
-
-    Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3);
-
-    sharedCache.removeDatabaseFromCache("db2");
-
-    Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 2);
-
-    List<String> dbs = sharedCache.listCachedDatabases();
-    Assert.assertEquals(dbs.size(), 2);
-    Assert.assertTrue(dbs.contains("db1"));
-    Assert.assertTrue(dbs.contains("db3"));
-  }
-
-  @Test
-  public void testSharedStoreTable() {
-    Table tbl1 = new Table();
-    StorageDescriptor sd1 = new StorageDescriptor();
-    List<FieldSchema> cols1 = new ArrayList<FieldSchema>();
-    cols1.add(new FieldSchema("col1", "int", ""));
-    Map<String, String> params1 = new HashMap<String, String>();
-    params1.put("key", "value");
-    sd1.setCols(cols1);
-    sd1.setParameters(params1);
-    sd1.setLocation("loc1");
-    tbl1.setSd(sd1);
-    tbl1.setPartitionKeys(new ArrayList<FieldSchema>());
-
-    Table tbl2 = new Table();
-    StorageDescriptor sd2 = new StorageDescriptor();
-    List<FieldSchema> cols2 = new ArrayList<FieldSchema>();
-    cols2.add(new FieldSchema("col1", "int", ""));
-    Map<String, String> params2 = new HashMap<String, String>();
-    params2.put("key", "value");
-    sd2.setCols(cols2);
-    sd2.setParameters(params2);
-    sd2.setLocation("loc2");
-    tbl2.setSd(sd2);
-    tbl2.setPartitionKeys(new ArrayList<FieldSchema>());
-
-    Table tbl3 = new Table();
-    StorageDescriptor sd3 = new StorageDescriptor();
-    List<FieldSchema> cols3 = new ArrayList<FieldSchema>();
-    cols3.add(new FieldSchema("col3", "int", ""));
-    Map<String, String> params3 = new HashMap<String, String>();
-    params3.put("key2", "value2");
-    sd3.setCols(cols3);
-    sd3.setParameters(params3);
-    sd3.setLocation("loc3");
-    tbl3.setSd(sd3);
-    tbl3.setPartitionKeys(new ArrayList<FieldSchema>());
-
-    Table newTbl1 = new Table();
-    newTbl1.setDbName("db2");
-    newTbl1.setTableName("tbl1");
-    StorageDescriptor newSd1 = new StorageDescriptor();
-    List<FieldSchema> newCols1 = new ArrayList<FieldSchema>();
-    newCols1.add(new FieldSchema("newcol1", "int", ""));
-    Map<String, String> newParams1 = new HashMap<String, String>();
-    newParams1.put("key", "value");
-    newSd1.setCols(newCols1);
-    newSd1.setParameters(params1);
-    newSd1.setLocation("loc1");
-    newTbl1.setSd(newSd1);
-    newTbl1.setPartitionKeys(new ArrayList<FieldSchema>());
-
-    sharedCache.addTableToCache("db1", "tbl1", tbl1);
-    sharedCache.addTableToCache("db1", "tbl2", tbl2);
-    sharedCache.addTableToCache("db1", "tbl3", tbl3);
-    sharedCache.addTableToCache("db2", "tbl1", tbl1);
-
-    Assert.assertEquals(sharedCache.getCachedTableCount(), 4);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 2);
-
-    Table t = sharedCache.getTableFromCache("db1", "tbl1");
-    Assert.assertEquals(t.getSd().getLocation(), "loc1");
-
-    sharedCache.removeTableFromCache("db1", "tbl1");
-    Assert.assertEquals(sharedCache.getCachedTableCount(), 3);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 2);
-
-    sharedCache.alterTableInCache("db2", "tbl1", newTbl1);
-    Assert.assertEquals(sharedCache.getCachedTableCount(), 3);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 3);
-
-    sharedCache.removeTableFromCache("db1", "tbl2");
-    Assert.assertEquals(sharedCache.getCachedTableCount(), 2);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 2);
-  }
-
-
-  @Test
-  public void testSharedStorePartition() {
-    Partition part1 = new Partition();
-    StorageDescriptor sd1 = new StorageDescriptor();
-    List<FieldSchema> cols1 = new ArrayList<FieldSchema>();
-    cols1.add(new FieldSchema("col1", "int", ""));
-    Map<String, String> params1 = new HashMap<String, String>();
-    params1.put("key", "value");
-    sd1.setCols(cols1);
-    sd1.setParameters(params1);
-    sd1.setLocation("loc1");
-    part1.setSd(sd1);
-    part1.setValues(Arrays.asList("201701"));
-
-    Partition part2 = new Partition();
-    StorageDescriptor sd2 = new StorageDescriptor();
-    List<FieldSchema> cols2 = new ArrayList<FieldSchema>();
-    cols2.add(new FieldSchema("col1", "int", ""));
-    Map<String, String> params2 = new HashMap<String, String>();
-    params2.put("key", "value");
-    sd2.setCols(cols2);
-    sd2.setParameters(params2);
-    sd2.setLocation("loc2");
-    part2.setSd(sd2);
-    part2.setValues(Arrays.asList("201702"));
-
-    Partition part3 = new Partition();
-    StorageDescriptor sd3 = new StorageDescriptor();
-    List<FieldSchema> cols3 = new ArrayList<FieldSchema>();
-    cols3.add(new FieldSchema("col3", "int", ""));
-    Map<String, String> params3 = new HashMap<String, String>();
-    params3.put("key2", "value2");
-    sd3.setCols(cols3);
-    sd3.setParameters(params3);
-    sd3.setLocation("loc3");
-    part3.setSd(sd3);
-    part3.setValues(Arrays.asList("201703"));
-
-    Partition newPart1 = new Partition();
-    newPart1.setDbName("db1");
-    newPart1.setTableName("tbl1");
-    StorageDescriptor newSd1 = new StorageDescriptor();
-    List<FieldSchema> newCols1 = new ArrayList<FieldSchema>();
-    newCols1.add(new FieldSchema("newcol1", "int", ""));
-    Map<String, String> newParams1 = new HashMap<String, String>();
-    newParams1.put("key", "value");
-    newSd1.setCols(newCols1);
-    newSd1.setParameters(params1);
-    newSd1.setLocation("loc1");
-    newPart1.setSd(newSd1);
-    newPart1.setValues(Arrays.asList("201701"));
-
-    sharedCache.addPartitionToCache("db1", "tbl1", part1);
-    sharedCache.addPartitionToCache("db1", "tbl1", part2);
-    sharedCache.addPartitionToCache("db1", "tbl1", part3);
-    sharedCache.addPartitionToCache("db1", "tbl2", part1);
-
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 4);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 2);
-
-    Partition t = sharedCache.getPartitionFromCache("db1", "tbl1", 
Arrays.asList("201701"));
-    Assert.assertEquals(t.getSd().getLocation(), "loc1");
-
-    sharedCache.removePartitionFromCache("db1", "tbl2", 
Arrays.asList("201701"));
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 2);
-
-    sharedCache.alterPartitionInCache("db1", "tbl1", Arrays.asList("201701"), 
newPart1);
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 3);
-
-    sharedCache.removePartitionFromCache("db1", "tbl1", 
Arrays.asList("201702"));
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 2);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 2);
-  }
-
-  @Test
-  public void testAggrStatsRepeatedRead() throws Exception {
-    String dbName = "testTableColStatsOps";
-    String tblName = "tbl";
-    String colName = "f1";
-
-    Database db = new Database(dbName, null, "some_location", null);
-    cachedStore.createDatabase(db);
-
-    List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema(colName, "int", null));
-    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
-    partCols.add(new FieldSchema("col", "int", null));
-    StorageDescriptor sd =
-        new StorageDescriptor(cols, null, "input", "output", false, 0, new 
SerDeInfo("serde", "seriallib", new HashMap<String, String>()),
-            null, null, null);
-
-    Table tbl =
-        new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new 
HashMap<String, String>(),
-            null, null, TableType.MANAGED_TABLE.toString());
-    cachedStore.createTable(tbl);
-
-    List<String> partVals1 = new ArrayList<String>();
-    partVals1.add("1");
-    List<String> partVals2 = new ArrayList<String>();
-    partVals2.add("2");
-
-    Partition ptn1 =
-        new Partition(partVals1, dbName, tblName, 0, 0, sd, new 
HashMap<String, String>());
-    cachedStore.addPartition(ptn1);
-    Partition ptn2 =
-        new Partition(partVals2, dbName, tblName, 0, 0, sd, new 
HashMap<String, String>());
-    cachedStore.addPartition(ptn2);
-
-    ColumnStatistics stats = new ColumnStatistics();
-    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, 
tblName);
-    statsDesc.setPartName("col");
-    List<ColumnStatisticsObj> colStatObjs = new 
ArrayList<ColumnStatisticsObj>();
-
-    ColumnStatisticsData data = new ColumnStatisticsData();
-    ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", 
data);
-    LongColumnStatsDataInspector longStats = new 
LongColumnStatsDataInspector();
-    longStats.setLowValue(0);
-    longStats.setHighValue(100);
-    longStats.setNumNulls(50);
-    longStats.setNumDVs(30);
-    data.setLongStats(longStats);
-    colStatObjs.add(colStats);
-
-    stats.setStatsDesc(statsDesc);
-    stats.setStatsObj(colStatObjs);
-
-    cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1);
-    cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2);
-
-    List<String> colNames = new ArrayList<String>();
-    colNames.add(colName);
-    List<String> aggrPartVals = new ArrayList<String>();
-    aggrPartVals.add("1");
-    aggrPartVals.add("2");
-    AggrStats aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, 
aggrPartVals, colNames);
-    
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(),
 100);
-    aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, aggrPartVals, 
colNames);
-    
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(),
 100);
-  }
-
-  @Test
-  public void testPartitionAggrStats() throws Exception {
-    String dbName = "testTableColStatsOps1";
-    String tblName = "tbl1";
-    String colName = "f1";
-    
-    Database db = new Database(dbName, null, "some_location", null);
-    cachedStore.createDatabase(db);
-    
-    List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema(colName, "int", null));
-    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
-    partCols.add(new FieldSchema("col", "int", null));
-    StorageDescriptor sd =
-        new StorageDescriptor(cols, null, "input", "output", false, 0, new 
SerDeInfo("serde", "seriallib", new HashMap<String, String>()),
-            null, null, null);
-    
-    Table tbl =
-        new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new 
HashMap<String, String>(),
-            null, null, TableType.MANAGED_TABLE.toString());
-    cachedStore.createTable(tbl);
-    
-    List<String> partVals1 = new ArrayList<String>();
-    partVals1.add("1");
-    List<String> partVals2 = new ArrayList<String>();
-    partVals2.add("2");
-    
-    Partition ptn1 =
-        new Partition(partVals1, dbName, tblName, 0, 0, sd, new 
HashMap<String, String>());
-    cachedStore.addPartition(ptn1);
-    Partition ptn2 =
-        new Partition(partVals2, dbName, tblName, 0, 0, sd, new 
HashMap<String, String>());
-    cachedStore.addPartition(ptn2);
-    
-    ColumnStatistics stats = new ColumnStatistics();
-    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, 
tblName);
-    statsDesc.setPartName("col");
-    List<ColumnStatisticsObj> colStatObjs = new 
ArrayList<ColumnStatisticsObj>();
-    
-    ColumnStatisticsData data = new ColumnStatisticsData();
-    ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", 
data);
-    LongColumnStatsDataInspector longStats = new 
LongColumnStatsDataInspector();
-    longStats.setLowValue(0);
-    longStats.setHighValue(100);
-    longStats.setNumNulls(50);
-    longStats.setNumDVs(30);
-    data.setLongStats(longStats);
-    colStatObjs.add(colStats);
-    
-    stats.setStatsDesc(statsDesc);
-    stats.setStatsObj(colStatObjs);
-    
-    cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1);
-    
-    longStats.setNumDVs(40);
-    cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2);
-    
-    List<String> colNames = new ArrayList<String>();
-    colNames.add(colName);
-    List<String> aggrPartVals = new ArrayList<String>();
-    aggrPartVals.add("1");
-    aggrPartVals.add("2");
-    AggrStats aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, 
aggrPartVals, colNames);
-    
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(),
 100);
-    
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(),
 40);
-    aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, aggrPartVals, 
colNames);
-    
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(),
 100);
-    
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(),
 40);
-  }
-
-  @Test
-  public void testPartitionAggrStatsBitVector() throws Exception {
-    String dbName = "testTableColStatsOps2";
-    String tblName = "tbl2";
-    String colName = "f1";
-    
-    Database db = new Database(dbName, null, "some_location", null);
-    cachedStore.createDatabase(db);
-    
-    List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema(colName, "int", null));
-    List<FieldSchema> partCols = new ArrayList<FieldSchema>();
-    partCols.add(new FieldSchema("col", "int", null));
-    StorageDescriptor sd =
-        new StorageDescriptor(cols, null, "input", "output", false, 0, new 
SerDeInfo("serde", "seriallib", new HashMap<String, String>()),
-            null, null, null);
-    
-    Table tbl =
-        new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new 
HashMap<String, String>(),
-            null, null, TableType.MANAGED_TABLE.toString());
-    cachedStore.createTable(tbl);
-    
-    List<String> partVals1 = new ArrayList<String>();
-    partVals1.add("1");
-    List<String> partVals2 = new ArrayList<String>();
-    partVals2.add("2");
-    
-    Partition ptn1 =
-        new Partition(partVals1, dbName, tblName, 0, 0, sd, new 
HashMap<String, String>());
-    cachedStore.addPartition(ptn1);
-    Partition ptn2 =
-        new Partition(partVals2, dbName, tblName, 0, 0, sd, new 
HashMap<String, String>());
-    cachedStore.addPartition(ptn2);
-    
-    ColumnStatistics stats = new ColumnStatistics();
-    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, 
tblName);
-    statsDesc.setPartName("col");
-    List<ColumnStatisticsObj> colStatObjs = new 
ArrayList<ColumnStatisticsObj>();
-    
-    ColumnStatisticsData data = new ColumnStatisticsData();
-    ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", 
data);
-    LongColumnStatsDataInspector longStats = new 
LongColumnStatsDataInspector();
-    longStats.setLowValue(0);
-    longStats.setHighValue(100);
-    longStats.setNumNulls(50);
-    longStats.setNumDVs(30);
-    
-    HyperLogLog hll = HyperLogLog.builder().build();
-    hll.addLong(1);
-    hll.addLong(2);
-    hll.addLong(3);
-    longStats.setBitVectors(hll.serialize());
-    
-    data.setLongStats(longStats);
-    colStatObjs.add(colStats);
-    
-    stats.setStatsDesc(statsDesc);
-    stats.setStatsObj(colStatObjs);
-    
-    cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1);
-    
-    longStats.setNumDVs(40);
-    hll = HyperLogLog.builder().build();
-    hll.addLong(2);
-    hll.addLong(3);
-    hll.addLong(4);
-    hll.addLong(5);
-    longStats.setBitVectors(hll.serialize());
-    
-    cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2);
-    
-    List<String> colNames = new ArrayList<String>();
-    colNames.add(colName);
-    List<String> aggrPartVals = new ArrayList<String>();
-    aggrPartVals.add("1");
-    aggrPartVals.add("2");
-    AggrStats aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, 
aggrPartVals, colNames);
-    
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(),
 100);
-    
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(),
 5);
-    aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, aggrPartVals, 
colNames);
-    
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(),
 100);
-    
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(),
 5);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 41fbb0c..50a55d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -42,7 +42,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStore;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -50,6 +49,7 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -1753,7 +1753,7 @@ public abstract class BaseSemanticAnalyzer {
     } else {
       throw new SemanticException("Unexpected date type " + 
colValue.getClass());
     }
-    return HiveMetaStore.PARTITION_DATE_FORMAT.get().format(value);
+    return MetaStoreUtils.PARTITION_DATE_FORMAT.get().format(value);
   }
 
   protected WriteEntity toWriteEntity(String location) throws 
SemanticException {

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java 
b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
index b26401d..5c6bbbd 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
 import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler;
-import org.apache.hadoop.hive.metastore.hbase.MetadataStore;
+import org.apache.hadoop.hive.metastore.MetadataStore;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import 
org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/pom.xml
----------------------------------------------------------------------
diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml
index acc50ca..07b767b 100644
--- a/standalone-metastore/pom.xml
+++ b/standalone-metastore/pom.xml
@@ -33,9 +33,9 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.jolbox</groupId>
-      <artifactId>bonecp</artifactId>
-      <version>${bonecp.version}</version>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.new.version}</version>
     </dependency>
     <dependency>
       <groupId>com.github.joshelser</groupId>
@@ -53,6 +53,11 @@
       <version>${protobuf.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.jolbox</groupId>
+      <artifactId>bonecp</artifactId>
+      <version>${bonecp.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.zaxxer</groupId>
       <artifactId>HikariCP</artifactId>
       <version>${hikaricp.version}</version>
@@ -78,6 +83,16 @@
       <version>${dropwizard.version}</version>
     </dependency>
     <dependency>
+      <groupId>javolution</groupId>
+      <artifactId>javolution</artifactId>
+      <version>${javolution.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.antlr</groupId>
+      <artifactId>antlr-runtime</artifactId>
+      <version>${antlr.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
       <version>${commons-lang3.version}</version>
@@ -284,6 +299,20 @@
         </executions>
       </plugin>
       <plugin>
+        <groupId>org.antlr</groupId>
+        <artifactId>antlr3-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>antlr</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <sourceDirectory>${basedir}/src/main/java</sourceDirectory>
+        </configuration>
+      </plugin>
+      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>
         <executions>

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java
new file mode 100644
index 0000000..e2e3ada
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+
+/**
+ * A class that defines the constant strings used by the statistics 
implementation.
+ */
+
+public class StatsSetupConst {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(StatsSetupConst.class.getName());
+
+  public enum StatDB {
+    fs {
+      @Override
+      public String getPublisher(Configuration conf) {
+        return "org.apache.hadoop.hive.ql.stats.fs.FSStatsPublisher";
+      }
+
+      @Override
+      public String getAggregator(Configuration conf) {
+        return "org.apache.hadoop.hive.ql.stats.fs.FSStatsAggregator";
+      }
+    },
+    custom {
+      @Override
+      public String getPublisher(Configuration conf) {
+        return MetastoreConf.getVar(conf, ConfVars.STATS_DEFAULT_PUBLISHER); }
+      @Override
+      public String getAggregator(Configuration conf) {
+        return MetastoreConf.getVar(conf,  ConfVars.STATS_DEFAULT_AGGREGATOR); 
}
+    };
+    public abstract String getPublisher(Configuration conf);
+    public abstract String getAggregator(Configuration conf);
+  }
+
+  // statistics stored in metastore
+  /**
+   * The name of the statistic Num Files to be published or gathered.
+   */
+  public static final String NUM_FILES = "numFiles";
+
+  /**
+   * The name of the statistic Num Partitions to be published or gathered.
+   */
+  public static final String NUM_PARTITIONS = "numPartitions";
+
+  /**
+   * The name of the statistic Total Size to be published or gathered.
+   */
+  public static final String TOTAL_SIZE = "totalSize";
+
+  /**
+   * The name of the statistic Row Count to be published or gathered.
+   */
+  public static final String ROW_COUNT = "numRows";
+
+  public static final String RUN_TIME_ROW_COUNT = "runTimeNumRows";
+
+  /**
+   * The name of the statistic Raw Data Size to be published or gathered.
+   */
+  public static final String RAW_DATA_SIZE = "rawDataSize";
+
+  /**
+   * Temp dir for writing stats from tasks.
+   */
+  public static final String STATS_TMP_LOC = "hive.stats.tmp.loc";
+
+  public static final String STATS_FILE_PREFIX = "tmpstats-";
+  /**
+   * List of all supported statistics
+   */
+  public static final String[] supportedStats = 
{NUM_FILES,ROW_COUNT,TOTAL_SIZE,RAW_DATA_SIZE};
+
+  /**
+   * List of all statistics that need to be collected during query execution. 
These are
+   * statistics that inherently require a scan of the data.
+   */
+  public static final String[] statsRequireCompute = new String[] 
{ROW_COUNT,RAW_DATA_SIZE};
+
+  /**
+   * List of statistics that can be collected quickly without requiring a scan 
of the data.
+   */
+  public static final String[] fastStats = new String[] {NUM_FILES,TOTAL_SIZE};
+
+  // This string constant is used to indicate to AlterHandler that
+  // alterPartition/alterTable is happening via statsTask or via user.
+  public static final String STATS_GENERATED = "STATS_GENERATED";
+
+  public static final String TASK = "TASK";
+
+  public static final String USER = "USER";
+
+  // This string constant is used by AlterHandler to figure out that it should 
not attempt to
+  // update stats. It is set by any client-side task which wishes to signal 
that no stats
+  // update should take place, such as with replication.
+  public static final String DO_NOT_UPDATE_STATS = "DO_NOT_UPDATE_STATS";
+
+  //This string constant will be persisted in metastore to indicate whether 
corresponding
+  //table or partition's statistics and table or partition's column statistics 
are accurate or not.
+  public static final String COLUMN_STATS_ACCURATE = "COLUMN_STATS_ACCURATE";
+
+  public static final String COLUMN_STATS = "COLUMN_STATS";
+
+  public static final String BASIC_STATS = "BASIC_STATS";
+
+  public static final String CASCADE = "CASCADE";
+
+  public static final String TRUE = "true";
+
+  public static final String FALSE = "false";
+
+  // The parameter keys for the table statistics. Those keys are excluded from 
'show create table' command output.
+  public static final String[] TABLE_PARAMS_STATS_KEYS = new String[] {
+    COLUMN_STATS_ACCURATE, NUM_FILES, TOTAL_SIZE,ROW_COUNT, RAW_DATA_SIZE, 
NUM_PARTITIONS};
+
  /**
   * In-memory model of the JSON document persisted under the COLUMN_STATS_ACCURATE
   * parameter key. BASIC_STATS maps to a single accuracy flag; COLUMN_STATS maps to a
   * per-column accuracy map. Booleans are (de)serialized as the strings "true"/"false"
   * rather than JSON booleans, matching the historical on-disk format.
   */
  private static class ColumnStatsAccurate {
    // Shared Jackson reader/writer bound to this class, built once in the static block.
    private static ObjectReader objectReader;
    private static ObjectWriter objectWriter;

    static {
      ObjectMapper objectMapper = new ObjectMapper();
      objectReader = objectMapper.readerFor(ColumnStatsAccurate.class);
      objectWriter = objectMapper.writerFor(ColumnStatsAccurate.class);
    }

    /** Writes a Boolean as the JSON string "true"/"false" (not a JSON boolean). */
    static class BooleanSerializer extends JsonSerializer<Boolean> {

      @Override
      public void serialize(Boolean value, JsonGenerator jsonGenerator,
          SerializerProvider serializerProvider) throws IOException {
        jsonGenerator.writeString(value.toString());
      }
    }

    /** Reads a Boolean from its JSON string form ("true"/"false"). */
    static class BooleanDeserializer extends JsonDeserializer<Boolean> {

      public Boolean deserialize(JsonParser jsonParser,
          DeserializationContext deserializationContext)
              throws IOException {
        return Boolean.valueOf(jsonParser.getValueAsString());
      }
    }

    // Omitted from JSON when false (NON_DEFAULT) to keep the persisted value compact.
    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    @JsonSerialize(using = BooleanSerializer.class)
    @JsonDeserialize(using = BooleanDeserializer.class)
    @JsonProperty(BASIC_STATS)
    boolean basicStats;

    // Column name -> accuracy flag; TreeMap keeps serialized key order deterministic.
    // Omitted from JSON when empty (NON_EMPTY).
    @JsonInclude(JsonInclude.Include.NON_EMPTY)
    @JsonProperty(COLUMN_STATS)
    @JsonSerialize(contentUsing = BooleanSerializer.class)
    @JsonDeserialize(contentUsing = BooleanDeserializer.class)
    TreeMap<String, Boolean> columnStats = new TreeMap<>();

  }
+
+  public static boolean areBasicStatsUptoDate(Map<String, String> params) {
+    if (params == null) {
+      return false;
+    }
+    ColumnStatsAccurate stats = 
parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
+    return stats.basicStats;
+  }
+
+  public static boolean areColumnStatsUptoDate(Map<String, String> params, 
String colName) {
+    if (params == null) {
+      return false;
+    }
+    ColumnStatsAccurate stats = 
parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
+    return stats.columnStats.containsKey(colName);
+  }
+
+  // It will only throw JSONException when stats.put(BASIC_STATS, TRUE)
+  // has duplicate key, which is not possible
+  // note that set basic stats false will wipe out column stats too.
+  public static void setBasicStatsState(Map<String, String> params, String 
setting) {
+    if (setting.equals(FALSE)) {
+      if (params!=null && params.containsKey(COLUMN_STATS_ACCURATE)) {
+        params.remove(COLUMN_STATS_ACCURATE);
+      }
+      return;
+    }
+    if (params == null) {
+      throw new RuntimeException("params are null...cant set 
columnstatstate!");
+    }
+    ColumnStatsAccurate stats = 
parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
+    stats.basicStats = true;
+    try {
+      params.put(COLUMN_STATS_ACCURATE, 
ColumnStatsAccurate.objectWriter.writeValueAsString(stats));
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("can't serialize column stats", e);
+    }
+  }
+
+  public static void setColumnStatsState(Map<String, String> params, 
List<String> colNames) {
+    if (params == null) {
+      throw new RuntimeException("params are null...cant set 
columnstatstate!");
+    }
+    if (colNames == null) {
+      return;
+    }
+    ColumnStatsAccurate stats = 
parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
+
+    for (String colName : colNames) {
+      if (!stats.columnStats.containsKey(colName)) {
+        stats.columnStats.put(colName, true);
+      }
+    }
+    try {
+      params.put(COLUMN_STATS_ACCURATE, 
ColumnStatsAccurate.objectWriter.writeValueAsString(stats));
+    } catch (JsonProcessingException e) {
+      LOG.trace(e.getMessage());
+    }
+  }
+
+  public static void clearColumnStatsState(Map<String, String> params) {
+    if (params == null) {
+      return;
+    }
+    ColumnStatsAccurate stats = 
parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
+    stats.columnStats.clear();
+
+    try {
+      params.put(COLUMN_STATS_ACCURATE, 
ColumnStatsAccurate.objectWriter.writeValueAsString(stats));
+    } catch (JsonProcessingException e) {
+      LOG.trace(e.getMessage());
+    }
+  }
+
+  public static void removeColumnStatsState(Map<String, String> params, 
List<String> colNames) {
+    if (params == null) {
+      return;
+    }
+    try {
+      ColumnStatsAccurate stats = 
parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
+      for (String string : colNames) {
+        stats.columnStats.remove(string);
+      }
+      params.put(COLUMN_STATS_ACCURATE, 
ColumnStatsAccurate.objectWriter.writeValueAsString(stats));
+    } catch (JsonProcessingException e) {
+      LOG.trace(e.getMessage());
+    }
+  }
+
+  public static void setStatsStateForCreateTable(Map<String, String> params,
+      List<String> cols, String setting) {
+    if (TRUE.equals(setting)) {
+      for (String stat : StatsSetupConst.supportedStats) {
+        params.put(stat, "0");
+      }
+    }
+    setBasicStatsState(params, setting);
+    setColumnStatsState(params, cols);
+  }
+  
+  private static ColumnStatsAccurate parseStatsAcc(String statsAcc) {
+    if (statsAcc == null) {
+      return new ColumnStatsAccurate();
+    }
+    try {
+      return ColumnStatsAccurate.objectReader.readValue(statsAcc);
+    } catch (Exception e) {
+      ColumnStatsAccurate ret = new ColumnStatsAccurate();
+      if (TRUE.equalsIgnoreCase(statsAcc)) {
+        ret.basicStats = true;
+      }
+      return ret;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimator.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimator.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimator.java
new file mode 100644
index 0000000..668db10
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.ndv;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+
/**
 * Contract for streaming number-of-distinct-values (NDV) estimators: values are
 * fed in one at a time and an approximate distinct count can be read out at any
 * point. Estimators of compatible types can be merged.
 */
public interface NumDistinctValueEstimator {

  Logger LOG = LoggerFactory.getLogger(NumDistinctValueEstimator.class.getName());

  /** Clears all accumulated state, returning the estimator to its empty form. */
  void reset();

  /** Serializes this estimator's state to a byte array. */
  byte[] serialize();

  /** Reconstructs an estimator from bytes produced by {@link #serialize()}. */
  NumDistinctValueEstimator deserialize(byte[] buf);

  /** Folds a long value into the estimate. */
  void addToEstimator(long v);

  /** Folds a double value into the estimate. */
  void addToEstimator(double d);

  /** Folds a string value into the estimate. */
  void addToEstimator(String s);

  /** Folds a decimal value into the estimate. */
  void addToEstimator(HiveDecimal decimal);

  /** Merges another estimator into this one; callers should check {@link #canMerge} first. */
  void mergeEstimators(NumDistinctValueEstimator o);

  /** @return the approximate number of distinct values observed so far */
  long estimateNumDistinctValues();

  /** @return the estimated in-memory footprint in bytes under the given data model */
  int lengthFor(JavaDataModel model);

  /** @return whether {@code o} is structurally compatible with this estimator for merging */
  boolean canMerge(NumDistinctValueEstimator o);

}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimatorFactory.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimatorFactory.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimatorFactory.java
new file mode 100644
index 0000000..4e4dfb7
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/NumDistinctValueEstimatorFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.common.ndv;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.common.ndv.fm.FMSketch;
+import org.apache.hadoop.hive.common.ndv.fm.FMSketchUtils;
+import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;
+
+public class NumDistinctValueEstimatorFactory {
+
+  private NumDistinctValueEstimatorFactory() {
+  }
+
+  private static boolean isFMSketch(byte[] buf) throws IOException {
+    byte[] magic = new byte[2];
+    magic[0] = (byte) buf[0];
+    magic[1] = (byte) buf[1];
+    return Arrays.equals(magic, FMSketchUtils.MAGIC);
+  }
+
+  public static NumDistinctValueEstimator getNumDistinctValueEstimator(byte[] 
buf) {
+    // Right now we assume only FM and HLL are available.
+    try {
+      if (isFMSketch(buf)) {
+        return FMSketchUtils.deserializeFM(buf);
+      } else {
+        return HyperLogLog.builder().build().deserialize(buf);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static NumDistinctValueEstimator getEmptyNumDistinctValueEstimator(
+      NumDistinctValueEstimator n) {
+    if (n instanceof FMSketch) {
+      return new FMSketch(((FMSketch) n).getNumBitVectors());
+    } else {
+      return HyperLogLog.builder().build();
+    }
+  }
+
+  public static NumDistinctValueEstimator 
getEmptyNumDistinctValueEstimator(String func,
+      int numBitVectors) {
+    if ("fm".equals(func.toLowerCase())) {
+      return new FMSketch(numBitVectors);
+    } else if ("hll".equals(func.toLowerCase())) {
+      return HyperLogLog.builder().build();
+    } else {
+      throw new RuntimeException("Can not recognize " + func);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/fm/FMSketch.java
----------------------------------------------------------------------
diff --git 
a/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/fm/FMSketch.java
 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/fm/FMSketch.java
new file mode 100644
index 0000000..f6cdc4c
--- /dev/null
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/ndv/fm/FMSketch.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.ndv.fm;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javolution.util.FastBitSet;
+
/**
 * Flajolet-Martin (FM) sketch for approximate distinct-value counting.
 * Each incoming value is hashed by {@code numBitVectors} pairwise-independent
 * linear hash functions; the position of the least significant set bit of each
 * hash is recorded in a bit vector, and the position of the least significant
 * clear bit across vectors estimates log2(PHI * NDV).
 *
 * NOTE(review): the hash arithmetic below contains operator-precedence quirks
 * (flagged inline). They must not be "fixed" casually: changing the hash would
 * change the bits set for new data and make merges with previously serialized
 * sketches inconsistent.
 */
public class FMSketch implements NumDistinctValueEstimator {

  static final Logger LOG = LoggerFactory.getLogger(FMSketch.class.getName());

  /* We want a,b,x to come from a finite field of size 0 to k, where k is a prime number.
   * 2^p - 1 is prime for p = 31. Hence bitvectorSize has to be 31. Pick k to be 2^p -1.
   * If a,b,x didn't come from a finite field ax1 + b mod k and ax2 + b mod k will not be pair wise
   * independent. As a consequence, the hash values will not distribute uniformly from 0 to 2^p-1
   * thus introducing errors in the estimates.
   */
  public static final int BIT_VECTOR_SIZE = 31;

  // Refer to Flajolet-Martin'86 for the value of phi
  private static final double PHI = 0.77351;

  // Coefficients of the i-th linear hash h_i(v) = a[i]*v + b[i] (mod 2^31 - 1).
  private final int[] a;
  private final int[] b;
  // One bit vector per hash function; bit j is set when some value's hash had
  // its least significant set bit at position j.
  private final FastBitSet[] bitVector;

  // Seeded PRNGs used only at construction time to pick the a[] and b[] coefficients;
  // fixed seeds make every FMSketch(numBitVectors) use identical hash functions.
  private final Random aValue;
  private final Random bValue;

  private int numBitVectors;

  /* Create a new distinctValueEstimator
   */
  public FMSketch(int numBitVectors) {
    this.numBitVectors = numBitVectors;
    bitVector = new FastBitSet[numBitVectors];
    for (int i=0; i< numBitVectors; i++) {
      bitVector[i] = new FastBitSet(BIT_VECTOR_SIZE);
    }

    a = new int[numBitVectors];
    b = new int[numBitVectors];

    /* Use a large prime number as a seed to the random number generator.
     * Java's random number generator uses the Linear Congruential Generator to generate random
     * numbers using the following recurrence relation,
     *
     * X(n+1) = (a X(n) + c ) mod m
     *
     *  where X0 is the seed. Java implementation uses m = 2^48. This is problematic because 2^48
     *  is not a prime number and hence the set of numbers from 0 to m don't form a finite field.
     *  If these numbers don't come from a finite field any give X(n) and X(n+1) may not be pair
     *  wise independent.
     *
     *  However, empirically passing in prime numbers as seeds seems to work better than when passing
     *  composite numbers as seeds. Ideally Java's Random should pick m such that m is prime.
     *
     */
    aValue = new Random(99397);
    bValue = new Random(9876413);

    for (int i = 0; i < numBitVectors; i++) {
      int randVal;
      /* a and b shouldn't be even; If a and b are even, then none of the values
       * will set bit 0 thus introducing errors in the estimate. Both a and b can be even
       * 25% of the times and as a result 25% of the bit vectors could be inaccurate. To avoid this
       * always pick odd values for a and b.
       */
      do {
        randVal = aValue.nextInt();
      } while (randVal % 2 == 0);

      a[i] = randVal;

      do {
        randVal = bValue.nextInt();
      } while (randVal % 2 == 0);

      b[i] = randVal;

      if (a[i] < 0) {
        // NOTE(review): '-' binds tighter than '<<', so this adds 2^30, not the
        // 2^31 - 1 the finite-field comment above suggests — confirm before
        // changing, as altering the hash invalidates existing sketches.
        a[i] = a[i] + (1 << BIT_VECTOR_SIZE - 1);
      }

      if (b[i] < 0) {
        b[i] = b[i] + (1 << BIT_VECTOR_SIZE - 1);
      }
    }
  }

  /**
   * Resets a distinctValueEstimator object to its original state.
   */
  public void reset() {
    for (int i=0; i< numBitVectors; i++) {
      bitVector[i].clear();
    }
  }

  /** Returns the bit vector for the given hash-function index (no bounds check). */
  public FastBitSet getBitVector(int index) {
    return bitVector[index];
  }

  /** Replaces the bit vector at the given index; used when deserializing. */
  public FastBitSet setBitVector(FastBitSet fastBitSet, int index) {
    return bitVector[index] = fastBitSet;
  }

  /** Returns the number of hash functions / bit vectors in this sketch. */
  public int getNumBitVectors() {
    return numBitVectors;
  }

  /** Returns the fixed per-vector bit width (always BIT_VECTOR_SIZE). */
  public int getBitVectorSize() {
    return BIT_VECTOR_SIZE;
  }

  /** Dumps the sketch configuration and concatenated bit vectors at DEBUG level. */
  public void printNumDistinctValueEstimator() {
    String t = new String();

    LOG.debug("NumDistinctValueEstimator");
    LOG.debug("Number of Vectors: {}", numBitVectors);
    LOG.debug("Vector Size: {}", BIT_VECTOR_SIZE);

    for (int i=0; i < numBitVectors; i++) {
      t = t + bitVector[i].toString();
    }

    LOG.debug("Serialized Vectors: ");
    LOG.debug(t);
  }

  /**
   * Serializes this sketch via FMSketchUtils (magic header + bit vectors).
   * IOExceptions from the in-memory stream are wrapped as RuntimeException.
   */
  @Override
  public byte[] serialize() {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // write bytes to bos ...
    try {
      FMSketchUtils.serializeFM(bos, this);
      final byte[] result = bos.toByteArray();
      bos.close();
      return result;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Reconstructs a sketch from bytes produced by {@link #serialize()}.
   * Note this returns a new instance; it does not populate {@code this}.
   */
  @Override
  public NumDistinctValueEstimator deserialize(byte[] buf) {
    InputStream is = new ByteArrayInputStream(buf);
    try {
      NumDistinctValueEstimator n = FMSketchUtils.deserializeFM(is);
      is.close();
      return n;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Computes the hashNum-th linear hash of v, reduced mod 2^31 - 1 and mapped
   * into the non-negative range [0, 2^31 - 1).
   */
  private int generateHash(long v, int hashNum) {
    int mod = (1<<BIT_VECTOR_SIZE) - 1;
    long tempHash = a[hashNum] * v  + b[hashNum];
    tempHash %= mod;
    int hash = (int) tempHash;

    /* Hash function should map the long value to 0...2^L-1.
     * Hence hash value has to be non-negative.
     */
    if (hash < 0) {
      hash = hash + mod;
    }
    return hash;
  }

  /**
   * PCSA variant of the hash, always using the first (a[0], b[0]) coefficients.
   */
  private int generateHashForPCSA(long v) {
    // NOTE(review): precedence makes this 1 << ((BIT_VECTOR_SIZE - 1) - 1) = 2^29,
    // not (2^30 - 1) as the parenthesization suggests — confirm before changing.
    int mod = 1 << (BIT_VECTOR_SIZE - 1) - 1;
    long tempHash = a[0] * v + b[0];
    tempHash %= mod;
    int hash = (int) tempHash;

    /* Hash function should map the long value to 0...2^L-1.
     * Hence hash value has to be non-negative.
     */
    if (hash < 0) {
      hash = hash + mod + 1;
    }
    return hash;
  }

  /** Feeds a long value into every bit vector (the core FM update). */
  public void addToEstimator(long v) {
    /* Update summary bitVector :
     * Generate hash value of the long value and mod it by 2^bitVectorSize-1.
     * In this implementation bitVectorSize is 31.
     */

    for (int i = 0; i<numBitVectors; i++) {
      int hash = generateHash(v,i);
      int index;

      // Find the index of the least significant bit that is 1
      for (index=0; index<BIT_VECTOR_SIZE; index++) {
        if (hash % 2 != 0) {
          break;
        }
        hash = hash >> 1;
      }

      // Set bitvector[index] := 1
      bitVector[i].set(index);
    }
  }

  /**
   * PCSA-style update: a single hash selects both the target bit vector
   * (hash mod numBitVectors) and the bit position (from hash / numBitVectors).
   */
  public void addToEstimatorPCSA(long v) {
    int hash = generateHashForPCSA(v);
    int rho = hash/numBitVectors;
    int index;

    // Find the index of the least significant bit that is 1
    for (index=0; index<BIT_VECTOR_SIZE; index++) {
      if (rho % 2 != 0) {
        break;
      }
      rho = rho >> 1;
    }

    // Set bitvector[index] := 1
    bitVector[hash%numBitVectors].set(index);
  }

  // Doubles are folded in via their boxed hashCode (note: new Double(d) is the
  // historical form; the hash value must stay stable for sketch compatibility).
  public void addToEstimator(double d) {
    int v = new Double(d).hashCode();
    addToEstimator(v);
  }

  public void addToEstimatorPCSA(double d) {
    int v = new Double(d).hashCode();
    addToEstimatorPCSA(v);
  }

  // Decimals are folded in via their hashCode.
  public void addToEstimator(HiveDecimal decimal) {
    int v = decimal.hashCode();
    addToEstimator(v);
  }

  public void addToEstimatorPCSA(HiveDecimal decimal) {
    int v = decimal.hashCode();
    addToEstimatorPCSA(v);
  }

  /** Merges another FMSketch of the same width by ORing each bit vector pair. */
  public void mergeEstimators(FMSketch o) {
    // Bitwise OR the bitvector with the bitvector in the agg buffer
    for (int i=0; i<numBitVectors; i++) {
      bitVector[i].or(o.getBitVector(i));
    }
  }

  /** PCSA estimate: averages the first-clear-bit positions across vectors. */
  public long estimateNumDistinctValuesPCSA() {
    double numDistinctValues = 0.0;
    long S = 0;

    for (int i=0; i < numBitVectors; i++) {
      int index = 0;
      // NOTE(review): get(index) is evaluated before the bounds check; this
      // relies on FastBitSet.get tolerating index == BIT_VECTOR_SIZE — confirm.
      while (bitVector[i].get(index) && index < BIT_VECTOR_SIZE) {
        index = index + 1;
      }
      S = S + index;
    }

    numDistinctValues = ((numBitVectors/PHI) * Math.pow(2.0, S/numBitVectors));
    return ((long)numDistinctValues);
  }

  /* We use the Flajolet-Martin estimator to estimate the number of distinct values.FM uses the
   * location of the least significant zero as an estimate of log2(phi*ndvs).
   */
  public long estimateNumDistinctValues() {
    int sumLeastSigZero = 0;
    double avgLeastSigZero;
    double numDistinctValues;

    for (int i=0; i< numBitVectors; i++) {
      int leastSigZero = bitVector[i].nextClearBit(0);
      sumLeastSigZero += leastSigZero;
    }

    avgLeastSigZero =
        sumLeastSigZero/(numBitVectors * 1.0) - (Math.log(PHI)/Math.log(2.0));
    numDistinctValues = Math.pow(2.0, avgLeastSigZero);
    return ((long)(numDistinctValues));
  }

  /**
   * Estimates the heap footprint of a sketch with the given vector count under
   * the supplied data model; a null count uses the historical default of 16.
   */
  @InterfaceAudience.LimitedPrivate(value = {"Hive" })
  static int lengthFor(JavaDataModel model, Integer numVector) {
    int length = model.object();
    length += model.primitive1() * 2;       // two int
    length += model.primitive2();           // one double
    length += model.lengthForRandom() * 2;  // two Random

    if (numVector == null) {
      numVector = 16; // HiveConf hive.stats.ndv.error default produces 16 vectors
    }

    if (numVector > 0) {
      length += model.array() * 3;                    // three array
      length += model.primitive1() * numVector * 2;   // two int array
      length += (model.object() + model.array() + model.primitive1() +
          model.primitive2()) * numVector;   // bitset array
    }
    return length;
  }

  /** Heap footprint of this sketch under the given data model. */
  public int lengthFor(JavaDataModel model) {
    return lengthFor(model, getNumBitVectors());
  }

  // The caller needs to guarantee that both sketches have the same
  // numBitVectors (see canMerge); no check is performed here.
  @Override
  public void mergeEstimators(NumDistinctValueEstimator o) {
    // Bitwise OR the bitvector with the bitvector in the agg buffer
    for (int i = 0; i < numBitVectors; i++) {
      bitVector[i].or(((FMSketch) o).getBitVector(i));
    }
  }

  // Strings are folded in via their hashCode.
  @Override
  public void addToEstimator(String s) {
    int v = s.hashCode();
    addToEstimator(v);
  }

  /** Mergeable only with another FMSketch of the exact same width. */
  @Override
  public boolean canMerge(NumDistinctValueEstimator o) {
    return o instanceof FMSketch && this.numBitVectors == ((FMSketch) o).numBitVectors;
  }
}

Reply via email to