HIVE-18661 : CachedStore: Use metastore notification log events to update cache. (Mahesh Kumar Behera, reviewed by Daniel Dai)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ef7c3963 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ef7c3963 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ef7c3963 Branch: refs/heads/master Commit: ef7c3963be035635ef75ee202b9826b9beb407d7 Parents: 87f8ecc Author: Mahesh Kumar Behera <[email protected]> Authored: Tue Dec 18 13:26:56 2018 +0530 Committer: Mahesh Kumar Behera <[email protected]> Committed: Tue Dec 18 13:26:56 2018 +0530 ---------------------------------------------------------------------- .../listener/DbNotificationListener.java | 72 +++ .../listener/TestDbNotificationListener.java | 83 +-- .../TestCachedStoreUpdateUsingEvents.java | 535 +++++++++++++++++++ .../TestReplicationScenariosAcidTables.java | 2 +- .../hadoop/hive/ql/parse/repl/DumpType.java | 28 + .../dump/events/DeletePartColStatHandler.java | 48 ++ .../dump/events/DeleteTableColStatHandler.java | 48 ++ .../repl/dump/events/EventHandlerFactory.java | 4 + .../dump/events/UpdatePartColStatHandler.java | 48 ++ .../dump/events/UpdateTableColStatHandler.java | 48 ++ .../load/message/DeletePartColStatHandler.java | 43 ++ .../load/message/DeleteTableColStatHandler.java | 43 ++ .../load/message/UpdatePartColStatHandler.java | 43 ++ .../load/message/UpdateTableColStatHandler.java | 43 ++ .../metastore/api/NotificationEventRequest.java | 163 +++++- .../src/gen/thrift/gen-php/metastore/Types.php | 49 ++ .../gen/thrift/gen-py/hive_metastore/ttypes.py | 23 +- .../gen/thrift/gen-rb/hive_metastore_types.rb | 4 +- .../hive/metastore/conf/MetastoreConf.java | 2 + .../src/main/thrift/hive_metastore.thrift | 1 + .../hadoop/hive/metastore/HiveAlterHandler.java | 92 ++-- .../hadoop/hive/metastore/HiveMetaStore.java | 99 +++- .../hive/metastore/MetaStoreEventListener.java | 47 ++ .../metastore/MetaStoreListenerNotifier.java | 12 + .../hadoop/hive/metastore/ObjectStore.java | 20 +- .../hive/metastore/cache/CachedStore.java 
| 493 ++++++++++++++--- .../hive/metastore/cache/SharedCache.java | 11 +- .../events/DeletePartitionColumnStatEvent.java | 81 +++ .../events/DeleteTableColumnStatEvent.java | 64 +++ .../events/UpdatePartitionColumnStatEvent.java | 93 ++++ .../events/UpdateTableColumnStatEvent.java | 84 +++ .../DeletePartitionColumnStatMessage.java | 36 ++ .../messaging/DeleteTableColumnStatMessage.java | 30 ++ .../hive/metastore/messaging/EventMessage.java | 6 +- .../metastore/messaging/MessageBuilder.java | 38 ++ .../messaging/MessageDeserializer.java | 28 + .../UpdatePartitionColumnStatMessage.java | 42 ++ .../messaging/UpdateTableColumnStatMessage.java | 39 ++ .../JSONDeletePartitionColumnStatMessage.java | 102 ++++ .../json/JSONDeleteTableColumnStatMessage.java | 85 +++ .../messaging/json/JSONMessageDeserializer.java | 40 ++ .../JSONUpdatePartitionColumnStatMessage.java | 133 +++++ .../json/JSONUpdateTableColumnStatMessage.java | 121 +++++ .../messaging/json/gzip/DeSerializer.java | 24 + .../hive/metastore/TestHiveAlterHandler.java | 6 +- 45 files changed, 2998 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index fe101d3..fa7ab25 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import 
org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent; @@ -79,6 +80,10 @@ import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; +import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.DeleteTableColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.DeletePartitionColumnStatEvent; import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage; import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage; @@ -107,6 +112,10 @@ import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.MessageSerializer; import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; +import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage; +import org.apache.hadoop.hive.metastore.messaging.DeleteTableColumnStatMessage; +import org.apache.hadoop.hive.metastore.messaging.UpdatePartitionColumnStatMessage; +import org.apache.hadoop.hive.metastore.messaging.DeletePartitionColumnStatMessage; import org.apache.hadoop.hive.metastore.tools.SQLGenerator; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; @@ -744,6 +753,69 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener } } + @Override + public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumnStatEvent) throws MetaException { + UpdateTableColumnStatMessage msg = MessageBuilder.getInstance() + .buildUpdateTableColumnStatMessage(updateTableColumnStatEvent.getColStats(), + updateTableColumnStatEvent.getTableParameters(), + updateTableColumnStatEvent.getValidWriteIds(), updateTableColumnStatEvent.getWriteId()); + NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_TABLE_COLUMN_STAT.toString(), + msgEncoder.getSerializer().serialize(msg)); + ColumnStatisticsDesc statDesc = updateTableColumnStatEvent.getColStats().getStatsDesc(); + event.setCatName(statDesc.isSetCatName() ? statDesc.getCatName() : DEFAULT_CATALOG_NAME); + event.setDbName(statDesc.getDbName()); + event.setTableName(statDesc.getTableName()); + process(event, updateTableColumnStatEvent); + } + + @Override + public void onDeleteTableColumnStat(DeleteTableColumnStatEvent deleteTableColumnStatEvent) throws MetaException { + DeleteTableColumnStatMessage msg = MessageBuilder.getInstance() + .buildDeleteTableColumnStatMessage(deleteTableColumnStatEvent.getDBName(), + deleteTableColumnStatEvent.getColName()); + NotificationEvent event = new NotificationEvent(0, now(), EventType.DELETE_TABLE_COLUMN_STAT.toString(), + msgEncoder.getSerializer().serialize(msg)); + event.setCatName(deleteTableColumnStatEvent.getCatName()); + event.setDbName(deleteTableColumnStatEvent.getDBName()); + event.setTableName(deleteTableColumnStatEvent.getTableName()); + process(event, deleteTableColumnStatEvent); + } + + @Override + public void onUpdatePartitionColumnStat(UpdatePartitionColumnStatEvent updatePartColStatEvent) throws MetaException { + UpdatePartitionColumnStatMessage msg = MessageBuilder.getInstance() + 
.buildUpdatePartitionColumnStatMessage(updatePartColStatEvent.getPartColStats(), + updatePartColStatEvent.getPartVals(), + updatePartColStatEvent.getPartParameters(), + updatePartColStatEvent.getValidWriteIds(), updatePartColStatEvent.getWriteId()); + NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_PARTITION_COLUMN_STAT.toString(), + msgEncoder.getSerializer().serialize(msg)); + ColumnStatisticsDesc statDesc = updatePartColStatEvent.getPartColStats().getStatsDesc(); + event.setCatName(statDesc.isSetCatName() ? statDesc.getCatName() : DEFAULT_CATALOG_NAME); + event.setDbName(statDesc.getDbName()); + event.setTableName(statDesc.getTableName()); + process(event, updatePartColStatEvent); + } + + @Override + public void onDeletePartitionColumnStat(DeletePartitionColumnStatEvent deletePartColStatEvent) throws MetaException { + DeletePartitionColumnStatMessage msg = MessageBuilder.getInstance() + .buildDeletePartitionColumnStatMessage(deletePartColStatEvent.getDBName(), + deletePartColStatEvent.getColName(), deletePartColStatEvent.getPartName(), + deletePartColStatEvent.getPartVals()); + NotificationEvent event = new NotificationEvent(0, now(), EventType.DELETE_PARTITION_COLUMN_STAT.toString(), + msgEncoder.getSerializer().serialize(msg)); + event.setCatName(deletePartColStatEvent.getCatName()); + event.setDbName(deletePartColStatEvent.getDBName()); + event.setTableName(deletePartColStatEvent.getTableName()); + process(event, deletePartColStatEvent); + } + + @Override + public boolean doesAddEventsToNotificationLogTable() { + return true; + } + private int now() { long millis = System.currentTimeMillis(); millis /= 1000; http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java ---------------------------------------------------------------------- diff --git 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index d4b7b02..be4f9ae 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -1250,7 +1250,7 @@ public class TestDbNotificationListener { // Get notifications from metastore NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); - assertEquals(6, rsp.getEventsSize()); + assertEquals(7, rsp.getEventsSize()); NotificationEvent event = rsp.getEvents().get(0); assertEquals(firstEventId + 1, event.getEventId()); assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType()); @@ -1261,14 +1261,14 @@ public class TestDbNotificationListener { // Parse the message field verifyInsert(event, defaultDbName, tblName); - event = rsp.getEvents().get(4); - assertEquals(firstEventId + 5, event.getEventId()); - assertEquals(EventType.ALTER_TABLE.toString(), event.getEventType()); - event = rsp.getEvents().get(5); assertEquals(firstEventId + 6, event.getEventId()); + assertEquals(EventType.ALTER_TABLE.toString(), event.getEventType()); + + event = rsp.getEvents().get(6); + assertEquals(firstEventId + 7, event.getEventId()); assertEquals(EventType.DROP_TABLE.toString(), event.getEventType()); - testEventCounts(defaultDbName, firstEventId, null, null, 6); + testEventCounts(defaultDbName, firstEventId, null, null, 7); } @Test @@ -1285,7 +1285,7 @@ public class TestDbNotificationListener { // Get notifications from metastore NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); - assertEquals(6, rsp.getEventsSize()); + assertEquals(7, rsp.getEventsSize()); NotificationEvent event = rsp.getEvents().get(0); assertEquals(firstEventId + 1, 
event.getEventId()); assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType()); @@ -1296,10 +1296,10 @@ public class TestDbNotificationListener { // Parse the message field verifyInsert(event, null, sourceTblName); - event = rsp.getEvents().get(4); - assertEquals(firstEventId + 5, event.getEventId()); + event = rsp.getEvents().get(5); + assertEquals(firstEventId + 6, event.getEventId()); assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType()); - testEventCounts(defaultDbName, firstEventId, null, null, 6); + testEventCounts(defaultDbName, firstEventId, null, null, 7); } @Test @@ -1349,9 +1349,9 @@ public class TestDbNotificationListener { // Event 9, 10 driver.run("alter table " + tblName + " add partition (ds = 'yesterday')"); - testEventCounts(defaultDbName, firstEventId, null, null, 10); + testEventCounts(defaultDbName, firstEventId, null, null, 13); // Test a limit higher than available events - testEventCounts(defaultDbName, firstEventId, null, 100, 10); + testEventCounts(defaultDbName, firstEventId, null, 100, 13); // Test toEventId lower than current eventId testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 5, null, 5); @@ -1371,84 +1371,89 @@ public class TestDbNotificationListener { // Get notifications from metastore NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); - assertEquals(24, rsp.getEventsSize()); + assertEquals(31, rsp.getEventsSize()); + NotificationEvent event = rsp.getEvents().get(1); assertEquals(firstEventId + 2, event.getEventId()); assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType()); event = rsp.getEvents().get(3); assertEquals(firstEventId + 4, event.getEventId()); + assertEquals(EventType.UPDATE_PARTITION_COLUMN_STAT.toString(), event.getEventType()); + + event = rsp.getEvents().get(4); + assertEquals(firstEventId + 5, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field 
verifyInsert(event, null, tblName); - event = rsp.getEvents().get(6); - assertEquals(firstEventId + 7, event.getEventId()); + event = rsp.getEvents().get(8); + assertEquals(firstEventId + 9, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field verifyInsert(event, null, tblName); - event = rsp.getEvents().get(9); - assertEquals(firstEventId + 10, event.getEventId()); + event = rsp.getEvents().get(12); + assertEquals(firstEventId + 13, event.getEventId()); assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType()); - event = rsp.getEvents().get(10); - assertEquals(firstEventId + 11, event.getEventId()); + event = rsp.getEvents().get(13); + assertEquals(firstEventId + 14, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field verifyInsert(event, null, tblName); - event = rsp.getEvents().get(13); - assertEquals(firstEventId + 14, event.getEventId()); + event = rsp.getEvents().get(17); + assertEquals(firstEventId + 18, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field verifyInsert(event, null, tblName); - event = rsp.getEvents().get(16); - assertEquals(firstEventId + 17, event.getEventId()); + event = rsp.getEvents().get(21); + assertEquals(firstEventId + 22, event.getEventId()); assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType()); - event = rsp.getEvents().get(18); - assertEquals(firstEventId + 19, event.getEventId()); + event = rsp.getEvents().get(24); + assertEquals(firstEventId + 25, event.getEventId()); assertEquals(EventType.DROP_PARTITION.toString(), event.getEventType()); - event = rsp.getEvents().get(19); - assertEquals(firstEventId + 20, event.getEventId()); + event = rsp.getEvents().get(25); + assertEquals(firstEventId + 26, event.getEventId()); assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType()); - event = rsp.getEvents().get(20); 
- assertEquals(firstEventId + 21, event.getEventId()); + event = rsp.getEvents().get(26); + assertEquals(firstEventId + 27, event.getEventId()); assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType()); assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); // Test fromEventId different from the very first - testEventCounts(defaultDbName, event.getEventId(), null, null, 3); + testEventCounts(defaultDbName, event.getEventId(), null, null, 4); - event = rsp.getEvents().get(21); - assertEquals(firstEventId + 22, event.getEventId()); + event = rsp.getEvents().get(28); + assertEquals(firstEventId + 29, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // replace-overwrite introduces no new files assertTrue(event.getMessage().matches(".*\"files\":\\[\\].*")); - event = rsp.getEvents().get(22); - assertEquals(firstEventId + 23, event.getEventId()); + event = rsp.getEvents().get(29); + assertEquals(firstEventId + 30, event.getEventId()); assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType()); assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); - event = rsp.getEvents().get(23); - assertEquals(firstEventId + 24, event.getEventId()); + event = rsp.getEvents().get(30); + assertEquals(firstEventId + 31, event.getEventId()); assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType()); assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); - testEventCounts(defaultDbName, firstEventId, null, null, 24); + testEventCounts(defaultDbName, firstEventId, null, null, 31); // Test a limit within the available events testEventCounts(defaultDbName, firstEventId, null, 10, 10); // Test toEventId greater than current eventId - testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, null, 24); + testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, null, 31); // Test toEventId greater than current eventId with some limit within available 
events testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 10, 10); // Test toEventId greater than current eventId with some limit beyond available events - testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 50, 24); + testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 50, 31); } private void verifyInsert(NotificationEvent event, String dbName, String tblName) throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java new file mode 100644 index 0000000..83f12a5 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java @@ -0,0 +1,535 @@ +package org.apache.hadoop.hive.metastore.cache; + +import java.util.*; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.*; +import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder; +import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.utils.FileUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hive.hcatalog.listener.DbNotificationListener; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import 
org.junit.experimental.categories.Category; +import jline.internal.Log; + +import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; + +public class TestCachedStoreUpdateUsingEvents { + + private RawStore rawStore; + private SharedCache sharedCache; + private Configuration conf; + private HiveMetaStore.HMSHandler hmsHandler; + + @Before + public void setUp() throws Exception { + conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + // Disable memory estimation for this test class + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetastoreConf.setVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS, DbNotificationListener.class.getName()); + MetastoreConf.setVar(conf, ConfVars.RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.cache.CachedStore"); + MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, true); + MetaStoreTestUtils.setConfForStandloneMode(conf); + + hmsHandler = new HiveMetaStore.HMSHandler("testCachedStore", conf, true); + + rawStore = hmsHandler.getMS(); + sharedCache = CachedStore.getSharedCache(); + + // Stop the CachedStore cache update service. 
We'll start it explicitly to control the test + CachedStore.stopCacheUpdateService(1); + + // Create the 'hive' catalog with new warehouse directory + HiveMetaStore.HMSHandler.createDefaultCatalog(rawStore, new Warehouse(conf)); + } + + private Database createTestDb(String dbName, String dbOwner) { + String dbDescription = dbName; + String dbLocation = "file:/tmp"; + Map<String, String> dbParams = new HashMap<>(); + Database db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + db.setCatalogName(DEFAULT_CATALOG_NAME); + return db; + } + + private Table createTestTblParam(String dbName, String tblName, String tblOwner, + List<FieldSchema> cols, List<FieldSchema> ptnCols, Map<String, String> tblParams) { + String serdeLocation = "file:/tmp"; + Map<String, String> serdeParams = new HashMap<>(); + SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>()); + StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, + serdeInfo, null, null, serdeParams); + sd.setStoredAsSubDirectories(false); + Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null, + TableType.MANAGED_TABLE.toString()); + tbl.setCatName(DEFAULT_CATALOG_NAME); + return tbl; + } + + private Table createTestTbl(String dbName, String tblName, String tblOwner, + List<FieldSchema> cols, List<FieldSchema> ptnCols) { + return createTestTblParam(dbName, tblName, tblOwner, cols, ptnCols, new HashMap<>()); + } + + private void compareTables(Table tbl1, Table tbl2) { + Assert.assertEquals(tbl1.getDbName(), tbl2.getDbName()); + Assert.assertEquals(tbl1.getSd(), tbl2.getSd()); + Assert.assertEquals(tbl1.getParameters(), tbl2.getParameters()); + Assert.assertEquals(tbl1.getTableName(), tbl2.getTableName()); + Assert.assertEquals(tbl1.getCatName(), tbl2.getCatName()); + Assert.assertEquals(tbl1.getCreateTime(), tbl2.getCreateTime()); + 
Assert.assertEquals(tbl1.getCreationMetadata(), tbl2.getCreationMetadata()); + Assert.assertEquals(tbl1.getId(), tbl2.getId()); + } + + private void comparePartitions(Partition part1, Partition part2) { + Assert.assertEquals(part1.getParameters(), part2.getParameters()); + Assert.assertEquals(part1.getCatName(), part2.getCatName()); + Assert.assertEquals(part1.getCreateTime(), part2.getCreateTime()); + Assert.assertEquals(part1.getTableName(), part2.getTableName()); + Assert.assertEquals(part1.getDbName().toLowerCase(), part2.getDbName().toLowerCase()); + Assert.assertEquals(part1.getLastAccessTime(), part2.getLastAccessTime()); + } + + @Test + public void testDatabaseOpsForUpdateUsingEvents() throws Exception { + RawStore rawStore = hmsHandler.getMS(); + + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(rawStore); + + // Add a db via rawStore + String dbName = "testDatabaseOps"; + String dbOwner = "user1"; + Database db = createTestDb(dbName, dbOwner); + + hmsHandler.create_database(db); + db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); + + // Read database via CachedStore + Database dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName); + Assert.assertEquals(db, dbRead); + + // Add another db via rawStore + final String dbName1 = "testDatabaseOps1"; + Database db1 = createTestDb(dbName1, dbOwner); + hmsHandler.create_database(db1); + db1 = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1); + + // Read database via CachedStore + dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName1); + Assert.assertEquals(db1, dbRead); + + // Alter the db via rawStore (can only alter owner or parameters) + dbOwner = "user2"; + Database newdb = new Database(db); + newdb.setOwnerName(dbOwner); + hmsHandler.alter_database(dbName, newdb); + newdb = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); + + // Read db via cachedStore + dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, 
dbName); + Assert.assertEquals(newdb, dbRead); + + // Add another db via rawStore + final String dbName2 = "testDatabaseOps2"; + Database db2 = createTestDb(dbName2, dbOwner); + hmsHandler.create_database(db2); + db2 = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName2); + + // Alter db "testDatabaseOps" via rawStore + dbOwner = "user1"; + newdb = new Database(db); + newdb.setOwnerName(dbOwner); + hmsHandler.alter_database(dbName, newdb); + newdb = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); + + // Drop db "testDatabaseOps1" via rawStore + Database dropDb = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1); + hmsHandler.drop_database(dbName1, true, true); + + // Read the newly added db via CachedStore + dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName2); + Assert.assertEquals(db2, dbRead); + + // Read the altered db via CachedStore (altered user from "user2" to "user1") + dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName); + Assert.assertEquals(newdb, dbRead); + + // Try to read the dropped db after cache update + dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName1); + Assert.assertEquals(null, dbRead); + + // Clean up + hmsHandler.drop_database(dbName, true, true); + hmsHandler.drop_database(dbName2, true, true); + sharedCache.getDatabaseCache().clear(); + sharedCache.getTableCache().clear(); + sharedCache.getSdCache().clear(); + } + + @Test + public void testTableOpsForUpdateUsingEvents() throws Exception { + long lastEventId = -1; + RawStore rawStore = hmsHandler.getMS(); + + // Add a db via rawStore + String dbName = "test_table_ops"; + String dbOwner = "user1"; + Database db = createTestDb(dbName, dbOwner); + hmsHandler.create_database(db); + db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); + + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(rawStore); + + // Add a table via rawStore + String tblName = "tbl"; + String tblOwner = "user1"; + 
FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); + FieldSchema col2 = new FieldSchema("col2", "string", "string column"); + List<FieldSchema> cols = new ArrayList<FieldSchema>(); + cols.add(col1); + cols.add(col2); + List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); + Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols); + hmsHandler.create_table(tbl); + tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); + + // Read database, table via CachedStore + Database dbRead= sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME, dbName); + Assert.assertEquals(db, dbRead); + Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName); + compareTables(tblRead, tbl); + + // Add a new table via rawStore + String tblName2 = "tbl2"; + Table tbl2 = createTestTbl(dbName, tblName2, tblOwner, cols, ptnCols); + hmsHandler.create_table(tbl2); + tbl2 = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName2); + + // Alter table "tbl" via rawStore + tblOwner = "role1"; + Table newTable = new Table(tbl); + newTable.setOwner(tblOwner); + newTable.setOwnerType(PrincipalType.ROLE); + hmsHandler.alter_table(dbName, tblName, newTable); + newTable = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); + + Assert.assertEquals("Owner of the table did not change.", tblOwner, newTable.getOwner()); + Assert.assertEquals("Owner type of the table did not change", PrincipalType.ROLE, newTable.getOwnerType()); + + // Drop table "tbl2" via rawStore + hmsHandler.drop_table(dbName, tblName2, true); + + // Read the altered "tbl" via CachedStore + tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName); + compareTables(tblRead, newTable); + + // Try to read the dropped "tbl2" via CachedStore (should throw exception) + tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName2); + Assert.assertNull(tblRead); + + // Clean up + hmsHandler.drop_database(dbName, true, true); + + 
tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName2); + Assert.assertNull(tblRead); + + tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName); + Assert.assertNull(tblRead); + + sharedCache.getDatabaseCache().clear(); + sharedCache.getTableCache().clear(); + sharedCache.getSdCache().clear(); + } + + @Test + public void testPartitionOpsForUpdateUsingEvents() throws Exception { + long lastEventId = -1; + RawStore rawStore = hmsHandler.getMS(); + + // Add a db via rawStore + String dbName = "test_partition_ops"; + String dbOwner = "user1"; + Database db = createTestDb(dbName, dbOwner); + hmsHandler.create_database(db); + db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); + + // Add a table via rawStore + String tblName = "tbl"; + String tblOwner = "user1"; + FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); + FieldSchema col2 = new FieldSchema("col2", "string", "string column"); + List<FieldSchema> cols = new ArrayList<FieldSchema>(); + cols.add(col1); + cols.add(col2); + FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column"); + List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); + ptnCols.add(ptnCol1); + Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols); + hmsHandler.create_table(tbl); + tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); + + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(rawStore); + + final String ptnColVal1 = "aaa"; + Map<String, String> partParams = new HashMap<String, String>(); + Partition ptn1 = + new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, tbl.getSd(), partParams); + ptn1.setCatName(DEFAULT_CATALOG_NAME); + hmsHandler.add_partition(ptn1); + ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1)); + + final String ptnColVal2 = "bbb"; + Partition ptn2 = + new Partition(Arrays.asList(ptnColVal2), dbName, 
tblName, 0, 0, tbl.getSd(), partParams); + ptn2.setCatName(DEFAULT_CATALOG_NAME); + hmsHandler.add_partition(ptn2); + ptn2 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2)); + + // Read database, table, partition via CachedStore + Database dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase()); + Assert.assertEquals(db, dbRead); + Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase()); + compareTables(tbl, tblRead); + Partition ptn1Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal1)); + comparePartitions(ptn1, ptn1Read); + Partition ptn2Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal2)); + comparePartitions(ptn2, ptn2Read); + + // Add a new partition via rawStore + final String ptnColVal3 = "ccc"; + Partition ptn3 = + new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, tbl.getSd(), partParams); + ptn3.setCatName(DEFAULT_CATALOG_NAME); + hmsHandler.add_partition(ptn3); + ptn3 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3)); + + // Alter an existing partition ("aaa") via rawStore + ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1)); + final String ptnColVal1Alt = "aaa"; + Partition ptn1Atl = + new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, tbl.getSd(), partParams); + ptn1Atl.setCatName(DEFAULT_CATALOG_NAME); + hmsHandler.alter_partitions(dbName, tblName, Arrays.asList(ptn1Atl)); + ptn1Atl = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt)); + + // Drop an existing partition ("bbb") via rawStore + Partition ptnDrop = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, 
tblName, Arrays.asList(ptnColVal2)); + hmsHandler.drop_partition(dbName, tblName, Arrays.asList(ptnColVal2), false); + + // Read the newly added partition via CachedStore + Partition ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3)); + comparePartitions(ptn3, ptnRead); + + // Read the altered partition via CachedStore + ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt)); + Assert.assertEquals(ptn1Atl.getParameters(), ptnRead.getParameters()); + + ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2)); + Assert.assertEquals(null, ptnRead); + + // Drop table "tbl" via rawStore, it should remove the partition also + hmsHandler.drop_table(dbName, tblName, true); + + ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt)); + Assert.assertEquals(null, ptnRead); + + ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3)); + Assert.assertEquals(null, ptnRead); + + // Clean up + rawStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName); + sharedCache.getDatabaseCache().clear(); + sharedCache.getTableCache().clear(); + sharedCache.getSdCache().clear(); + } + + @Test + public void testTableColumnStatistics() throws Throwable { + String dbName = "column_stats_test_db"; + String tblName = "tbl"; + String typeName = "person"; + String tblOwner = "testowner"; + int lastAccessed = 6796; + String dbOwner = "user1"; + + // Add a db via rawStore + Database db = createTestDb(dbName, dbOwner); + hmsHandler.create_database(db); + db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); + + Map<String, String> tableParams = new HashMap<>(); + tableParams.put("test_param_1", "hi"); + tableParams.put("test_param_2", "50"); + + // Add a table via rawStore + List<FieldSchema> cols = new ArrayList<FieldSchema>(); + 
cols.add(new FieldSchema("income", "int", "integer column")); + cols.add(new FieldSchema("name", "string", "string column")); + + List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); + ptnCols.add(new FieldSchema("ds", "string", "string partition column")); + ptnCols.add(new FieldSchema("hr", "int", "integer partition column")); + + Table tbl = createTestTblParam(dbName, tblName, tblOwner, cols, null, tableParams); + hmsHandler.create_table(tbl); + + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(rawStore); + + // Create a ColumnStatistics Obj + String[] colName = new String[]{"income", "name"}; + double lowValue = 50000.21; + double highValue = 1200000.4525; + long numNulls = 3; + long numDVs = 22; + double avgColLen = 50.30; + long maxColLen = 102; + String[] colType = new String[] {"double", "string"}; + boolean isTblLevel = true; + String partName = null; + List<ColumnStatisticsObj> statsObjs = new ArrayList<>(); + + ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(); + statsDesc.setDbName(dbName); + statsDesc.setTableName(tblName); + statsDesc.setIsTblLevel(isTblLevel); + statsDesc.setPartName(partName); + + ColumnStatisticsObj statsObj = new ColumnStatisticsObj(); + statsObj.setColName(colName[0]); + statsObj.setColType(colType[0]); + + ColumnStatisticsData statsData = new ColumnStatisticsData(); + DoubleColumnStatsData numericStats = new DoubleColumnStatsData(); + statsData.setDoubleStats(numericStats); + + statsData.getDoubleStats().setHighValue(highValue); + statsData.getDoubleStats().setLowValue(lowValue); + statsData.getDoubleStats().setNumDVs(numDVs); + statsData.getDoubleStats().setNumNulls(numNulls); + + statsObj.setStatsData(statsData); + statsObjs.add(statsObj); + + statsObj = new ColumnStatisticsObj(); + statsObj.setColName(colName[1]); + statsObj.setColType(colType[1]); + + statsData = new ColumnStatisticsData(); + StringColumnStatsData stringStats = new StringColumnStatsData(); + 
statsData.setStringStats(stringStats); + statsData.getStringStats().setAvgColLen(avgColLen); + statsData.getStringStats().setMaxColLen(maxColLen); + statsData.getStringStats().setNumDVs(numDVs); + statsData.getStringStats().setNumNulls(numNulls); + + statsObj.setStatsData(statsData); + statsObjs.add(statsObj); + + ColumnStatistics colStats = new ColumnStatistics(); + colStats.setStatsDesc(statsDesc); + colStats.setStatsObj(statsObjs); + + // write stats objs persistently + hmsHandler.update_table_column_statistics(colStats); + + ColumnStatisticsObj colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, + dbName, tblName, Lists.newArrayList(colName[0])).get(0); + Assert.assertEquals(colStatsCache.getColName(), colName[0]); + Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getLowValue(), lowValue, 0.01); + Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getHighValue(), highValue, 0.01); + Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getNumNulls(), numNulls); + Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getNumDVs(), numDVs); + + // test delete column stats; if no col name is passed all column stats associated with the + // table is deleted + boolean status = hmsHandler.delete_table_column_statistics(dbName, tblName, null); + Assert.assertEquals(status, true); + + Assert.assertEquals(sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, + dbName, tblName, Lists.newArrayList(colName[0])).isEmpty(), true); + + tblName = "tbl_part"; + cols = new ArrayList<>(); + cols.add(new FieldSchema(colName[0], "int", null)); + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("col", "int", null)); + StorageDescriptor sd = + new StorageDescriptor(cols, null, "input", "output", false, + 0, new SerDeInfo("serde", "seriallib", new HashMap<>()), + null, null, null); + + tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(), + null, 
null, TableType.MANAGED_TABLE.toString()); + tbl.setCatName(DEFAULT_CATALOG_NAME); + + hmsHandler.create_table(tbl); + + List<String> partVals1 = new ArrayList<>(); + partVals1.add("1"); + List<String> partVals2 = new ArrayList<>(); + partVals2.add("2"); + + Partition ptn1 = + new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>()); + ptn1.setCatName(DEFAULT_CATALOG_NAME); + hmsHandler.add_partition(ptn1); + Partition ptn2 = + new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>()); + ptn2.setCatName(DEFAULT_CATALOG_NAME); + hmsHandler.add_partition(ptn2); + + List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1); + partName = partitions.get(0); + isTblLevel = false; + + // create a new columnstatistics desc to represent partition level column stats + statsDesc = new ColumnStatisticsDesc(); + statsDesc.setDbName(dbName); + statsDesc.setTableName(tblName); + statsDesc.setPartName(partName); + statsDesc.setIsTblLevel(isTblLevel); + + colStats = new ColumnStatistics(); + colStats.setStatsDesc(statsDesc); + colStats.setStatsObj(statsObjs); + + hmsHandler.update_partition_column_statistics(colStats); + ColumnStatisticsObj colStats2 = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, + CachedStore.partNameToVals(partName), colName[1]); + // compare stats obj to ensure what we get is what we wrote + Assert.assertEquals(colStats.getStatsDesc().getPartName(), partName); + Assert.assertEquals(colStats2.getColName(), colName[1]); + Assert.assertEquals(colStats2.getStatsData().getStringStats().getMaxColLen(), maxColLen); + Assert.assertEquals(colStats2.getStatsData().getStringStats().getAvgColLen(), avgColLen, 0.01); + Assert.assertEquals(colStats2.getStatsData().getStringStats().getNumNulls(), numNulls); + Assert.assertEquals(colStats2.getStatsData().getStringStats().getNumDVs(), numDVs); + + // test stats deletion at partition level + hmsHandler.delete_partition_column_statistics(dbName, 
tblName, partName, colName[1]); + + colStats2 = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, + CachedStore.partNameToVals(partName), colName[1]); + Assert.assertEquals(colStats2, null); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 822532c..4472a61 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -496,7 +496,7 @@ public class TestReplicationScenariosAcidTables { primary.dump(primaryDbName, bootStrapDump.lastReplicationId); long lastReplId = Long.parseLong(bootStrapDump.lastReplicationId); - primary.testEventCounts(primaryDbName, lastReplId, null, null, 21); + primary.testEventCounts(primaryDbName, lastReplId, null, null, 22); // Test load replica.load(replicatedDbName, incrementalDump.dumpLocation) http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java index 2e42267..4ec1bac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java @@ -41,6 +41,10 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.OpenTxnHandler; import 
org.apache.hadoop.hive.ql.parse.repl.load.message.CommitTxnHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.AbortTxnHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.AllocWriteIdHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.UpdateTableColStatHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.DeleteTableColStatHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.UpdatePartColStatHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.DeletePartColStatHandler; public enum DumpType { @@ -211,6 +215,30 @@ public enum DumpType { public MessageHandler handler() { return new AllocWriteIdHandler(); } + }, + EVENT_UPDATE_TABLE_COL_STAT("EVENT_UPDATE_TABLE_COL_STAT") { + @Override + public MessageHandler handler() { + return new UpdateTableColStatHandler(); + } + }, + EVENT_DELETE_TABLE_COL_STAT("EVENT_DELETE_TABLE_COL_STAT") { + @Override + public MessageHandler handler() { + return new DeleteTableColStatHandler(); + } + }, + EVENT_UPDATE_PART_COL_STAT("EVENT_UPDATE_PART_COL_STAT") { + @Override + public MessageHandler handler() { + return new UpdatePartColStatHandler(); + } + }, + EVENT_DELETE_PART_COL_STAT("EVENT_DELETE_PART_COL_STAT") { + @Override + public MessageHandler handler() { + return new DeletePartColStatHandler(); + } }; String type = null; http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DeletePartColStatHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DeletePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DeletePartColStatHandler.java new file mode 100644 index 0000000..7edbcff --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DeletePartColStatHandler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.DeletePartitionColumnStatMessage; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +class DeletePartColStatHandler extends AbstractEventHandler<DeletePartitionColumnStatMessage> { + + DeletePartColStatHandler(NotificationEvent event) { + super(event); + } + + @Override + DeletePartitionColumnStatMessage eventMessage(String stringRepresentation) { + return deserializer.getDeletePartitionColumnStatMessage(stringRepresentation); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} DeletePartitionColumnStatMessage message : {}", fromEventId(), eventMessageAsJSON); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(eventMessageAsJSON); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_DELETE_PART_COL_STAT; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DeleteTableColStatHandler.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DeleteTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DeleteTableColStatHandler.java new file mode 100644 index 0000000..8e94cea --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DeleteTableColStatHandler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.DeleteTableColumnStatMessage; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +class DeleteTableColStatHandler extends AbstractEventHandler<DeleteTableColumnStatMessage> { + + DeleteTableColStatHandler(NotificationEvent event) { + super(event); + } + + @Override + DeleteTableColumnStatMessage eventMessage(String stringRepresentation) { + return deserializer.getDeleteTableColumnStatMessage(stringRepresentation); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} DeleteTableColumnStat message : {}", fromEventId(), eventMessageAsJSON); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(eventMessageAsJSON); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_DELETE_TABLE_COL_STAT; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java index 2a0379e..91f5503 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java @@ -54,6 +54,10 @@ public class EventHandlerFactory { register(MessageBuilder.COMMIT_TXN_EVENT, CommitTxnHandler.class); register(MessageBuilder.ABORT_TXN_EVENT, AbortTxnHandler.class); register(MessageBuilder.ALLOC_WRITE_ID_EVENT, AllocWriteIdHandler.class); + register(MessageBuilder.UPDATE_TBL_COL_STAT_EVENT, 
UpdateTableColStatHandler.class); + register(MessageBuilder.DELETE_TBL_COL_STAT_EVENT, DeleteTableColStatHandler.class); + register(MessageBuilder.UPDATE_PART_COL_STAT_EVENT, UpdatePartColStatHandler.class); + register(MessageBuilder.DELETE_PART_COL_STAT_EVENT, DeletePartColStatHandler.class); } static void register(String event, Class<? extends EventHandler> handlerClazz) { http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java new file mode 100644 index 0000000..332005b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.UpdatePartitionColumnStatMessage; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +class UpdatePartColStatHandler extends AbstractEventHandler<UpdatePartitionColumnStatMessage> { + + UpdatePartColStatHandler(NotificationEvent event) { + super(event); + } + + @Override + UpdatePartitionColumnStatMessage eventMessage(String stringRepresentation) { + return deserializer.getUpdatePartitionColumnStatMessage(stringRepresentation); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} UpdateTableColumnStat message : {}", fromEventId(), eventMessageAsJSON); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(eventMessageAsJSON); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_UPDATE_PART_COL_STAT; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java new file mode 100644 index 0000000..a3aecde --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +class UpdateTableColStatHandler extends AbstractEventHandler<UpdateTableColumnStatMessage> { + + UpdateTableColStatHandler(NotificationEvent event) { + super(event); + } + + @Override + UpdateTableColumnStatMessage eventMessage(String stringRepresentation) { + return deserializer.getUpdateTableColumnStatMessage(stringRepresentation); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} UpdateTableColumnStat message : {}", fromEventId(), eventMessageAsJSON); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(eventMessageAsJSON); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_UPDATE_TABLE_COL_STAT; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DeletePartColStatHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DeletePartColStatHandler.java 
package org.apache.hadoop.hive.ql.parse.repl.load.message;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

/**
 * DeletePartColStatHandler
 * Target(Load) side handler for partition stat delete event.
 * Replication of this event is not yet supported; the handler only records
 * the replication watermark and returns a no-op barrier task.
 */
public class DeletePartColStatHandler extends AbstractMessageHandler {
  @Override
  public List<Task<? extends Serializable>> handle(Context context)
      throws SemanticException {
    context.log.info("Replication of partition stat delete event is not supported yet");
    if (!context.isDbNameEmpty()) {
      // Advance the last-replicated event id so this event is not replayed.
      String lastEventId = context.dmd.getEventTo().toString();
      updatedMetadata.set(lastEventId, context.dbName, context.tableName, null);
    }
    // A DependencyCollectionWork task acts as a no-op placeholder in the DAG.
    Task<? extends Serializable> barrierTask =
        TaskFactory.get(new DependencyCollectionWork(), context.hiveConf);
    return Collections.singletonList(barrierTask);
  }
}
package org.apache.hadoop.hive.ql.parse.repl.load.message;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

/**
 * DeleteTableColStatHandler
 * Target(Load) side handler for table stat delete event.
 * Replication of this event is not yet supported; the handler only records
 * the replication watermark and returns a no-op barrier task.
 */
public class DeleteTableColStatHandler extends AbstractMessageHandler {
  @Override
  public List<Task<? extends Serializable>> handle(Context context)
      throws SemanticException {
    context.log.info("Replication of table stat delete event is not supported yet");
    if (!context.isDbNameEmpty()) {
      // Advance the last-replicated event id so this event is not replayed.
      String lastEventId = context.dmd.getEventTo().toString();
      updatedMetadata.set(lastEventId, context.dbName, context.tableName, null);
    }
    // A DependencyCollectionWork task acts as a no-op placeholder in the DAG.
    Task<? extends Serializable> barrierTask =
        TaskFactory.get(new DependencyCollectionWork(), context.hiveConf);
    return Collections.singletonList(barrierTask);
  }
}
package org.apache.hadoop.hive.ql.parse.repl.load.message;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

/**
 * UpdatePartColStatHandler
 * Target(Load) side handler for partition stat update event.
 * Replication of this event is not yet supported; the handler only records
 * the replication watermark and returns a no-op barrier task.
 */
public class UpdatePartColStatHandler extends AbstractMessageHandler {
  @Override
  public List<Task<? extends Serializable>> handle(Context context)
      throws SemanticException {
    context.log.info("Replication of partition stat update event is not supported yet");
    if (!context.isDbNameEmpty()) {
      // Advance the last-replicated event id so this event is not replayed.
      String lastEventId = context.dmd.getEventTo().toString();
      updatedMetadata.set(lastEventId, context.dbName, context.tableName, null);
    }
    // A DependencyCollectionWork task acts as a no-op placeholder in the DAG.
    Task<? extends Serializable> barrierTask =
        TaskFactory.get(new DependencyCollectionWork(), context.hiveConf);
    return Collections.singletonList(barrierTask);
  }
}
package org.apache.hadoop.hive.ql.parse.repl.load.message;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

/**
 * UpdateTableColStatHandler
 * Target(Load) side handler for table stat update event.
 * Replication of this event is not yet supported; the handler only records
 * the replication watermark and returns a no-op barrier task.
 */
public class UpdateTableColStatHandler extends AbstractMessageHandler {
  @Override
  public List<Task<? extends Serializable>> handle(Context context)
      throws SemanticException {
    context.log.info("Replication of table stat update event is not supported yet");
    if (!context.isDbNameEmpty()) {
      // Advance the last-replicated event id so this event is not replayed.
      String lastEventId = context.dmd.getEventTo().toString();
      updatedMetadata.set(lastEventId, context.dbName, context.tableName, null);
    }
    // A DependencyCollectionWork task acts as a no-op placeholder in the DAG.
    Task<? extends Serializable> barrierTask =
        TaskFactory.get(new DependencyCollectionWork(), context.hiveConf);
    return Collections.singletonList(barrierTask);
  }
}
org.apache.thrift.protocol.TField("lastEvent", org.apache.thrift.protocol.TType.I64, (short)1); private static final org.apache.thrift.protocol.TField MAX_EVENTS_FIELD_DESC = new org.apache.thrift.protocol.TField("maxEvents", org.apache.thrift.protocol.TType.I32, (short)2); + private static final org.apache.thrift.protocol.TField EVENT_TYPE_SKIP_LIST_FIELD_DESC = new org.apache.thrift.protocol.TField("eventTypeSkipList", org.apache.thrift.protocol.TType.LIST, (short)3); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -49,11 +50,13 @@ import org.slf4j.LoggerFactory; private long lastEvent; // required private int maxEvents; // optional + private List<String> eventTypeSkipList; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { LAST_EVENT((short)1, "lastEvent"), - MAX_EVENTS((short)2, "maxEvents"); + MAX_EVENTS((short)2, "maxEvents"), + EVENT_TYPE_SKIP_LIST((short)3, "eventTypeSkipList"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -72,6 +75,8 @@ import org.slf4j.LoggerFactory; return LAST_EVENT; case 2: // MAX_EVENTS return MAX_EVENTS; + case 3: // EVENT_TYPE_SKIP_LIST + return EVENT_TYPE_SKIP_LIST; default: return null; } @@ -115,7 +120,7 @@ import org.slf4j.LoggerFactory; private static final int __LASTEVENT_ISSET_ID = 0; private static final int __MAXEVENTS_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.MAX_EVENTS}; + private static final _Fields optionals[] = {_Fields.MAX_EVENTS,_Fields.EVENT_TYPE_SKIP_LIST}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, 
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -123,6 +128,9 @@ import org.slf4j.LoggerFactory; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); tmpMap.put(_Fields.MAX_EVENTS, new org.apache.thrift.meta_data.FieldMetaData("maxEvents", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + tmpMap.put(_Fields.EVENT_TYPE_SKIP_LIST, new org.apache.thrift.meta_data.FieldMetaData("eventTypeSkipList", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(NotificationEventRequest.class, metaDataMap); } @@ -145,6 +153,10 @@ import org.slf4j.LoggerFactory; __isset_bitfield = other.__isset_bitfield; this.lastEvent = other.lastEvent; this.maxEvents = other.maxEvents; + if (other.isSetEventTypeSkipList()) { + List<String> __this__eventTypeSkipList = new ArrayList<String>(other.eventTypeSkipList); + this.eventTypeSkipList = __this__eventTypeSkipList; + } } public NotificationEventRequest deepCopy() { @@ -157,6 +169,7 @@ import org.slf4j.LoggerFactory; this.lastEvent = 0; setMaxEventsIsSet(false); this.maxEvents = 0; + this.eventTypeSkipList = null; } public long getLastEvent() { @@ -203,6 +216,44 @@ import org.slf4j.LoggerFactory; __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __MAXEVENTS_ISSET_ID, value); } + public int getEventTypeSkipListSize() { + return (this.eventTypeSkipList == null) ? 0 : this.eventTypeSkipList.size(); + } + + public java.util.Iterator<String> getEventTypeSkipListIterator() { + return (this.eventTypeSkipList == null) ? 
null : this.eventTypeSkipList.iterator(); + } + + public void addToEventTypeSkipList(String elem) { + if (this.eventTypeSkipList == null) { + this.eventTypeSkipList = new ArrayList<String>(); + } + this.eventTypeSkipList.add(elem); + } + + public List<String> getEventTypeSkipList() { + return this.eventTypeSkipList; + } + + public void setEventTypeSkipList(List<String> eventTypeSkipList) { + this.eventTypeSkipList = eventTypeSkipList; + } + + public void unsetEventTypeSkipList() { + this.eventTypeSkipList = null; + } + + /** Returns true if field eventTypeSkipList is set (has been assigned a value) and false otherwise */ + public boolean isSetEventTypeSkipList() { + return this.eventTypeSkipList != null; + } + + public void setEventTypeSkipListIsSet(boolean value) { + if (!value) { + this.eventTypeSkipList = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case LAST_EVENT: @@ -221,6 +272,14 @@ import org.slf4j.LoggerFactory; } break; + case EVENT_TYPE_SKIP_LIST: + if (value == null) { + unsetEventTypeSkipList(); + } else { + setEventTypeSkipList((List<String>)value); + } + break; + } } @@ -232,6 +291,9 @@ import org.slf4j.LoggerFactory; case MAX_EVENTS: return getMaxEvents(); + case EVENT_TYPE_SKIP_LIST: + return getEventTypeSkipList(); + } throw new IllegalStateException(); } @@ -247,6 +309,8 @@ import org.slf4j.LoggerFactory; return isSetLastEvent(); case MAX_EVENTS: return isSetMaxEvents(); + case EVENT_TYPE_SKIP_LIST: + return isSetEventTypeSkipList(); } throw new IllegalStateException(); } @@ -282,6 +346,15 @@ import org.slf4j.LoggerFactory; return false; } + boolean this_present_eventTypeSkipList = true && this.isSetEventTypeSkipList(); + boolean that_present_eventTypeSkipList = true && that.isSetEventTypeSkipList(); + if (this_present_eventTypeSkipList || that_present_eventTypeSkipList) { + if (!(this_present_eventTypeSkipList && that_present_eventTypeSkipList)) + return false; + if 
(!this.eventTypeSkipList.equals(that.eventTypeSkipList)) + return false; + } + return true; } @@ -299,6 +372,11 @@ import org.slf4j.LoggerFactory; if (present_maxEvents) list.add(maxEvents); + boolean present_eventTypeSkipList = true && (isSetEventTypeSkipList()); + list.add(present_eventTypeSkipList); + if (present_eventTypeSkipList) + list.add(eventTypeSkipList); + return list.hashCode(); } @@ -330,6 +408,16 @@ import org.slf4j.LoggerFactory; return lastComparison; } } + lastComparison = Boolean.valueOf(isSetEventTypeSkipList()).compareTo(other.isSetEventTypeSkipList()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetEventTypeSkipList()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.eventTypeSkipList, other.eventTypeSkipList); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -359,6 +447,16 @@ import org.slf4j.LoggerFactory; sb.append(this.maxEvents); first = false; } + if (isSetEventTypeSkipList()) { + if (!first) sb.append(", "); + sb.append("eventTypeSkipList:"); + if (this.eventTypeSkipList == null) { + sb.append("null"); + } else { + sb.append(this.eventTypeSkipList); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -424,6 +522,24 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 3: // EVENT_TYPE_SKIP_LIST + if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { + { + org.apache.thrift.protocol.TList _list732 = iprot.readListBegin(); + struct.eventTypeSkipList = new ArrayList<String>(_list732.size); + String _elem733; + for (int _i734 = 0; _i734 < _list732.size; ++_i734) + { + _elem733 = iprot.readString(); + struct.eventTypeSkipList.add(_elem733); + } + iprot.readListEnd(); + } + struct.setEventTypeSkipListIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type); } @@ -445,6 +561,20 @@ import org.slf4j.LoggerFactory; oprot.writeI32(struct.maxEvents); oprot.writeFieldEnd(); } + if (struct.eventTypeSkipList != null) { + if (struct.isSetEventTypeSkipList()) { + oprot.writeFieldBegin(EVENT_TYPE_SKIP_LIST_FIELD_DESC); + { + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.eventTypeSkipList.size())); + for (String _iter735 : struct.eventTypeSkipList) + { + oprot.writeString(_iter735); + } + oprot.writeListEnd(); + } + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -467,10 +597,22 @@ import org.slf4j.LoggerFactory; if (struct.isSetMaxEvents()) { optionals.set(0); } - oprot.writeBitSet(optionals, 1); + if (struct.isSetEventTypeSkipList()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); if (struct.isSetMaxEvents()) { oprot.writeI32(struct.maxEvents); } + if (struct.isSetEventTypeSkipList()) { + { + oprot.writeI32(struct.eventTypeSkipList.size()); + for (String _iter736 : struct.eventTypeSkipList) + { + oprot.writeString(_iter736); + } + } + } } @Override @@ -478,11 +620,24 @@ import org.slf4j.LoggerFactory; TTupleProtocol iprot = (TTupleProtocol) prot; struct.lastEvent = iprot.readI64(); struct.setLastEventIsSet(true); - BitSet incoming = iprot.readBitSet(1); + BitSet incoming = iprot.readBitSet(2); if (incoming.get(0)) { struct.maxEvents = iprot.readI32(); struct.setMaxEventsIsSet(true); } + if (incoming.get(1)) { + { + org.apache.thrift.protocol.TList _list737 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32()); + struct.eventTypeSkipList = new ArrayList<String>(_list737.size); + String _elem738; + for (int _i739 = 0; _i739 < _list737.size; ++_i739) + { + _elem738 = iprot.readString(); + struct.eventTypeSkipList.add(_elem738); + } + } + struct.setEventTypeSkipListIsSet(true); + } } } 
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php index 39f8b1f..48396ea 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php @@ -22347,6 +22347,10 @@ class NotificationEventRequest { * @var int */ public $maxEvents = null; + /** + * @var string[] + */ + public $eventTypeSkipList = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -22359,6 +22363,14 @@ class NotificationEventRequest { 'var' => 'maxEvents', 'type' => TType::I32, ), + 3 => array( + 'var' => 'eventTypeSkipList', + 'type' => TType::LST, + 'etype' => TType::STRING, + 'elem' => array( + 'type' => TType::STRING, + ), + ), ); } if (is_array($vals)) { @@ -22368,6 +22380,9 @@ class NotificationEventRequest { if (isset($vals['maxEvents'])) { $this->maxEvents = $vals['maxEvents']; } + if (isset($vals['eventTypeSkipList'])) { + $this->eventTypeSkipList = $vals['eventTypeSkipList']; + } } } @@ -22404,6 +22419,23 @@ class NotificationEventRequest { $xfer += $input->skip($ftype); } break; + case 3: + if ($ftype == TType::LST) { + $this->eventTypeSkipList = array(); + $_size647 = 0; + $_etype650 = 0; + $xfer += $input->readListBegin($_etype650, $_size647); + for ($_i651 = 0; $_i651 < $_size647; ++$_i651) + { + $elem652 = null; + $xfer += $input->readString($elem652); + $this->eventTypeSkipList []= $elem652; + } + $xfer += $input->readListEnd(); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -22427,6 +22459,23 @@ class NotificationEventRequest { $xfer += 
$output->writeI32($this->maxEvents); $xfer += $output->writeFieldEnd(); } + if ($this->eventTypeSkipList !== null) { + if (!is_array($this->eventTypeSkipList)) { + throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); + } + $xfer += $output->writeFieldBegin('eventTypeSkipList', TType::LST, 3); + { + $output->writeListBegin(TType::STRING, count($this->eventTypeSkipList)); + { + foreach ($this->eventTypeSkipList as $iter653) + { + $xfer += $output->writeString($iter653); + } + } + $output->writeListEnd(); + } + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 4ef4aad..b85731e 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -15582,17 +15582,20 @@ class NotificationEventRequest: Attributes: - lastEvent - maxEvents + - eventTypeSkipList """ thrift_spec = ( None, # 0 (1, TType.I64, 'lastEvent', None, None, ), # 1 (2, TType.I32, 'maxEvents', None, None, ), # 2 + (3, TType.LIST, 'eventTypeSkipList', (TType.STRING,None), None, ), # 3 ) - def __init__(self, lastEvent=None, maxEvents=None,): + def __init__(self, lastEvent=None, maxEvents=None, eventTypeSkipList=None,): self.lastEvent = lastEvent self.maxEvents = maxEvents + self.eventTypeSkipList = eventTypeSkipList def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and 
self.thrift_spec is not None and fastbinary is not None: @@ -15613,6 +15616,16 @@ class NotificationEventRequest: self.maxEvents = iprot.readI32() else: iprot.skip(ftype) + elif fid == 3: + if ftype == TType.LIST: + self.eventTypeSkipList = [] + (_etype647, _size644) = iprot.readListBegin() + for _i648 in xrange(_size644): + _elem649 = iprot.readString() + self.eventTypeSkipList.append(_elem649) + iprot.readListEnd() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -15631,6 +15644,13 @@ class NotificationEventRequest: oprot.writeFieldBegin('maxEvents', TType.I32, 2) oprot.writeI32(self.maxEvents) oprot.writeFieldEnd() + if self.eventTypeSkipList is not None: + oprot.writeFieldBegin('eventTypeSkipList', TType.LIST, 3) + oprot.writeListBegin(TType.STRING, len(self.eventTypeSkipList)) + for iter650 in self.eventTypeSkipList: + oprot.writeString(iter650) + oprot.writeListEnd() + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -15644,6 +15664,7 @@ class NotificationEventRequest: value = 17 value = (value * 31) ^ hash(self.lastEvent) value = (value * 31) ^ hash(self.maxEvents) + value = (value * 31) ^ hash(self.eventTypeSkipList) return value def __repr__(self): http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index 97dc069..6215479 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -3460,10 +3460,12 @@ class NotificationEventRequest include ::Thrift::Struct, ::Thrift::Struct_Union LASTEVENT = 1 MAXEVENTS = 2 + EVENTTYPESKIPLIST = 3 FIELDS = { 
LASTEVENT => {:type => ::Thrift::Types::I64, :name => 'lastEvent'}, - MAXEVENTS => {:type => ::Thrift::Types::I32, :name => 'maxEvents', :optional => true} + MAXEVENTS => {:type => ::Thrift::Types::I32, :name => 'maxEvents', :optional => true}, + EVENTTYPESKIPLIST => {:type => ::Thrift::Types::LIST, :name => 'eventTypeSkipList', :element => {:type => ::Thrift::Types::STRING}, :optional => true} } def struct_fields; FIELDS; end http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 400097b..fb0b2fe 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -953,6 +953,8 @@ public class MetastoreConf { "Time interval describing how often the reaper runs"), TOKEN_SIGNATURE("metastore.token.signature", "hive.metastore.token.signature", "", "The delegation token service name to match when selecting a token from the current user's tokens."), + METASTORE_CACHE_CAN_USE_EVENT("metastore.cache.can.use.event", "hive.metastore.cache.can.use.event", false, + "Can notification events from notification log table be used for updating the metastore cache."), TRANSACTIONAL_EVENT_LISTENERS("metastore.transactional.event.listeners", "hive.metastore.transactional.event.listeners", "", "A comma separated list of Java classes that implement the org.apache.riven.MetaStoreEventListener" +
