http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore2.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore2.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore2.java new file mode 100644 index 0000000..fa4e02a --- /dev/null +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore2.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; +import org.apache.hadoop.hive.metastore.model.MNotificationLog; +import org.apache.hadoop.hive.metastore.model.MNotificationNextId; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hive.metastore.TestOldSchema.dropAllStoreObjects; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +// Tests from TestObjectStore that can't be moved yet due to references to EventMessage. Once +// EventMessage has been moved this should be recombined with TestObjectStore. 
+ +public class TestObjectStore2 { + private ObjectStore objectStore = null; + + public static class MockPartitionExpressionProxy implements PartitionExpressionProxy { + @Override + public String convertExprToFilter(byte[] expr) throws MetaException { + return null; + } + + @Override + public boolean filterPartitionsByExpr(List<FieldSchema> partColumns, + byte[] expr, String defaultPartitionName, List<String> partitionNames) + throws MetaException { + return false; + } + + @Override + public FileMetadataExprType getMetadataType(String inputFormat) { + return null; + } + + @Override + public SearchArgument createSarg(byte[] expr) { + return null; + } + + @Override + public FileFormatProxy getFileFormatProxy(FileMetadataExprType type) { + return null; + } + } + + @Before + public void setUp() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.EXPRESSION_PROXY_CLASS, + MockPartitionExpressionProxy.class.getName()); + + objectStore = new ObjectStore(); + objectStore.setConf(conf); + dropAllStoreObjects(objectStore); + } + + /** + * Test notification operations + */ + // TODO MS-SPLIT uncomment once we move EventMessage over + @Test + public void testNotificationOps() throws InterruptedException { + final int NO_EVENT_ID = 0; + final int FIRST_EVENT_ID = 1; + final int SECOND_EVENT_ID = 2; + + NotificationEvent event = + new NotificationEvent(0, 0, EventMessage.EventType.CREATE_DATABASE.toString(), ""); + NotificationEventResponse eventResponse; + CurrentNotificationEventId eventId; + + // Verify that there is no notifications available yet + eventId = objectStore.getCurrentNotificationEventId(); + assertEquals(NO_EVENT_ID, eventId.getEventId()); + + // Verify that addNotificationEvent() updates the NotificationEvent with the new event ID + objectStore.addNotificationEvent(event); + assertEquals(FIRST_EVENT_ID, event.getEventId()); + objectStore.addNotificationEvent(event); + 
assertEquals(SECOND_EVENT_ID, event.getEventId()); + + // Verify that objectStore fetches the latest notification event ID + eventId = objectStore.getCurrentNotificationEventId(); + assertEquals(SECOND_EVENT_ID, eventId.getEventId()); + + // Verify that getNextNotification() returns all events + eventResponse = objectStore.getNextNotification(new NotificationEventRequest()); + assertEquals(2, eventResponse.getEventsSize()); + assertEquals(FIRST_EVENT_ID, eventResponse.getEvents().get(0).getEventId()); + assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(1).getEventId()); + + // Verify that getNextNotification(last) returns events after a specified event + eventResponse = objectStore.getNextNotification(new NotificationEventRequest(FIRST_EVENT_ID)); + assertEquals(1, eventResponse.getEventsSize()); + assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(0).getEventId()); + + // Verify that getNextNotification(last) returns zero events if there are no more notifications available + eventResponse = objectStore.getNextNotification(new NotificationEventRequest(SECOND_EVENT_ID)); + assertEquals(0, eventResponse.getEventsSize()); + + // Verify that cleanNotificationEvents() cleans up all old notifications + Thread.sleep(1); + objectStore.cleanNotificationEvents(1); + eventResponse = objectStore.getNextNotification(new NotificationEventRequest()); + assertEquals(0, eventResponse.getEventsSize()); + } + + @Ignore( + "This test is here to allow testing with other databases like mysql / postgres etc\n" + + " with user changes to the code. 
This cannot be run on apache derby because of\n" + + " https://db.apache.org/derby/docs/10.10/devguide/cdevconcepts842385.html" + ) + @Test + public void testConcurrentAddNotifications() throws ExecutionException, InterruptedException { + + final int NUM_THREADS = 10; + CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_THREADS, + () -> LoggerFactory.getLogger("test") + .debug(NUM_THREADS + " threads going to add notification")); + + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.EXPRESSION_PROXY_CLASS, + MockPartitionExpressionProxy.class.getName()); + /* + Below are the properties that need to be set based on what database this test is going to be run + */ + +// conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver"); +// conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, +// "jdbc:mysql://localhost:3306/metastore_db"); +// conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, ""); +// conf.setVar(HiveConf.ConfVars.METASTOREPWD, ""); + + /* + we have to add this one manually as for tests the db is initialized via the metastoreDiretSQL + and we don't run the schema creation sql that includes the an insert for notification_sequence + which can be locked. the entry in notification_sequence happens via notification_event insertion. 
+ */ + objectStore.getPersistenceManager().newQuery(MNotificationLog.class, "eventType==''").execute(); + objectStore.getPersistenceManager().newQuery(MNotificationNextId.class, "nextEventId==-1").execute(); + + objectStore.addNotificationEvent( + new NotificationEvent(0, 0, + EventMessage.EventType.CREATE_DATABASE.toString(), + "CREATE DATABASE DB initial")); + + ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); + for (int i = 0; i < NUM_THREADS; i++) { + final int n = i; + + executorService.execute( + () -> { + ObjectStore store = new ObjectStore(); + store.setConf(conf); + + String eventType = EventMessage.EventType.CREATE_DATABASE.toString(); + NotificationEvent dbEvent = + new NotificationEvent(0, 0, eventType, + "CREATE DATABASE DB" + n); + System.out.println("ADDING NOTIFICATION"); + + try { + cyclicBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + store.addNotificationEvent(dbEvent); + System.out.println("FINISH NOTIFICATION"); + }); + } + executorService.shutdown(); + assertTrue(executorService.awaitTermination(15, TimeUnit.SECONDS)); + + // we have to setup this again as the underlying PMF keeps getting reinitialized with original + // reference closed + ObjectStore store = new ObjectStore(); + store.setConf(conf); + + NotificationEventResponse eventResponse = store.getNextNotification( + new NotificationEventRequest()); + assertEquals(NUM_THREADS + 1, eventResponse.getEventsSize()); + long previousId = 0; + for (NotificationEvent event : eventResponse.getEvents()) { + assertTrue("previous:" + previousId + " current:" + event.getEventId(), + previousId < event.getEventId()); + assertTrue(previousId + 1 == event.getEventId()); + previousId = event.getEventId(); + } + } +}
http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/test/org/apache/hadoop/hive/metastore/TestRawStoreProxy.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestRawStoreProxy.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestRawStoreProxy.java deleted file mode 100644 index 68d65a8..0000000 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestRawStoreProxy.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.metastore; - -import static org.junit.Assert.fail; - -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.junit.Test; - -public class TestRawStoreProxy { - - static class TestStore extends ObjectStore { - @Override - public void setConf(Configuration conf) { - // noop - } - - public void noopMethod() throws MetaException { - Deadline.checkTimeout(); - } - - public void exceptions() throws IllegalStateException, MetaException { - Deadline.checkTimeout(); - throw new IllegalStateException("throwing an exception"); - } - } - - @Test - public void testExceptionDispatch() throws Throwable { - HiveConf hiveConf = new HiveConf(); - hiveConf.setTimeVar(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, 10, - TimeUnit.MILLISECONDS); - RawStoreProxy rsp = new RawStoreProxy(hiveConf, hiveConf, TestStore.class, 1); - try { - rsp.invoke(null, TestStore.class.getMethod("exceptions"), new Object[] {}); - fail("an exception is expected"); - } catch (IllegalStateException ise) { - // expected - } - Thread.sleep(20); - // this shouldn't throw an exception - rsp.invoke(null, TestStore.class.getMethod("noopMethod"), new Object[] {}); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java deleted file mode 100644 index b6d2df5..0000000 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java +++ /dev/null @@ -1,901 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.cache; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.ObjectStore; -import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.TestObjectStore.MockPartitionExpressionProxy; -import org.apache.hadoop.hive.metastore.api.AggrStats; -import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.PrincipalType; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import 
org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector; -import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class TestCachedStore { - - private ObjectStore objectStore; - private CachedStore cachedStore; - private SharedCache sharedCache; - - @Before - public void setUp() throws Exception { - HiveConf conf = new HiveConf(); - conf.setBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname, true); - conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, - MockPartitionExpressionProxy.class.getName()); - objectStore = new ObjectStore(); - objectStore.setConf(conf); - cachedStore = new CachedStore(); - cachedStore.setConf(conf); - // Stop the CachedStore cache update service. We'll start it explicitly to control the test - CachedStore.stopCacheUpdateService(1); - cachedStore.setInitializedForTest(); - - // Stop the CachedStore cache update service. 
We'll start it explicitly to control the test - CachedStore.stopCacheUpdateService(1); - sharedCache = new SharedCache(); - sharedCache.getDatabaseCache().clear(); - sharedCache.getTableCache().clear(); - sharedCache.getPartitionCache().clear(); - sharedCache.getSdCache().clear(); - sharedCache.getPartitionColStatsCache().clear(); - } - - /********************************************************************************************** - * Methods that test CachedStore - *********************************************************************************************/ - - @Test - public void testDatabaseOps() throws Exception { - // Add a db via ObjectStore - String dbName = "testDatabaseOps"; - String dbDescription = "testDatabaseOps"; - String dbLocation = "file:/tmp"; - Map<String, String> dbParams = new HashMap<String, String>(); - String dbOwner = "user1"; - Database db = new Database(dbName, dbDescription, dbLocation, dbParams); - db.setOwnerName(dbOwner); - db.setOwnerType(PrincipalType.USER); - objectStore.createDatabase(db); - db = objectStore.getDatabase(dbName); - // Prewarm CachedStore - CachedStore.prewarm(objectStore); - - // Read database via CachedStore - Database dbNew = cachedStore.getDatabase(dbName); - Assert.assertEquals(db, dbNew); - - // Add another db via CachedStore - final String dbName1 = "testDatabaseOps1"; - final String dbDescription1 = "testDatabaseOps1"; - Database db1 = new Database(dbName1, dbDescription1, dbLocation, dbParams); - db1.setOwnerName(dbOwner); - db1.setOwnerType(PrincipalType.USER); - cachedStore.createDatabase(db1); - db1 = cachedStore.getDatabase(dbName1); - - // Read db via ObjectStore - dbNew = objectStore.getDatabase(dbName1); - Assert.assertEquals(db1, dbNew); - - // Alter the db via CachedStore (can only alter owner or parameters) - db = new Database(dbName, dbDescription, dbLocation, dbParams); - dbOwner = "user2"; - db.setOwnerName(dbOwner); - db.setOwnerType(PrincipalType.USER); - cachedStore.alterDatabase(dbName, 
db); - db = cachedStore.getDatabase(dbName); - - // Read db via ObjectStore - dbNew = objectStore.getDatabase(dbName); - Assert.assertEquals(db, dbNew); - - // Add another db via ObjectStore - final String dbName2 = "testDatabaseOps2"; - final String dbDescription2 = "testDatabaseOps2"; - Database db2 = new Database(dbName2, dbDescription2, dbLocation, dbParams); - db2.setOwnerName(dbOwner); - db2.setOwnerType(PrincipalType.USER); - objectStore.createDatabase(db2); - db2 = objectStore.getDatabase(dbName2); - - // Alter db "testDatabaseOps" via ObjectStore - dbOwner = "user1"; - db = new Database(dbName, dbDescription, dbLocation, dbParams); - db.setOwnerName(dbOwner); - db.setOwnerType(PrincipalType.USER); - objectStore.alterDatabase(dbName, db); - db = objectStore.getDatabase(dbName); - - // Drop db "testDatabaseOps1" via ObjectStore - objectStore.dropDatabase(dbName1); - - // We update twice to accurately detect if cache is dirty or not - updateCache(cachedStore, 100, 500, 100); - updateCache(cachedStore, 100, 500, 100); - - // Read the newly added db via CachedStore - dbNew = cachedStore.getDatabase(dbName2); - Assert.assertEquals(db2, dbNew); - - // Read the altered db via CachedStore (altered user from "user2" to "user1") - dbNew = cachedStore.getDatabase(dbName); - Assert.assertEquals(db, dbNew); - - // Try to read the dropped db after cache update - try { - dbNew = cachedStore.getDatabase(dbName1); - Assert.fail("The database: " + dbName1 - + " should have been removed from the cache after running the update service"); - } catch (NoSuchObjectException e) { - // Expected - } - - // Clean up - objectStore.dropDatabase(dbName); - objectStore.dropDatabase(dbName2); - } - - @Test - public void testTableOps() throws Exception { - // Add a db via ObjectStore - String dbName = "testTableOps"; - String dbDescription = "testTableOps"; - String dbLocation = "file:/tmp"; - Map<String, String> dbParams = new HashMap<String, String>(); - String dbOwner = "user1"; - 
Database db = new Database(dbName, dbDescription, dbLocation, dbParams); - db.setOwnerName(dbOwner); - db.setOwnerType(PrincipalType.USER); - objectStore.createDatabase(db); - db = objectStore.getDatabase(dbName); - - // Add a table via ObjectStore - String tblName = "tbl"; - String tblOwner = "user1"; - String serdeLocation = "file:/tmp"; - FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); - FieldSchema col2 = new FieldSchema("col2", "string", "string column"); - List<FieldSchema> cols = new ArrayList<FieldSchema>(); - cols.add(col1); - cols.add(col2); - Map<String, String> serdeParams = new HashMap<String, String>(); - Map<String, String> tblParams = new HashMap<String, String>(); - SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<String, String>()); - StorageDescriptor sd = - new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, - null, serdeParams); - sd.setStoredAsSubDirectories(false); - Table tbl = - new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams, - null, null, TableType.MANAGED_TABLE.toString()); - objectStore.createTable(tbl); - tbl = objectStore.getTable(dbName, tblName); - - // Prewarm CachedStore - CachedStore.prewarm(objectStore); - - // Read database, table via CachedStore - Database dbNew = cachedStore.getDatabase(dbName); - Assert.assertEquals(db, dbNew); - Table tblNew = cachedStore.getTable(dbName, tblName); - Assert.assertEquals(tbl, tblNew); - - // Add a new table via CachedStore - String tblName1 = "tbl1"; - Table tbl1 = - new Table(tblName1, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams, - null, null, TableType.MANAGED_TABLE.toString()); - cachedStore.createTable(tbl1); - tbl1 = cachedStore.getTable(dbName, tblName1); - - // Read via object store - tblNew = objectStore.getTable(dbName, tblName1); - Assert.assertEquals(tbl1, tblNew); - - // Add a new table via ObjectStore - String tblName2 = "tbl2"; - 
Table tbl2 = - new Table(tblName2, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams, - null, null, TableType.MANAGED_TABLE.toString()); - objectStore.createTable(tbl2); - tbl2 = objectStore.getTable(dbName, tblName2); - - // Alter table "tbl" via ObjectStore - tblOwner = "user2"; - tbl = - new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams, - null, null, TableType.MANAGED_TABLE.toString()); - objectStore.alterTable(dbName, tblName, tbl); - tbl = objectStore.getTable(dbName, tblName); - - // Drop table "tbl1" via ObjectStore - objectStore.dropTable(dbName, tblName1); - - // We update twice to accurately detect if cache is dirty or not - updateCache(cachedStore, 100, 500, 100); - updateCache(cachedStore, 100, 500, 100); - - // Read "tbl2" via CachedStore - tblNew = cachedStore.getTable(dbName, tblName2); - Assert.assertEquals(tbl2, tblNew); - - // Read the altered "tbl" via CachedStore - tblNew = cachedStore.getTable(dbName, tblName); - Assert.assertEquals(tbl, tblNew); - - // Try to read the dropped "tbl1" via CachedStore (should throw exception) - tblNew = cachedStore.getTable(dbName, tblName1); - Assert.assertNull(tblNew); - - // Should return "tbl" and "tbl2" - List<String> tblNames = cachedStore.getTables(dbName, "*"); - Assert.assertTrue(tblNames.contains(tblName)); - Assert.assertTrue(!tblNames.contains(tblName1)); - Assert.assertTrue(tblNames.contains(tblName2)); - - // Clean up - objectStore.dropTable(dbName, tblName); - objectStore.dropTable(dbName, tblName2); - objectStore.dropDatabase(dbName); - } - - @Test - public void testPartitionOps() throws Exception { - // Add a db via ObjectStore - String dbName = "testPartitionOps"; - String dbDescription = "testPartitionOps"; - String dbLocation = "file:/tmp"; - Map<String, String> dbParams = new HashMap<String, String>(); - String dbOwner = "user1"; - Database db = new Database(dbName, dbDescription, dbLocation, dbParams); - db.setOwnerName(dbOwner); 
- db.setOwnerType(PrincipalType.USER); - objectStore.createDatabase(db); - db = objectStore.getDatabase(dbName); - - // Add a table via ObjectStore - String tblName = "tbl"; - String tblOwner = "user1"; - String serdeLocation = "file:/tmp"; - FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); - FieldSchema col2 = new FieldSchema("col2", "string", "string column"); - List<FieldSchema> cols = new ArrayList<FieldSchema>(); - cols.add(col1); - cols.add(col2); - Map<String, String> serdeParams = new HashMap<String, String>(); - Map<String, String> tblParams = new HashMap<String, String>(); - SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = - new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, - null, serdeParams); - FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column"); - List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); - ptnCols.add(ptnCol1); - Table tbl = - new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null, - TableType.MANAGED_TABLE.toString()); - objectStore.createTable(tbl); - tbl = objectStore.getTable(dbName, tblName); - final String ptnColVal1 = "aaa"; - Map<String, String> partParams = new HashMap<String, String>(); - Partition ptn1 = - new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, sd, partParams); - objectStore.addPartition(ptn1); - ptn1 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1)); - final String ptnColVal2 = "bbb"; - Partition ptn2 = - new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, sd, partParams); - objectStore.addPartition(ptn2); - ptn2 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); - - // Prewarm CachedStore - CachedStore.prewarm(objectStore); - - // Read database, table, partition via CachedStore - Database dbNew = cachedStore.getDatabase(dbName); - Assert.assertEquals(db, dbNew); - Table tblNew = 
cachedStore.getTable(dbName, tblName); - Assert.assertEquals(tbl, tblNew); - Partition newPtn1 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1)); - Assert.assertEquals(ptn1, newPtn1); - Partition newPtn2 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); - Assert.assertEquals(ptn2, newPtn2); - - // Add a new partition via ObjectStore - final String ptnColVal3 = "ccc"; - Partition ptn3 = - new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, sd, partParams); - objectStore.addPartition(ptn3); - ptn3 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3)); - - // Alter an existing partition ("aaa") via ObjectStore - final String ptnColVal1Alt = "aaaAlt"; - Partition ptn1Atl = - new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, sd, partParams); - objectStore.alterPartition(dbName, tblName, Arrays.asList(ptnColVal1), ptn1Atl); - ptn1Atl = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt)); - - // Drop an existing partition ("bbb") via ObjectStore - objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal2)); - - // We update twice to accurately detect if cache is dirty or not - updateCache(cachedStore, 100, 500, 100); - updateCache(cachedStore, 100, 500, 100); - - // Read the newly added partition via CachedStore - Partition newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3)); - Assert.assertEquals(ptn3, newPtn); - - // Read the altered partition via CachedStore - newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt)); - Assert.assertEquals(ptn1Atl, newPtn); - - // Try to read the dropped partition via CachedStore - try { - newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); - Assert.fail("The partition: " + ptnColVal2 - + " should have been removed from the cache after running the update service"); - } catch (NoSuchObjectException e) { - // Expected - } - } - - //@Test - public 
void testTableColStatsOps() throws Exception { - // Add a db via ObjectStore - String dbName = "testTableColStatsOps"; - String dbDescription = "testTableColStatsOps"; - String dbLocation = "file:/tmp"; - Map<String, String> dbParams = new HashMap<String, String>(); - String dbOwner = "user1"; - Database db = new Database(dbName, dbDescription, dbLocation, dbParams); - db.setOwnerName(dbOwner); - db.setOwnerType(PrincipalType.USER); - objectStore.createDatabase(db); - db = objectStore.getDatabase(dbName); - - // Add a table via ObjectStore - final String tblName = "tbl"; - final String tblOwner = "user1"; - final String serdeLocation = "file:/tmp"; - final FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); - // Stats values for col1 - long col1LowVal = 5; - long col1HighVal = 500; - long col1Nulls = 10; - long col1DV = 20; - final FieldSchema col2 = new FieldSchema("col2", "string", "string column"); - // Stats values for col2 - long col2MaxColLen = 100; - double col2AvgColLen = 45.5; - long col2Nulls = 5; - long col2DV = 40; - final FieldSchema col3 = new FieldSchema("col3", "boolean", "boolean column"); - // Stats values for col3 - long col3NumTrues = 100; - long col3NumFalses = 30; - long col3Nulls = 10; - final List<FieldSchema> cols = new ArrayList<FieldSchema>(); - cols.add(col1); - cols.add(col2); - cols.add(col3); - Map<String, String> serdeParams = new HashMap<String, String>(); - Map<String, String> tblParams = new HashMap<String, String>(); - final SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null); - StorageDescriptor sd = - new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, - null, serdeParams); - Table tbl = - new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams, - null, null, TableType.MANAGED_TABLE.toString()); - objectStore.createTable(tbl); - tbl = objectStore.getTable(dbName, tblName); - - // Add ColumnStatistics for tbl to metastore DB via 
ObjectStore - ColumnStatistics stats = new ColumnStatistics(); - ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName); - List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>(); - - // Col1 - ColumnStatisticsData data1 = new ColumnStatisticsData(); - ColumnStatisticsObj col1Stats = new ColumnStatisticsObj(col1.getName(), col1.getType(), data1); - LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector(); - longStats.setLowValue(col1LowVal); - longStats.setHighValue(col1HighVal); - longStats.setNumNulls(col1Nulls); - longStats.setNumDVs(col1DV); - data1.setLongStats(longStats); - colStatObjs.add(col1Stats); - - // Col2 - ColumnStatisticsData data2 = new ColumnStatisticsData(); - ColumnStatisticsObj col2Stats = new ColumnStatisticsObj(col2.getName(), col2.getType(), data2); - StringColumnStatsDataInspector stringStats = new StringColumnStatsDataInspector(); - stringStats.setMaxColLen(col2MaxColLen); - stringStats.setAvgColLen(col2AvgColLen); - stringStats.setNumNulls(col2Nulls); - stringStats.setNumDVs(col2DV); - data2.setStringStats(stringStats); - colStatObjs.add(col2Stats); - - // Col3 - ColumnStatisticsData data3 = new ColumnStatisticsData(); - ColumnStatisticsObj col3Stats = new ColumnStatisticsObj(col3.getName(), col3.getType(), data3); - BooleanColumnStatsData boolStats = new BooleanColumnStatsData(); - boolStats.setNumTrues(col3NumTrues); - boolStats.setNumFalses(col3NumFalses); - boolStats.setNumNulls(col3Nulls); - data3.setBooleanStats(boolStats); - colStatObjs.add(col3Stats); - - stats.setStatsDesc(statsDesc); - stats.setStatsObj(colStatObjs); - - // Save to DB - objectStore.updateTableColumnStatistics(stats); - - // Prewarm CachedStore - CachedStore.prewarm(objectStore); - - // Read table stats via CachedStore - ColumnStatistics newStats = - cachedStore.getTableColumnStatistics(dbName, tblName, - Arrays.asList(col1.getName(), col2.getName(), col3.getName())); - Assert.assertEquals(stats, 
newStats); - } - - private void updateCache(CachedStore cachedStore, long frequency, long sleepTime, - long shutdownTimeout) throws InterruptedException { - // Set cache refresh period to 100 milliseconds - CachedStore.setCacheRefreshPeriod(100); - // Start the CachedStore update service - CachedStore.startCacheUpdateService(cachedStore.getConf()); - // Sleep for 500 ms so that cache update is complete - Thread.sleep(500); - // Stop cache update service - CachedStore.stopCacheUpdateService(100); - } - - /********************************************************************************************** - * Methods that test SharedCache - *********************************************************************************************/ - - @Test - public void testSharedStoreDb() { - Database db1 = new Database(); - Database db2 = new Database(); - Database db3 = new Database(); - Database newDb1 = new Database(); - newDb1.setName("db1"); - - sharedCache.addDatabaseToCache("db1", db1); - sharedCache.addDatabaseToCache("db2", db2); - sharedCache.addDatabaseToCache("db3", db3); - - Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3); - - sharedCache.alterDatabaseInCache("db1", newDb1); - - Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3); - - sharedCache.removeDatabaseFromCache("db2"); - - Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 2); - - List<String> dbs = sharedCache.listCachedDatabases(); - Assert.assertEquals(dbs.size(), 2); - Assert.assertTrue(dbs.contains("db1")); - Assert.assertTrue(dbs.contains("db3")); - } - - @Test - public void testSharedStoreTable() { - Table tbl1 = new Table(); - StorageDescriptor sd1 = new StorageDescriptor(); - List<FieldSchema> cols1 = new ArrayList<FieldSchema>(); - cols1.add(new FieldSchema("col1", "int", "")); - Map<String, String> params1 = new HashMap<String, String>(); - params1.put("key", "value"); - sd1.setCols(cols1); - sd1.setParameters(params1); - sd1.setLocation("loc1"); - tbl1.setSd(sd1); - 
tbl1.setPartitionKeys(new ArrayList<FieldSchema>()); - - Table tbl2 = new Table(); - StorageDescriptor sd2 = new StorageDescriptor(); - List<FieldSchema> cols2 = new ArrayList<FieldSchema>(); - cols2.add(new FieldSchema("col1", "int", "")); - Map<String, String> params2 = new HashMap<String, String>(); - params2.put("key", "value"); - sd2.setCols(cols2); - sd2.setParameters(params2); - sd2.setLocation("loc2"); - tbl2.setSd(sd2); - tbl2.setPartitionKeys(new ArrayList<FieldSchema>()); - - Table tbl3 = new Table(); - StorageDescriptor sd3 = new StorageDescriptor(); - List<FieldSchema> cols3 = new ArrayList<FieldSchema>(); - cols3.add(new FieldSchema("col3", "int", "")); - Map<String, String> params3 = new HashMap<String, String>(); - params3.put("key2", "value2"); - sd3.setCols(cols3); - sd3.setParameters(params3); - sd3.setLocation("loc3"); - tbl3.setSd(sd3); - tbl3.setPartitionKeys(new ArrayList<FieldSchema>()); - - Table newTbl1 = new Table(); - newTbl1.setDbName("db2"); - newTbl1.setTableName("tbl1"); - StorageDescriptor newSd1 = new StorageDescriptor(); - List<FieldSchema> newCols1 = new ArrayList<FieldSchema>(); - newCols1.add(new FieldSchema("newcol1", "int", "")); - Map<String, String> newParams1 = new HashMap<String, String>(); - newParams1.put("key", "value"); - newSd1.setCols(newCols1); - newSd1.setParameters(params1); - newSd1.setLocation("loc1"); - newTbl1.setSd(newSd1); - newTbl1.setPartitionKeys(new ArrayList<FieldSchema>()); - - sharedCache.addTableToCache("db1", "tbl1", tbl1); - sharedCache.addTableToCache("db1", "tbl2", tbl2); - sharedCache.addTableToCache("db1", "tbl3", tbl3); - sharedCache.addTableToCache("db2", "tbl1", tbl1); - - Assert.assertEquals(sharedCache.getCachedTableCount(), 4); - Assert.assertEquals(sharedCache.getSdCache().size(), 2); - - Table t = sharedCache.getTableFromCache("db1", "tbl1"); - Assert.assertEquals(t.getSd().getLocation(), "loc1"); - - sharedCache.removeTableFromCache("db1", "tbl1"); - 
Assert.assertEquals(sharedCache.getCachedTableCount(), 3); - Assert.assertEquals(sharedCache.getSdCache().size(), 2); - - sharedCache.alterTableInCache("db2", "tbl1", newTbl1); - Assert.assertEquals(sharedCache.getCachedTableCount(), 3); - Assert.assertEquals(sharedCache.getSdCache().size(), 3); - - sharedCache.removeTableFromCache("db1", "tbl2"); - Assert.assertEquals(sharedCache.getCachedTableCount(), 2); - Assert.assertEquals(sharedCache.getSdCache().size(), 2); - } - - - @Test - public void testSharedStorePartition() { - Partition part1 = new Partition(); - StorageDescriptor sd1 = new StorageDescriptor(); - List<FieldSchema> cols1 = new ArrayList<FieldSchema>(); - cols1.add(new FieldSchema("col1", "int", "")); - Map<String, String> params1 = new HashMap<String, String>(); - params1.put("key", "value"); - sd1.setCols(cols1); - sd1.setParameters(params1); - sd1.setLocation("loc1"); - part1.setSd(sd1); - part1.setValues(Arrays.asList("201701")); - - Partition part2 = new Partition(); - StorageDescriptor sd2 = new StorageDescriptor(); - List<FieldSchema> cols2 = new ArrayList<FieldSchema>(); - cols2.add(new FieldSchema("col1", "int", "")); - Map<String, String> params2 = new HashMap<String, String>(); - params2.put("key", "value"); - sd2.setCols(cols2); - sd2.setParameters(params2); - sd2.setLocation("loc2"); - part2.setSd(sd2); - part2.setValues(Arrays.asList("201702")); - - Partition part3 = new Partition(); - StorageDescriptor sd3 = new StorageDescriptor(); - List<FieldSchema> cols3 = new ArrayList<FieldSchema>(); - cols3.add(new FieldSchema("col3", "int", "")); - Map<String, String> params3 = new HashMap<String, String>(); - params3.put("key2", "value2"); - sd3.setCols(cols3); - sd3.setParameters(params3); - sd3.setLocation("loc3"); - part3.setSd(sd3); - part3.setValues(Arrays.asList("201703")); - - Partition newPart1 = new Partition(); - newPart1.setDbName("db1"); - newPart1.setTableName("tbl1"); - StorageDescriptor newSd1 = new StorageDescriptor(); - 
List<FieldSchema> newCols1 = new ArrayList<FieldSchema>(); - newCols1.add(new FieldSchema("newcol1", "int", "")); - Map<String, String> newParams1 = new HashMap<String, String>(); - newParams1.put("key", "value"); - newSd1.setCols(newCols1); - newSd1.setParameters(params1); - newSd1.setLocation("loc1"); - newPart1.setSd(newSd1); - newPart1.setValues(Arrays.asList("201701")); - - sharedCache.addPartitionToCache("db1", "tbl1", part1); - sharedCache.addPartitionToCache("db1", "tbl1", part2); - sharedCache.addPartitionToCache("db1", "tbl1", part3); - sharedCache.addPartitionToCache("db1", "tbl2", part1); - - Assert.assertEquals(sharedCache.getCachedPartitionCount(), 4); - Assert.assertEquals(sharedCache.getSdCache().size(), 2); - - Partition t = sharedCache.getPartitionFromCache("db1", "tbl1", Arrays.asList("201701")); - Assert.assertEquals(t.getSd().getLocation(), "loc1"); - - sharedCache.removePartitionFromCache("db1", "tbl2", Arrays.asList("201701")); - Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3); - Assert.assertEquals(sharedCache.getSdCache().size(), 2); - - sharedCache.alterPartitionInCache("db1", "tbl1", Arrays.asList("201701"), newPart1); - Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3); - Assert.assertEquals(sharedCache.getSdCache().size(), 3); - - sharedCache.removePartitionFromCache("db1", "tbl1", Arrays.asList("201702")); - Assert.assertEquals(sharedCache.getCachedPartitionCount(), 2); - Assert.assertEquals(sharedCache.getSdCache().size(), 2); - } - - @Test - public void testAggrStatsRepeatedRead() throws Exception { - String dbName = "testTableColStatsOps"; - String tblName = "tbl"; - String colName = "f1"; - - Database db = new Database(dbName, null, "some_location", null); - cachedStore.createDatabase(db); - - List<FieldSchema> cols = new ArrayList<FieldSchema>(); - cols.add(new FieldSchema(colName, "int", null)); - List<FieldSchema> partCols = new ArrayList<FieldSchema>(); - partCols.add(new FieldSchema("col", "int", 
null)); - StorageDescriptor sd = - new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<String, String>()), - null, null, null); - - Table tbl = - new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<String, String>(), - null, null, TableType.MANAGED_TABLE.toString()); - cachedStore.createTable(tbl); - - List<String> partVals1 = new ArrayList<String>(); - partVals1.add("1"); - List<String> partVals2 = new ArrayList<String>(); - partVals2.add("2"); - - Partition ptn1 = - new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<String, String>()); - cachedStore.addPartition(ptn1); - Partition ptn2 = - new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<String, String>()); - cachedStore.addPartition(ptn2); - - ColumnStatistics stats = new ColumnStatistics(); - ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName); - statsDesc.setPartName("col"); - List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>(); - - ColumnStatisticsData data = new ColumnStatisticsData(); - ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", data); - LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector(); - longStats.setLowValue(0); - longStats.setHighValue(100); - longStats.setNumNulls(50); - longStats.setNumDVs(30); - data.setLongStats(longStats); - colStatObjs.add(colStats); - - stats.setStatsDesc(statsDesc); - stats.setStatsObj(colStatObjs); - - cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1); - cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2); - - List<String> colNames = new ArrayList<String>(); - colNames.add(colName); - List<String> aggrPartVals = new ArrayList<String>(); - aggrPartVals.add("1"); - aggrPartVals.add("2"); - AggrStats aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, aggrPartVals, colNames); - 
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100); - aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, aggrPartVals, colNames); - Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100); - } - - @Test - public void testPartitionAggrStats() throws Exception { - String dbName = "testTableColStatsOps1"; - String tblName = "tbl1"; - String colName = "f1"; - - Database db = new Database(dbName, null, "some_location", null); - cachedStore.createDatabase(db); - - List<FieldSchema> cols = new ArrayList<FieldSchema>(); - cols.add(new FieldSchema(colName, "int", null)); - List<FieldSchema> partCols = new ArrayList<FieldSchema>(); - partCols.add(new FieldSchema("col", "int", null)); - StorageDescriptor sd = - new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<String, String>()), - null, null, null); - - Table tbl = - new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<String, String>(), - null, null, TableType.MANAGED_TABLE.toString()); - cachedStore.createTable(tbl); - - List<String> partVals1 = new ArrayList<String>(); - partVals1.add("1"); - List<String> partVals2 = new ArrayList<String>(); - partVals2.add("2"); - - Partition ptn1 = - new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<String, String>()); - cachedStore.addPartition(ptn1); - Partition ptn2 = - new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<String, String>()); - cachedStore.addPartition(ptn2); - - ColumnStatistics stats = new ColumnStatistics(); - ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName); - statsDesc.setPartName("col"); - List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>(); - - ColumnStatisticsData data = new ColumnStatisticsData(); - ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", data); - 
LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector(); - longStats.setLowValue(0); - longStats.setHighValue(100); - longStats.setNumNulls(50); - longStats.setNumDVs(30); - data.setLongStats(longStats); - colStatObjs.add(colStats); - - stats.setStatsDesc(statsDesc); - stats.setStatsObj(colStatObjs); - - cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1); - - longStats.setNumDVs(40); - cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2); - - List<String> colNames = new ArrayList<String>(); - colNames.add(colName); - List<String> aggrPartVals = new ArrayList<String>(); - aggrPartVals.add("1"); - aggrPartVals.add("2"); - AggrStats aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, aggrPartVals, colNames); - Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100); - Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(), 40); - aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, aggrPartVals, colNames); - Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100); - Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(), 40); - } - - @Test - public void testPartitionAggrStatsBitVector() throws Exception { - String dbName = "testTableColStatsOps2"; - String tblName = "tbl2"; - String colName = "f1"; - - Database db = new Database(dbName, null, "some_location", null); - cachedStore.createDatabase(db); - - List<FieldSchema> cols = new ArrayList<FieldSchema>(); - cols.add(new FieldSchema(colName, "int", null)); - List<FieldSchema> partCols = new ArrayList<FieldSchema>(); - partCols.add(new FieldSchema("col", "int", null)); - StorageDescriptor sd = - new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<String, String>()), - null, null, null); - - Table tbl = - new 
Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<String, String>(), - null, null, TableType.MANAGED_TABLE.toString()); - cachedStore.createTable(tbl); - - List<String> partVals1 = new ArrayList<String>(); - partVals1.add("1"); - List<String> partVals2 = new ArrayList<String>(); - partVals2.add("2"); - - Partition ptn1 = - new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<String, String>()); - cachedStore.addPartition(ptn1); - Partition ptn2 = - new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<String, String>()); - cachedStore.addPartition(ptn2); - - ColumnStatistics stats = new ColumnStatistics(); - ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName); - statsDesc.setPartName("col"); - List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>(); - - ColumnStatisticsData data = new ColumnStatisticsData(); - ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", data); - LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector(); - longStats.setLowValue(0); - longStats.setHighValue(100); - longStats.setNumNulls(50); - longStats.setNumDVs(30); - - HyperLogLog hll = HyperLogLog.builder().build(); - hll.addLong(1); - hll.addLong(2); - hll.addLong(3); - longStats.setBitVectors(hll.serialize()); - - data.setLongStats(longStats); - colStatObjs.add(colStats); - - stats.setStatsDesc(statsDesc); - stats.setStatsObj(colStatObjs); - - cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1); - - longStats.setNumDVs(40); - hll = HyperLogLog.builder().build(); - hll.addLong(2); - hll.addLong(3); - hll.addLong(4); - hll.addLong(5); - longStats.setBitVectors(hll.serialize()); - - cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2); - - List<String> colNames = new ArrayList<String>(); - colNames.add(colName); - List<String> aggrPartVals = new ArrayList<String>(); - aggrPartVals.add("1"); - aggrPartVals.add("2"); - AggrStats 
aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, aggrPartVals, colNames); - Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100); - Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(), 5); - aggrStats = cachedStore.get_aggr_stats_for(dbName, tblName, aggrPartVals, colNames); - Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100); - Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(), 5); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java index 41fbb0c..50a55d8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java @@ -42,7 +42,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; @@ -50,6 +49,7 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; 
@@ -1753,7 +1753,7 @@ public abstract class BaseSemanticAnalyzer { } else { throw new SemanticException("Unexpected date type " + colValue.getClass()); } - return HiveMetaStore.PARTITION_DATE_FORMAT.get().format(value); + return MetaStoreUtils.PARTITION_DATE_FORMAT.get().format(value); } protected WriteEntity toWriteEntity(String location) throws SemanticException { http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java index b26401d..5c6bbbd 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.MetadataPpdResult; import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler; -import org.apache.hadoop.hive.metastore.hbase.MetadataStore; +import org.apache.hadoop.hive.metastore.MetadataStore; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf; import org.apache.hadoop.hive.ql.metadata.HiveException; http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/pom.xml ---------------------------------------------------------------------- diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml index acc50ca..07b767b 100644 --- a/standalone-metastore/pom.xml +++ b/standalone-metastore/pom.xml @@ -33,9 +33,9 @@ <dependencies> <dependency> - <groupId>com.jolbox</groupId> - <artifactId>bonecp</artifactId> - <version>${bonecp.version}</version> + 
<groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.new.version}</version> </dependency> <dependency> <groupId>com.github.joshelser</groupId> @@ -53,6 +53,11 @@ <version>${protobuf.version}</version> </dependency> <dependency> + <groupId>com.jolbox</groupId> + <artifactId>bonecp</artifactId> + <version>${bonecp.version}</version> + </dependency> + <dependency> <groupId>com.zaxxer</groupId> <artifactId>HikariCP</artifactId> <version>${hikaricp.version}</version> @@ -78,6 +83,16 @@ <version>${dropwizard.version}</version> </dependency> <dependency> + <groupId>javolution</groupId> + <artifactId>javolution</artifactId> + <version>${javolution.version}</version> + </dependency> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr-runtime</artifactId> + <version>${antlr.version}</version> + </dependency> + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>${commons-lang3.version}</version> @@ -284,6 +299,20 @@ </executions> </plugin> <plugin> + <groupId>org.antlr</groupId> + <artifactId>antlr3-maven-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>antlr</goal> + </goals> + </execution> + </executions> + <configuration> + <sourceDirectory>${basedir}/src/main/java</sourceDirectory> + </configuration> + </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-antrun-plugin</artifactId> <executions> http://git-wip-us.apache.org/repos/asf/hive/blob/133d3c47/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java new file mode 100644 index 0000000..e2e3ada --- /dev/null +++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/common/StatsSetupConst.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.SerializerProvider; +import 
com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + + +/** + * A class that defines the constant strings used by the statistics implementation. + */ + +public class StatsSetupConst { + + protected static final Logger LOG = LoggerFactory.getLogger(StatsSetupConst.class.getName()); + + public enum StatDB { + fs { + @Override + public String getPublisher(Configuration conf) { + return "org.apache.hadoop.hive.ql.stats.fs.FSStatsPublisher"; + } + + @Override + public String getAggregator(Configuration conf) { + return "org.apache.hadoop.hive.ql.stats.fs.FSStatsAggregator"; + } + }, + custom { + @Override + public String getPublisher(Configuration conf) { + return MetastoreConf.getVar(conf, ConfVars.STATS_DEFAULT_PUBLISHER); } + @Override + public String getAggregator(Configuration conf) { + return MetastoreConf.getVar(conf, ConfVars.STATS_DEFAULT_AGGREGATOR); } + }; + public abstract String getPublisher(Configuration conf); + public abstract String getAggregator(Configuration conf); + } + + // statistics stored in metastore + /** + * The name of the statistic Num Files to be published or gathered. + */ + public static final String NUM_FILES = "numFiles"; + + /** + * The name of the statistic Num Partitions to be published or gathered. + */ + public static final String NUM_PARTITIONS = "numPartitions"; + + /** + * The name of the statistic Total Size to be published or gathered. + */ + public static final String TOTAL_SIZE = "totalSize"; + + /** + * The name of the statistic Row Count to be published or gathered. + */ + public static final String ROW_COUNT = "numRows"; + + public static final String RUN_TIME_ROW_COUNT = "runTimeNumRows"; + + /** + * The name of the statistic Raw Data Size to be published or gathered. + */ + public static final String RAW_DATA_SIZE = "rawDataSize"; + + /** + * Temp dir for writing stats from tasks. 
+ */ + public static final String STATS_TMP_LOC = "hive.stats.tmp.loc"; + + public static final String STATS_FILE_PREFIX = "tmpstats-"; + /** + * List of all supported statistics + */ + public static final String[] supportedStats = {NUM_FILES,ROW_COUNT,TOTAL_SIZE,RAW_DATA_SIZE}; + + /** + * List of all statistics that need to be collected during query execution. These are + * statistics that inherently require a scan of the data. + */ + public static final String[] statsRequireCompute = new String[] {ROW_COUNT,RAW_DATA_SIZE}; + + /** + * List of statistics that can be collected quickly without requiring a scan of the data. + */ + public static final String[] fastStats = new String[] {NUM_FILES,TOTAL_SIZE}; + + // This string constant is used to indicate to AlterHandler that + // alterPartition/alterTable is happening via statsTask or via user. + public static final String STATS_GENERATED = "STATS_GENERATED"; + + public static final String TASK = "TASK"; + + public static final String USER = "USER"; + + // This string constant is used by AlterHandler to figure out that it should not attempt to + // update stats. It is set by any client-side task which wishes to signal that no stats + // update should take place, such as with replication. + public static final String DO_NOT_UPDATE_STATS = "DO_NOT_UPDATE_STATS"; + + //This string constant will be persisted in metastore to indicate whether corresponding + //table or partition's statistics and table or partition's column statistics are accurate or not. + public static final String COLUMN_STATS_ACCURATE = "COLUMN_STATS_ACCURATE"; + + public static final String COLUMN_STATS = "COLUMN_STATS"; + + public static final String BASIC_STATS = "BASIC_STATS"; + + public static final String CASCADE = "CASCADE"; + + public static final String TRUE = "true"; + + public static final String FALSE = "false"; + + // The parameter keys for the table statistics. Those keys are excluded from 'show create table' command output. 
// --- StatsSetupConst members (enclosing class declaration is outside this chunk) ---

/** Table/partition parameter keys that carry basic statistics. */
public static final String[] TABLE_PARAMS_STATS_KEYS = new String[] {
    COLUMN_STATS_ACCURATE, NUM_FILES, TOTAL_SIZE, ROW_COUNT, RAW_DATA_SIZE, NUM_PARTITIONS};

/**
 * JSON-serializable model of the COLUMN_STATS_ACCURATE parameter: a basic-stats
 * flag plus a sorted map of column name -> accurate flag. Booleans are written
 * as the strings "true"/"false" (not JSON booleans) to stay compatible with the
 * historical string format of this property.
 */
private static class ColumnStatsAccurate {
  private static ObjectReader objectReader;
  private static ObjectWriter objectWriter;

  static {
    ObjectMapper objectMapper = new ObjectMapper();
    objectReader = objectMapper.readerFor(ColumnStatsAccurate.class);
    objectWriter = objectMapper.writerFor(ColumnStatsAccurate.class);
  }

  /** Writes Boolean values as the strings "true"/"false". */
  static class BooleanSerializer extends JsonSerializer<Boolean> {

    @Override
    public void serialize(Boolean value, JsonGenerator jsonGenerator,
        SerializerProvider serializerProvider) throws IOException {
      jsonGenerator.writeString(value.toString());
    }
  }

  /** Reads the "true"/"false" strings produced by {@link BooleanSerializer} back as Booleans. */
  static class BooleanDeserializer extends JsonDeserializer<Boolean> {

    @Override
    public Boolean deserialize(JsonParser jsonParser,
        DeserializationContext deserializationContext)
        throws IOException {
      return Boolean.valueOf(jsonParser.getValueAsString());
    }
  }

  // True when basic stats for the table/partition are up to date.
  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
  @JsonSerialize(using = BooleanSerializer.class)
  @JsonDeserialize(using = BooleanDeserializer.class)
  @JsonProperty(BASIC_STATS)
  boolean basicStats;

  // Column name -> true for each column whose stats are up to date. TreeMap
  // keeps the serialized property deterministic (sorted by column name).
  @JsonInclude(JsonInclude.Include.NON_EMPTY)
  @JsonProperty(COLUMN_STATS)
  @JsonSerialize(contentUsing = BooleanSerializer.class)
  @JsonDeserialize(contentUsing = BooleanDeserializer.class)
  TreeMap<String, Boolean> columnStats = new TreeMap<>();

}

/** @return true when params marks basic stats as up to date; false for null params. */
public static boolean areBasicStatsUptoDate(Map<String, String> params) {
  if (params == null) {
    return false;
  }
  ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
  return stats.basicStats;
}

/** @return true when params marks colName's column stats as up to date; false for null params. */
public static boolean areColumnStatsUptoDate(Map<String, String> params, String colName) {
  if (params == null) {
    return false;
  }
  ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
  return stats.columnStats.containsKey(colName);
}

/**
 * Marks basic stats as accurate, or — when setting equals FALSE — removes the
 * whole accuracy marker. Note that setting basic stats false wipes out the
 * column-stats flags too, since both live in the same parameter.
 * Serialization will only fail on a duplicate key, which cannot happen here,
 * so a failure is rethrown as a RuntimeException.
 */
public static void setBasicStatsState(Map<String, String> params, String setting) {
  if (setting.equals(FALSE)) {
    // Map.remove is a no-op for absent keys; no containsKey check needed.
    if (params != null) {
      params.remove(COLUMN_STATS_ACCURATE);
    }
    return;
  }
  if (params == null) {
    throw new RuntimeException("params are null...cant set columnstatstate!");
  }
  ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
  stats.basicStats = true;
  try {
    params.put(COLUMN_STATS_ACCURATE, ColumnStatsAccurate.objectWriter.writeValueAsString(stats));
  } catch (JsonProcessingException e) {
    throw new RuntimeException("can't serialize column stats", e);
  }
}

/** Marks each column in colNames as having up-to-date column stats; null colNames is a no-op. */
public static void setColumnStatsState(Map<String, String> params, List<String> colNames) {
  if (params == null) {
    throw new RuntimeException("params are null...cant set columnstatstate!");
  }
  if (colNames == null) {
    return;
  }
  ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));

  for (String colName : colNames) {
    if (!stats.columnStats.containsKey(colName)) {
      stats.columnStats.put(colName, true);
    }
  }
  try {
    params.put(COLUMN_STATS_ACCURATE, ColumnStatsAccurate.objectWriter.writeValueAsString(stats));
  } catch (JsonProcessingException e) {
    // Best-effort: a serialization failure leaves the previous marker in place.
    LOG.trace(e.getMessage());
  }
}

/** Clears all column-stats accuracy flags, leaving the basic-stats flag untouched. */
public static void clearColumnStatsState(Map<String, String> params) {
  if (params == null) {
    return;
  }
  ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
  stats.columnStats.clear();

  try {
    params.put(COLUMN_STATS_ACCURATE, ColumnStatsAccurate.objectWriter.writeValueAsString(stats));
  } catch (JsonProcessingException e) {
    LOG.trace(e.getMessage());
  }
}

/** Drops the accuracy flags of the given columns; null params or colNames is a no-op. */
public static void removeColumnStatsState(Map<String, String> params, List<String> colNames) {
  // Guard colNames too: a null list previously caused an NPE in the loop below,
  // asymmetric with setColumnStatsState which treats null as "nothing to do".
  if (params == null || colNames == null) {
    return;
  }
  try {
    ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
    for (String colName : colNames) {
      stats.columnStats.remove(colName);
    }
    params.put(COLUMN_STATS_ACCURATE, ColumnStatsAccurate.objectWriter.writeValueAsString(stats));
  } catch (JsonProcessingException e) {
    LOG.trace(e.getMessage());
  }
}

/**
 * Initializes stats state for a freshly created table: zeroes the supported
 * stat counters (when setting is TRUE) and records accuracy flags.
 */
public static void setStatsStateForCreateTable(Map<String, String> params,
    List<String> cols, String setting) {
  if (TRUE.equals(setting)) {
    for (String stat : StatsSetupConst.supportedStats) {
      params.put(stat, "0");
    }
  }
  setBasicStatsState(params, setting);
  setColumnStatsState(params, cols);
}

/**
 * Parses the COLUMN_STATS_ACCURATE value. Null or malformed input falls back
 * to an empty marker; the legacy plain "true" value keeps basic stats accurate
 * for backward compatibility with older metastores.
 */
private static ColumnStatsAccurate parseStatsAcc(String statsAcc) {
  if (statsAcc == null) {
    return new ColumnStatsAccurate();
  }
  try {
    return ColumnStatsAccurate.objectReader.readValue(statsAcc);
  } catch (Exception e) {
    ColumnStatsAccurate ret = new ColumnStatsAccurate();
    if (TRUE.equalsIgnoreCase(statsAcc)) {
      ret.basicStats = true;
    }
    return ret;
  }
}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.common.ndv;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.util.JavaDataModel;

/**
 * Contract for a number-of-distinct-values (NDV) estimator. Implementations
 * accumulate values through the {@code addToEstimator} overloads and produce
 * an approximate distinct count via {@link #estimateNumDistinctValues()}.
 * Concrete implementations include an FM sketch and HyperLogLog (see the
 * factory in this package).
 */
public interface NumDistinctValueEstimator {

  Logger LOG = LoggerFactory.getLogger(NumDistinctValueEstimator.class.getName());

  /** Clears all accumulated state so the estimator can be reused from scratch. */
  void reset();

  /** @return a byte representation of this estimator's current state */
  byte[] serialize();

  /**
   * Reconstructs an estimator from bytes produced by {@link #serialize()}.
   * NOTE(review): returns a new estimator rather than mutating {@code this} —
   * confirm against implementations before relying on in-place behavior.
   */
  NumDistinctValueEstimator deserialize(byte[] buf);

  /** Folds a long value into the estimate. */
  void addToEstimator(long v);

  /** Folds a double value into the estimate. */
  void addToEstimator(double d);

  /** Folds a string value into the estimate. */
  void addToEstimator(String s);

  /** Folds a decimal value into the estimate. */
  void addToEstimator(HiveDecimal decimal);

  /** Merges another estimator's state into this one; see {@link #canMerge}. */
  void mergeEstimators(NumDistinctValueEstimator o);

  /** @return the approximate number of distinct values seen so far */
  long estimateNumDistinctValues();

  /** @return the estimated in-memory footprint of this estimator under the given data model */
  int lengthFor(JavaDataModel model);

  /** @return true when {@code o} is compatible with this estimator for {@link #mergeEstimators} */
  boolean canMerge(NumDistinctValueEstimator o);

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hadoop.hive.common.ndv;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hive.common.ndv.fm.FMSketch;
import org.apache.hadoop.hive.common.ndv.fm.FMSketchUtils;
import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;

/**
 * Creates and deserializes {@link NumDistinctValueEstimator} instances.
 * Currently two implementations are supported: FM sketch ("fm") and
 * HyperLogLog ("hll").
 */
public class NumDistinctValueEstimatorFactory {

  private NumDistinctValueEstimatorFactory() {
  }

  /**
   * @return true when buf begins with the FM sketch magic marker. A buffer
   * shorter than the marker cannot be an FM sketch, so it returns false
   * instead of throwing ArrayIndexOutOfBoundsException as the byte-copy
   * version did.
   */
  private static boolean isFMSketch(byte[] buf) {
    if (buf == null || buf.length < FMSketchUtils.MAGIC.length) {
      return false;
    }
    return Arrays.equals(Arrays.copyOf(buf, FMSketchUtils.MAGIC.length), FMSketchUtils.MAGIC);
  }

  /**
   * Deserializes an estimator, dispatching on the leading magic bytes.
   * Right now we assume only FM and HLL are available.
   */
  public static NumDistinctValueEstimator getNumDistinctValueEstimator(byte[] buf) {
    try {
      if (isFMSketch(buf)) {
        return FMSketchUtils.deserializeFM(buf);
      } else {
        return HyperLogLog.builder().build().deserialize(buf);
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /** @return a fresh, empty estimator of the same concrete type (and sizing) as n */
  public static NumDistinctValueEstimator getEmptyNumDistinctValueEstimator(
      NumDistinctValueEstimator n) {
    if (n instanceof FMSketch) {
      return new FMSketch(((FMSketch) n).getNumBitVectors());
    } else {
      return HyperLogLog.builder().build();
    }
  }

  /**
   * @param func estimator name, "fm" or "hll" (case-insensitive)
   * @param numBitVectors bit-vector count, used only by the FM sketch
   * @throws IllegalArgumentException when func names no known estimator
   */
  public static NumDistinctValueEstimator getEmptyNumDistinctValueEstimator(String func,
      int numBitVectors) {
    // equalsIgnoreCase avoids the locale-sensitive toLowerCase() comparison
    // (e.g. "FM".toLowerCase() != "fm" under the Turkish locale for 'I').
    if ("fm".equalsIgnoreCase(func)) {
      return new FMSketch(numBitVectors);
    } else if ("hll".equalsIgnoreCase(func)) {
      return HyperLogLog.builder().build();
    } else {
      // IllegalArgumentException is a RuntimeException, so existing callers
      // catching RuntimeException keep working.
      throw new IllegalArgumentException("Can not recognize " + func);
    }
  }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.common.ndv.fm;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javolution.util.FastBitSet;

/**
 * Flajolet-Martin (FM'86) sketch for estimating the number of distinct values.
 * Each incoming value is hashed by numBitVectors independent hash functions;
 * the position of the lowest set bit of each hash is recorded in a bit vector,
 * and the average position of the lowest CLEAR bit across vectors yields
 * log2(phi * NDV).
 */
public class FMSketch implements NumDistinctValueEstimator {

  static final Logger LOG = LoggerFactory.getLogger(FMSketch.class.getName());

  /* We want a,b,x to come from a finite field of size 0 to k, where k is a prime number.
   * 2^p - 1 is prime for p = 31. Hence bitvectorSize has to be 31. Pick k to be 2^p -1.
   * If a,b,x didn't come from a finite field ax1 + b mod k and ax2 + b mod k will not be pair wise
   * independent. As a consequence, the hash values will not distribute uniformly from 0 to 2^p-1
   * thus introducing errors in the estimates.
   */
  public static final int BIT_VECTOR_SIZE = 31;

  // Refer to Flajolet-Martin'86 for the value of phi.
  private static final double PHI = 0.77351;

  // Per-vector hash coefficients: hash_i(v) = (a[i]*v + b[i]) mod (2^31 - 1).
  private final int[] a;
  private final int[] b;
  private final FastBitSet[] bitVector;

  private final Random aValue;
  private final Random bValue;

  // Set once in the constructor; final guarantees it never drifts from the array sizes.
  private final int numBitVectors;

  /** Creates a new estimator with numBitVectors independent hash functions/bit vectors. */
  public FMSketch(int numBitVectors) {
    this.numBitVectors = numBitVectors;
    bitVector = new FastBitSet[numBitVectors];
    for (int i = 0; i < numBitVectors; i++) {
      bitVector[i] = new FastBitSet(BIT_VECTOR_SIZE);
    }

    a = new int[numBitVectors];
    b = new int[numBitVectors];

    /* Use a large prime number as a seed to the random number generator. Java's Random is a
     * Linear Congruential Generator, X(n+1) = (a X(n) + c) mod m with m = 2^48, which is not
     * prime, so successive values are not pairwise independent. Empirically, prime seeds
     * behave better than composite ones, hence the fixed prime seeds below (which also make
     * every FMSketch use the same hash family, required for merging).
     */
    aValue = new Random(99397);
    bValue = new Random(9876413);

    for (int i = 0; i < numBitVectors; i++) {
      int randVal;
      /* a and b must be odd: even coefficients can never set bit 0, which would corrupt up to
       * 25% of the bit vectors and bias the estimate.
       */
      do {
        randVal = aValue.nextInt();
      } while (randVal % 2 == 0);

      a[i] = randVal;

      do {
        randVal = bValue.nextInt();
      } while (randVal % 2 == 0);

      b[i] = randVal;

      // NOTE(review): by precedence this is 1 << (BIT_VECTOR_SIZE - 1) == 2^30, possibly
      // intended as (1 << BIT_VECTOR_SIZE) - 1. Preserved byte-for-byte: changing it would
      // alter every hash value and break compatibility with previously serialized sketches.
      if (a[i] < 0) {
        a[i] = a[i] + (1 << BIT_VECTOR_SIZE - 1);
      }

      if (b[i] < 0) {
        b[i] = b[i] + (1 << BIT_VECTOR_SIZE - 1);
      }
    }
  }

  /** Resets this estimator to its original empty state. */
  @Override
  public void reset() {
    for (int i = 0; i < numBitVectors; i++) {
      bitVector[i].clear();
    }
  }

  public FastBitSet getBitVector(int index) {
    return bitVector[index];
  }

  public FastBitSet setBitVector(FastBitSet fastBitSet, int index) {
    return bitVector[index] = fastBitSet;
  }

  public int getNumBitVectors() {
    return numBitVectors;
  }

  public int getBitVectorSize() {
    return BIT_VECTOR_SIZE;
  }

  /** Logs the sketch configuration and serialized bit vectors at DEBUG level. */
  public void printNumDistinctValueEstimator() {
    LOG.debug("NumDistinctValueEstimator");
    LOG.debug("Number of Vectors: {}", numBitVectors);
    LOG.debug("Vector Size: {}", BIT_VECTOR_SIZE);

    // StringBuilder instead of repeated String concatenation in the loop.
    StringBuilder t = new StringBuilder();
    for (int i = 0; i < numBitVectors; i++) {
      t.append(bitVector[i].toString());
    }

    LOG.debug("Serialized Vectors: ");
    LOG.debug(t.toString());
  }

  /** @return this sketch serialized via {@link FMSketchUtils#serializeFM}. */
  @Override
  public byte[] serialize() {
    // try-with-resources guarantees the stream is closed even if serializeFM throws
    // (close() is a no-op for ByteArrayOutputStream, but the idiom is uniform).
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
      FMSketchUtils.serializeFM(bos, this);
      return bos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /** @return a NEW estimator reconstructed from buf (this instance is not modified). */
  @Override
  public NumDistinctValueEstimator deserialize(byte[] buf) {
    try (InputStream is = new ByteArrayInputStream(buf)) {
      return FMSketchUtils.deserializeFM(is);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  // hash_i(v) = (a[i]*v + b[i]) mod (2^31 - 1), mapped into [0, 2^31-1).
  private int generateHash(long v, int hashNum) {
    int mod = (1 << BIT_VECTOR_SIZE) - 1;
    long tempHash = a[hashNum] * v + b[hashNum];
    tempHash %= mod;
    int hash = (int) tempHash;

    /* Hash function should map the long value to 0...2^L-1.
     * Hence hash value has to be non-negative.
     */
    if (hash < 0) {
      hash = hash + mod;
    }
    return hash;
  }

  // Single-hash variant used by the PCSA-style estimator.
  private int generateHashForPCSA(long v) {
    // NOTE(review): by precedence this is 1 << ((BIT_VECTOR_SIZE - 1) - 1) == 2^29; possibly
    // intended as (1 << (BIT_VECTOR_SIZE - 1)) - 1. Preserved for output compatibility.
    int mod = 1 << (BIT_VECTOR_SIZE - 1) - 1;
    long tempHash = a[0] * v + b[0];
    tempHash %= mod;
    int hash = (int) tempHash;

    /* Hash function should map the long value to 0...2^L-1.
     * Hence hash value has to be non-negative.
     */
    if (hash < 0) {
      hash = hash + mod + 1;
    }
    return hash;
  }

  /**
   * Folds a long into the sketch: for each hash function, sets the bit at the
   * position of the hash's least significant 1-bit.
   */
  @Override
  public void addToEstimator(long v) {
    for (int i = 0; i < numBitVectors; i++) {
      int hash = generateHash(v, i);
      int index;

      // Find the index of the least significant bit that is 1.
      for (index = 0; index < BIT_VECTOR_SIZE; index++) {
        if (hash % 2 != 0) {
          break;
        }
        hash = hash >> 1;
      }

      // Set bitvector[index] := 1
      bitVector[i].set(index);
    }
  }

  /** PCSA variant: one hash selects both the vector (hash mod m) and the position (hash / m). */
  public void addToEstimatorPCSA(long v) {
    int hash = generateHashForPCSA(v);
    int rho = hash / numBitVectors;
    int index;

    // Find the index of the least significant bit that is 1.
    for (index = 0; index < BIT_VECTOR_SIZE; index++) {
      if (rho % 2 != 0) {
        break;
      }
      rho = rho >> 1;
    }

    // Set bitvector[index] := 1
    bitVector[hash % numBitVectors].set(index);
  }

  @Override
  public void addToEstimator(double d) {
    // Double.hashCode(d) equals new Double(d).hashCode() without the deprecated boxing ctor.
    addToEstimator(Double.hashCode(d));
  }

  public void addToEstimatorPCSA(double d) {
    addToEstimatorPCSA(Double.hashCode(d));
  }

  public void addToEstimator(HiveDecimal decimal) {
    int v = decimal.hashCode();
    addToEstimator(v);
  }

  public void addToEstimatorPCSA(HiveDecimal decimal) {
    int v = decimal.hashCode();
    addToEstimatorPCSA(v);
  }

  /** Merges another FMSketch by OR-ing the corresponding bit vectors. */
  public void mergeEstimators(FMSketch o) {
    // Bitwise OR the bitvector with the bitvector in the agg buffer.
    for (int i = 0; i < numBitVectors; i++) {
      bitVector[i].or(o.getBitVector(i));
    }
  }

  /** PCSA-style estimate: NDV ~= (m / phi) * 2^(S/m), S = sum of lowest-clear-bit indexes. */
  public long estimateNumDistinctValuesPCSA() {
    double numDistinctValues = 0.0;
    long S = 0;

    for (int i = 0; i < numBitVectors; i++) {
      int index = 0;
      // Bounds check FIRST: the original tested get(index) before index < BIT_VECTOR_SIZE,
      // probing one position past the vector when all bits are set.
      while (index < BIT_VECTOR_SIZE && bitVector[i].get(index)) {
        index = index + 1;
      }
      S = S + index;
    }

    numDistinctValues = ((numBitVectors / PHI) * Math.pow(2.0, S / numBitVectors));
    return ((long) numDistinctValues);
  }

  /* We use the Flajolet-Martin estimator to estimate the number of distinct values. FM uses the
   * location of the least significant zero as an estimate of log2(phi*ndvs).
   */
  @Override
  public long estimateNumDistinctValues() {
    int sumLeastSigZero = 0;
    double avgLeastSigZero;
    double numDistinctValues;

    for (int i = 0; i < numBitVectors; i++) {
      int leastSigZero = bitVector[i].nextClearBit(0);
      sumLeastSigZero += leastSigZero;
    }

    avgLeastSigZero =
        sumLeastSigZero / (numBitVectors * 1.0) - (Math.log(PHI) / Math.log(2.0));
    numDistinctValues = Math.pow(2.0, avgLeastSigZero);
    return ((long) (numDistinctValues));
  }

  /** Estimated memory footprint for a sketch with numVector vectors under the given model. */
  @InterfaceAudience.LimitedPrivate(value = {"Hive" })
  static int lengthFor(JavaDataModel model, Integer numVector) {
    int length = model.object();
    length += model.primitive1() * 2; // two int
    length += model.primitive2();     // one double
    length += model.lengthForRandom() * 2; // two Random

    if (numVector == null) {
      numVector = 16; // HiveConf hive.stats.ndv.error default produces 16 vectors
    }

    if (numVector > 0) {
      length += model.array() * 3; // three array
      length += model.primitive1() * numVector * 2; // two int array
      length += (model.object() + model.array() + model.primitive1()
          + model.primitive2()) * numVector; // bitset array
    }
    return length;
  }

  @Override
  public int lengthFor(JavaDataModel model) {
    return lengthFor(model, getNumBitVectors());
  }

  // The caller must guarantee that both estimators have the same numBitVectors; see canMerge.
  @Override
  public void mergeEstimators(NumDistinctValueEstimator o) {
    // Bitwise OR the bitvector with the bitvector in the agg buffer.
    for (int i = 0; i < numBitVectors; i++) {
      bitVector[i].or(((FMSketch) o).getBitVector(i));
    }
  }

  @Override
  public void addToEstimator(String s) {
    int v = s.hashCode();
    addToEstimator(v);
  }

  /** @return true iff o is an FMSketch with the same number of bit vectors */
  @Override
  public boolean canMerge(NumDistinctValueEstimator o) {
    return o instanceof FMSketch && this.numBitVectors == ((FMSketch) o).numBitVectors;
  }
}
