HIVE-20679: DDL operations on hive might create large messages for DBNotification (Anishek Agarwal, reviewed by Sankar Hariappan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b4302bb7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b4302bb7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b4302bb7

Branch: refs/heads/master
Commit: b4302bb7ad967f15ca1b708685b2ac669e3cf037
Parents: b829955
Author: Anishek Agarwal <[email protected]>
Authored: Mon Oct 22 13:51:43 2018 +0530
Committer: Anishek Agarwal <[email protected]>
Committed: Mon Oct 22 13:51:43 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +-
 .../listener/DbNotificationListener.java        | 182 +++++---
 .../json/JSONCreateFunctionMessage.java         |   3 +-
 .../messaging/json/JSONDropFunctionMessage.java |   3 +-
 .../messaging/json/JSONMessageFactory.java      |  39 +-
 .../listener/TestDbNotificationListener.java    |  14 +-
 .../TestReplAcidTablesWithJsonMessage.java      |  43 ++
 ...eplAcrossInstancesWithJsonMessageFormat.java |  45 ++
 ...ncrementalLoadAcidTablesWithJsonMessage.java |  46 ++
 .../ql/parse/TestReplWithJsonMessageFormat.java |  39 ++
 .../hive/ql/parse/TestReplicationScenarios.java |  82 ++--
 .../TestReplicationScenariosAcidTables.java     |  61 +--
 ...TestReplicationScenariosAcrossInstances.java | 103 +++--
 ...ationScenariosIncrementalLoadAcidTables.java |  55 ++-
 .../hadoop/hive/ql/parse/WarehouseInstance.java |   2 +-
 .../ql/cache/results/QueryResultsCache.java     |  14 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   5 +-
 .../repl/bootstrap/load/LoadConstraint.java     |   4 +-
 .../parse/repl/dump/events/AbortTxnHandler.java |  12 +-
 .../events/AbstractConstraintEventHandler.java  |   3 +-
 .../repl/dump/events/AbstractEventHandler.java  |  32 +-
 .../repl/dump/events/AddForeignKeyHandler.java  |  12 +-
 .../events/AddNotNullConstraintHandler.java     |  12 +-
 .../repl/dump/events/AddPartitionHandler.java   |  10 +-
 .../repl/dump/events/AddPrimaryKeyHandler.java  |  12 +-
 .../dump/events/AddUniqueConstraintHandler.java |  13 +-
 .../repl/dump/events/AllocWriteIdHandler.java   |  12 +-
 .../repl/dump/events/AlterDatabaseHandler.java  |  12 +-
 .../repl/dump/events/AlterPartitionHandler.java |  21 +-
 .../repl/dump/events/AlterTableHandler.java     |  18 +-
 .../repl/dump/events/CommitTxnHandler.java      |  28 +-
 .../repl/dump/events/CreateDatabaseHandler.java |  13 +-
 .../repl/dump/events/CreateFunctionHandler.java |  13 +-
 .../repl/dump/events/CreateTableHandler.java    |  15 +-
 .../parse/repl/dump/events/DefaultHandler.java  |   9 +
 .../repl/dump/events/DropConstraintHandler.java |  13 +-
 .../repl/dump/events/DropDatabaseHandler.java   |  12 +-
 .../repl/dump/events/DropFunctionHandler.java   |  12 +-
 .../repl/dump/events/DropPartitionHandler.java  |  12 +-
 .../repl/dump/events/DropTableHandler.java      |  12 +-
 .../repl/dump/events/EventHandlerFactory.java   |  44 +-
 .../parse/repl/dump/events/InsertHandler.java   |  22 +-
 .../parse/repl/dump/events/OpenTxnHandler.java  |  12 +-
 .../repl/dump/io/ConstraintsSerializer.java     |  10 +-
 .../load/message/AbstractMessageHandler.java    |   4 +-
 .../dump/events/TestEventHandlerFactory.java    |   7 +-
 .../hive/metastore/conf/MetastoreConf.java      |   2 +-
 .../hive/metastore/messaging/EventMessage.java  |  64 +--
 .../metastore/messaging/MessageBuilder.java     | 425 ++++++++++++++++++
 .../metastore/messaging/MessageEncoder.java     |  27 ++
 .../metastore/messaging/MessageFactory.java     | 367 +++-------------
 .../metastore/messaging/MessageSerializer.java  |  24 ++
 .../event/filters/DatabaseAndTableFilter.java   |   8 +-
 .../messaging/json/JSONAcidWriteMessage.java    |   9 +-
 .../json/JSONAddForeignKeyMessage.java          |   5 +-
 .../json/JSONAddNotNullConstraintMessage.java   |   5 +-
 .../messaging/json/JSONAddPartitionMessage.java |  11 +-
 .../json/JSONAddPrimaryKeyMessage.java          |   5 +-
 .../json/JSONAddUniqueConstraintMessage.java    |   5 +-
 .../messaging/json/JSONAlterCatalogMessage.java |   9 +-
 .../json/JSONAlterDatabaseMessage.java          |   9 +-
 .../json/JSONAlterPartitionMessage.java         |  15 +-
 .../messaging/json/JSONAlterTableMessage.java   |   9 +-
 .../messaging/json/JSONCommitTxnMessage.java    |   5 +-
 .../json/JSONCreateDatabaseMessage.java         |   5 +-
 .../json/JSONCreateFunctionMessage.java         |   5 +-
 .../messaging/json/JSONCreateTableMessage.java  |   5 +-
 .../json/JSONDropPartitionMessage.java          |   5 +-
 .../messaging/json/JSONDropTableMessage.java    |   5 +-
 .../messaging/json/JSONInsertMessage.java       |   9 +-
 .../messaging/json/JSONMessageEncoder.java      |  70 +++
 .../messaging/json/JSONMessageFactory.java      | 432 -------------------
 .../messaging/json/gzip/DeSerializer.java       | 181 ++++++++
 .../json/gzip/GzipJSONMessageEncoder.java       |  68 +++
 .../messaging/json/gzip/Serializer.java         |  32 ++
 .../hive/metastore/MetaStoreTestUtils.java      |  11 +
 .../ptest2/conf/deployed/master-mr2.properties  |   2 +-
 77 files changed, 1781 insertions(+), 1202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bcf1e9e..ed6d3d8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1176,7 +1176,7 @@ public class HiveConf extends Configuration {
      */
     @Deprecated
     METASTORE_EVENT_MESSAGE_FACTORY("hive.metastore.event.message.factory",
-        "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory",
+        "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder",
         "Factory class for making encoding and decoding messages in the events generated."),
     /**
      * @deprecated Use MetastoreConf.EXECUTE_SET_UGI
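The default message codec for DbNotification events moves from JSONMessageFactory to the new JSONMessageEncoder, and the patch adds a gzip-compressed JSON codec alongside it (see messaging/json/gzip in the diffstat above) so that large DDL payloads no longer blow up the notification log. A minimal sketch of opting into the gzip codec programmatically, mirroring the test setup later in this patch; the property and class names are taken from this diff, while the surrounding scaffolding is illustrative only:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
    import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;

    public class EncoderConfigSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // getHiveName() resolves to hive.metastore.event.message.factory,
        // the deprecated HiveConf property shown in the hunk above.
        conf.set(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
            GzipJSONMessageEncoder.class.getCanonicalName());
      }
    }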
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 4313e12..c23aab2 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -78,9 +78,32 @@ import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
 import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
 import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
@@ -110,7 +133,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   private static CleanerThread cleaner = null;

   private Configuration conf;
-  private MessageFactory msgFactory;
+  private MessageEncoder msgEncoder;

   //cleaner is a static object, use static synchronized to make sure its thread-safe
   private static synchronized void init(Configuration conf) throws MetaException {
@@ -126,7 +149,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     super(config);
     conf = config;
     DbNotificationListener.init(conf);
-    msgFactory = MessageFactory.getInstance();
+    msgEncoder = MessageFactory.getDefaultInstance(conf);
   }

   /**
@@ -172,9 +195,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     Table t = tableEvent.getTable();
     FileIterator fileIter = MetaStoreUtils.isExternalTable(t)
         ? null : new FileIterator(t.getSd().getLocation());
+    CreateTableMessage msg =
+        MessageBuilder.getInstance().buildCreateTableMessage(t, fileIter);
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(),
-            msgFactory.buildCreateTableMessage(t, fileIter).toString());
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
@@ -188,9 +213,10 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onDropTable(DropTableEvent tableEvent) throws MetaException {
     Table t = tableEvent.getTable();
+    DropTableMessage msg = MessageBuilder.getInstance().buildDropTableMessage(t);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(), msgFactory
-            .buildDropTableMessage(t).toString());
+        new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
@@ -205,9 +231,13 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
     Table before = tableEvent.getOldTable();
     Table after = tableEvent.getNewTable();
+    AlterTableMessage msg = MessageBuilder.getInstance()
+        .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp(),
+            tableEvent.getWriteId());
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory
-            .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp(), tableEvent.getWriteId()).toString());
+        new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(),
+            msgEncoder.getSerializer().serialize(msg)
+        );
     event.setCatName(after.isSetCatName() ? after.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(after.getDbName());
     event.setTableName(after.getTableName());
@@ -320,10 +350,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     Table t = partitionEvent.getTable();
     PartitionFilesIterator fileIter = MetaStoreUtils.isExternalTable(t)
        ? null : new PartitionFilesIterator(partitionEvent.getPartitionIterator(), t);
-    String msg = msgFactory
-        .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), fileIter).toString();
-    NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
+    EventMessage msg = MessageBuilder.getInstance()
+        .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator(), fileIter);
+    MessageSerializer serializer = msgEncoder.getSerializer();
+
+    NotificationEvent event = new NotificationEvent(0, now(),
+        EventType.ADD_PARTITION.toString(), serializer.serialize(msg));
     event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
@@ -337,9 +369,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
     Table t = partitionEvent.getTable();
-    NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(), msgFactory
-            .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
+    DropPartitionMessage msg =
+        MessageBuilder.getInstance()
+            .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator());
+    NotificationEvent event = new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(),
+        msgEncoder.getSerializer().serialize(msg));
     event.setCatName(t.isSetCatName() ? t.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
@@ -354,10 +388,13 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
     Partition before = partitionEvent.getOldPartition();
     Partition after = partitionEvent.getNewPartition();
+    AlterPartitionMessage msg = MessageBuilder.getInstance()
+        .buildAlterPartitionMessage(partitionEvent.getTable(), before, after,
+            partitionEvent.getIsTruncateOp(),
+            partitionEvent.getWriteId());
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory
-            .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, partitionEvent.getIsTruncateOp(),
-                partitionEvent.getWriteId()).toString());
+        new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(before.isSetCatName() ? before.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(before.getDbName());
     event.setTableName(before.getTableName());
@@ -371,9 +408,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
     Database db = dbEvent.getDatabase();
+    CreateDatabaseMessage msg = MessageBuilder.getInstance()
+        .buildCreateDatabaseMessage(db);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
-            .buildCreateDatabaseMessage(db).toString());
+        new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(db.isSetCatalogName() ? db.getCatalogName() : DEFAULT_CATALOG_NAME);
     event.setDbName(db.getName());
     process(event, dbEvent);
@@ -386,9 +425,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
     Database db = dbEvent.getDatabase();
+    DropDatabaseMessage msg = MessageBuilder.getInstance()
+        .buildDropDatabaseMessage(db);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
-            .buildDropDatabaseMessage(db).toString());
+        new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(db.isSetCatalogName() ? db.getCatalogName() : DEFAULT_CATALOG_NAME);
     event.setDbName(db.getName());
     process(event, dbEvent);
@@ -402,9 +443,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
     Database oldDb = dbEvent.getOldDatabase();
     Database newDb = dbEvent.getNewDatabase();
+    AlterDatabaseMessage msg = MessageBuilder.getInstance()
+        .buildAlterDatabaseMessage(oldDb, newDb);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.ALTER_DATABASE.toString(), msgFactory
-            .buildAlterDatabaseMessage(oldDb, newDb).toString());
+        new NotificationEvent(0, now(), EventType.ALTER_DATABASE.toString(),
+            msgEncoder.getSerializer().serialize(msg)
+        );
     event.setCatName(oldDb.isSetCatalogName() ? oldDb.getCatalogName() : DEFAULT_CATALOG_NAME);
     event.setDbName(oldDb.getName());
     process(event, dbEvent);
@@ -417,9 +461,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException {
     Function fn = fnEvent.getFunction();
+    CreateFunctionMessage msg = MessageBuilder.getInstance()
+        .buildCreateFunctionMessage(fn);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
-            .buildCreateFunctionMessage(fn).toString());
+        new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(fn.isSetCatName() ? fn.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(fn.getDbName());
     process(event, fnEvent);
@@ -432,9 +478,10 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException {
     Function fn = fnEvent.getFunction();
+    DropFunctionMessage msg = MessageBuilder.getInstance().buildDropFunctionMessage(fn);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
-            .buildDropFunctionMessage(fn).toString());
+        new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(fn.isSetCatName() ? fn.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(fn.getDbName());
     process(event, fnEvent);
@@ -481,11 +528,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onInsert(InsertEvent insertEvent) throws MetaException {
     Table tableObj = insertEvent.getTableObj();
+    InsertMessage msg = MessageBuilder.getInstance().buildInsertMessage(tableObj,
+        insertEvent.getPartitionObj(), insertEvent.isReplace(),
+        new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()));
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(tableObj,
-            insertEvent.getPartitionObj(), insertEvent.isReplace(),
-            new FileChksumIterator(insertEvent.getFiles(), insertEvent.getFileChecksums()))
-            .toString());
+        new NotificationEvent(0, now(), EventType.INSERT.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(tableObj.isSetCatName() ? tableObj.getCatName() : DEFAULT_CATALOG_NAME);
     event.setDbName(tableObj.getDbName());
     event.setTableName(tableObj.getTableName());
@@ -495,10 +543,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
     int lastTxnIdx = openTxnEvent.getTxnIds().size() - 1;
-    OpenTxnMessage msg = msgFactory.buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0),
-        openTxnEvent.getTxnIds().get(lastTxnIdx));
+    OpenTxnMessage msg =
+        MessageBuilder.getInstance().buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0),
+            openTxnEvent.getTxnIds().get(lastTxnIdx));
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.OPEN_TXN.toString(), msg.toString());
+        new NotificationEvent(0, now(), EventType.OPEN_TXN.toString(),
+            msgEncoder.getSerializer().serialize(msg));

     try {
       addNotificationLog(event, openTxnEvent, dbConn, sqlGenerator);
@@ -510,10 +560,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
       throws MetaException {
+    CommitTxnMessage msg =
+        MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId());
+
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(), msgFactory.buildCommitTxnMessage(
-            commitTxnEvent.getTxnId())
-            .toString());
+        new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(),
+            msgEncoder.getSerializer().serialize(msg));

     try {
       addNotificationLog(event, commitTxnEvent, dbConn, sqlGenerator);
@@ -525,10 +577,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
       throws MetaException {
+    AbortTxnMessage msg =
+        MessageBuilder.getInstance().buildAbortTxnMessage(abortTxnEvent.getTxnId());
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(), msgFactory.buildAbortTxnMessage(
-            abortTxnEvent.getTxnId())
-            .toString());
+        new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(),
+            msgEncoder.getSerializer().serialize(msg));

     try {
       addNotificationLog(event, abortTxnEvent, dbConn, sqlGenerator);
@@ -555,9 +608,10 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAddPrimaryKey(AddPrimaryKeyEvent addPrimaryKeyEvent) throws MetaException {
     List<SQLPrimaryKey> cols = addPrimaryKeyEvent.getPrimaryKeyCols();
     if (cols.size() > 0) {
-      NotificationEvent event =
-          new NotificationEvent(0, now(), EventType.ADD_PRIMARYKEY.toString(), msgFactory
-              .buildAddPrimaryKeyMessage(addPrimaryKeyEvent.getPrimaryKeyCols()).toString());
+      AddPrimaryKeyMessage msg = MessageBuilder.getInstance()
+          .buildAddPrimaryKeyMessage(addPrimaryKeyEvent.getPrimaryKeyCols());
+      NotificationEvent event = new NotificationEvent(0, now(), EventType.ADD_PRIMARYKEY.toString(),
+          msgEncoder.getSerializer().serialize(msg));
      event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
      event.setDbName(cols.get(0).getTable_db());
      event.setTableName(cols.get(0).getTable_name());
@@ -573,9 +627,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAddForeignKey(AddForeignKeyEvent addForeignKeyEvent) throws MetaException {
     List<SQLForeignKey> cols = addForeignKeyEvent.getForeignKeyCols();
     if (cols.size() > 0) {
+      AddForeignKeyMessage msg = MessageBuilder.getInstance()
+          .buildAddForeignKeyMessage(addForeignKeyEvent.getForeignKeyCols());
       NotificationEvent event =
-          new NotificationEvent(0, now(), EventType.ADD_FOREIGNKEY.toString(), msgFactory
-              .buildAddForeignKeyMessage(addForeignKeyEvent.getForeignKeyCols()).toString());
+          new NotificationEvent(0, now(), EventType.ADD_FOREIGNKEY.toString(),
+              msgEncoder.getSerializer().serialize(msg));
      event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
      event.setDbName(cols.get(0).getPktable_db());
      event.setTableName(cols.get(0).getPktable_name());
@@ -591,9 +647,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAddUniqueConstraint(AddUniqueConstraintEvent addUniqueConstraintEvent) throws MetaException {
     List<SQLUniqueConstraint> cols = addUniqueConstraintEvent.getUniqueConstraintCols();
     if (cols.size() > 0) {
+      AddUniqueConstraintMessage msg = MessageBuilder.getInstance()
+          .buildAddUniqueConstraintMessage(addUniqueConstraintEvent.getUniqueConstraintCols());
       NotificationEvent event =
-          new NotificationEvent(0, now(), EventType.ADD_UNIQUECONSTRAINT.toString(), msgFactory
-              .buildAddUniqueConstraintMessage(addUniqueConstraintEvent.getUniqueConstraintCols()).toString());
+          new NotificationEvent(0, now(), EventType.ADD_UNIQUECONSTRAINT.toString(),
+              msgEncoder.getSerializer().serialize(msg)
+          );
      event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
      event.setDbName(cols.get(0).getTable_db());
      event.setTableName(cols.get(0).getTable_name());
@@ -609,9 +668,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   public void onAddNotNullConstraint(AddNotNullConstraintEvent addNotNullConstraintEvent) throws MetaException {
     List<SQLNotNullConstraint> cols = addNotNullConstraintEvent.getNotNullConstraintCols();
     if (cols.size() > 0) {
+      AddNotNullConstraintMessage msg = MessageBuilder.getInstance()
+          .buildAddNotNullConstraintMessage(addNotNullConstraintEvent.getNotNullConstraintCols());
       NotificationEvent event =
-          new NotificationEvent(0, now(), EventType.ADD_NOTNULLCONSTRAINT.toString(), msgFactory
-              .buildAddNotNullConstraintMessage(addNotNullConstraintEvent.getNotNullConstraintCols()).toString());
+          new NotificationEvent(0, now(), EventType.ADD_NOTNULLCONSTRAINT.toString(),
+              msgEncoder.getSerializer().serialize(msg)
+          );
      event.setCatName(cols.get(0).isSetCatName() ? cols.get(0).getCatName() : DEFAULT_CATALOG_NAME);
      event.setDbName(cols.get(0).getTable_db());
      event.setTableName(cols.get(0).getTable_name());
@@ -628,9 +690,11 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     String dbName = dropConstraintEvent.getDbName();
     String tableName = dropConstraintEvent.getTableName();
     String constraintName = dropConstraintEvent.getConstraintName();
+    DropConstraintMessage msg = MessageBuilder.getInstance()
+        .buildDropConstraintMessage(dbName, tableName, constraintName);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.DROP_CONSTRAINT.toString(), msgFactory
-            .buildDropConstraintMessage(dbName, tableName, constraintName).toString());
+        new NotificationEvent(0, now(), EventType.DROP_CONSTRAINT.toString(),
+            msgEncoder.getSerializer().serialize(msg));
     event.setCatName(dropConstraintEvent.getCatName());
     event.setDbName(dbName);
     event.setTableName(tableName);
@@ -646,9 +710,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
       throws MetaException {
     String tableName = allocWriteIdEvent.getTableName();
     String dbName = allocWriteIdEvent.getDbName();
+    AllocWriteIdMessage msg = MessageBuilder.getInstance()
+        .buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), dbName, tableName);
     NotificationEvent event =
-        new NotificationEvent(0, now(), EventType.ALLOC_WRITE_ID.toString(), msgFactory
-            .buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), dbName, tableName).toString());
+        new NotificationEvent(0, now(), EventType.ALLOC_WRITE_ID.toString(),
+            msgEncoder.getSerializer().serialize(msg)
+        );
     event.setTableName(tableName);
     event.setDbName(dbName);
     try {
@@ -661,11 +728,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onAcidWrite(AcidWriteEvent acidWriteEvent, Connection dbConn, SQLGenerator sqlGenerator)
       throws MetaException {
-    AcidWriteMessage msg = msgFactory.buildAcidWriteMessage(acidWriteEvent,
+    AcidWriteMessage msg = MessageBuilder.getInstance().buildAcidWriteMessage(acidWriteEvent,
         new FileChksumIterator(acidWriteEvent.getFiles(), acidWriteEvent.getChecksums(),
             acidWriteEvent.getSubDirs()));
-    NotificationEvent event = new NotificationEvent(0, now(), EventType.ACID_WRITE.toString(), msg.toString());
-    event.setMessageFormat(msgFactory.getMessageFormat());
+    NotificationEvent event = new NotificationEvent(0, now(), EventType.ACID_WRITE.toString(),
+        msgEncoder.getSerializer().serialize(msg));
+    event.setMessageFormat(msgEncoder.getMessageFormat());
     event.setDbName(acidWriteEvent.getDatabase());
     event.setTableName(acidWriteEvent.getTable());
     try {
@@ -848,7 +916,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     ResultSet rs = null;
     try {
       stmt = dbConn.createStatement();
-      event.setMessageFormat(msgFactory.getMessageFormat());
+      event.setMessageFormat(msgEncoder.getMessageFormat());

       if (sqlGenerator.getDbProduct() == MYSQL) {
         stmt.execute("SET @@session.sql_mode=ANSI_QUOTES");
@@ -910,7 +978,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
    * DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners.
    */
   private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException {
-    event.setMessageFormat(msgFactory.getMessageFormat());
+    event.setMessageFormat(msgEncoder.getMessageFormat());
     LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
         event.getMessage());
     HMSHandler.getMSForConf(conf).addNotificationEvent(event);
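Across all the listener callbacks above, the old single-step msgFactory.build*(...).toString() pattern becomes a two-step build-then-serialize pattern: MessageBuilder constructs the typed, format-agnostic message, and the configured MessageEncoder's serializer produces the string stored in the notification log. A condensed sketch of that write path, using only calls that appear in this diff; the wrapper class and method are illustrative:

    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
    import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
    import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
    import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
    import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;

    class WritePathSketch {
      // 'encoder' is what MessageFactory.getDefaultInstance(conf) returns in
      // the listener's constructor above.
      static NotificationEvent dropTableEvent(Table t, MessageEncoder encoder) {
        // 1. Build the typed message; no encoding decisions are made here.
        DropTableMessage msg = MessageBuilder.getInstance().buildDropTableMessage(t);
        // 2. Serialize with whichever codec the metastore is configured with.
        MessageSerializer serializer = encoder.getSerializer();
        NotificationEvent event = new NotificationEvent(0,
            (int) (System.currentTimeMillis() / 1000),
            EventType.DROP_TABLE.toString(), serializer.serialize(msg));
        // 3. Stamp the format so readers can pick the matching deserializer.
        event.setMessageFormat(encoder.getMessageFormat());
        return event;
      }
    }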
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
index 4707d0e..17d3b73 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONCreateFunctionMessage.java
@@ -20,6 +20,7 @@
 package org.apache.hive.hcatalog.messaging.json;

 import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hive.hcatalog.messaging.CreateFunctionMessage;
 import org.apache.thrift.TException;
 import org.codehaus.jackson.annotate.JsonProperty;
@@ -46,7 +47,7 @@ public class JSONCreateFunctionMessage extends CreateFunctionMessage {
     this.db = fn.getDbName();
     this.timestamp = timestamp;
     try {
-      this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn);
+      this.functionObjJson = MessageBuilder.createFunctionObjJson(fn);
     } catch (TException ex) {
       throw new IllegalArgumentException("Could not serialize Function object", ex);
     }


http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
index 010c4a6..7fb7d1c 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONDropFunctionMessage.java
@@ -20,6 +20,7 @@
 package org.apache.hive.hcatalog.messaging.json;

 import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hive.hcatalog.messaging.DropFunctionMessage;
 import org.apache.thrift.TException;
 import org.codehaus.jackson.annotate.JsonProperty;
@@ -46,7 +47,7 @@ public class JSONDropFunctionMessage extends DropFunctionMessage {
     this.db = fn.getDbName();
     this.timestamp = timestamp;
     try {
-      this.functionObjJson = JSONMessageFactory.createFunctionObjJson(fn);
+      this.functionObjJson = MessageBuilder.createFunctionObjJson(fn);
     } catch (TException ex) {
       throw new IllegalArgumentException("Could not serialize Function object", ex);
     }
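Both HCatalog function messages now delegate the Thrift-to-JSON conversion to MessageBuilder.createFunctionObjJson. Judging from the identical helper removed from the HCatalog JSONMessageFactory below, the moved implementation is simply TJSONProtocol serialization of the Function object; the wrapper class here is illustrative:

    import org.apache.hadoop.hive.metastore.api.Function;
    import org.apache.thrift.TException;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TJSONProtocol;

    class FunctionJsonSketch {
      // Same body as the helper deleted from JSONMessageFactory further down.
      static String createFunctionObjJson(Function functionObj) throws TException {
        TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
        return serializer.toString(functionObj, "UTF-8");
      }
    }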
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
index ec573a3..770dd1e 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
@@ -20,16 +20,14 @@
 package org.apache.hive.hcatalog.messaging.json;

 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;

-import javax.annotation.Nullable;
-
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hive.hcatalog.messaging.AddPartitionMessage;
 import org.apache.hive.hcatalog.messaging.AlterPartitionMessage;
 import org.apache.hive.hcatalog.messaging.AlterTableMessage;
@@ -43,15 +41,9 @@ import org.apache.hive.hcatalog.messaging.DropTableMessage;
 import org.apache.hive.hcatalog.messaging.InsertMessage;
 import org.apache.hive.hcatalog.messaging.MessageDeserializer;
 import org.apache.hive.hcatalog.messaging.MessageFactory;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
 /**
  * The JSON implementation of the MessageFactory. Constructs JSON implementations of
  * each message-type.
@@ -111,7 +103,7 @@ public class JSONMessageFactory extends MessageFactory {
   public AddPartitionMessage buildAddPartitionMessage(Table table, Iterator<Partition> partitionsIterator) {
     return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
         table.getTableName(), table.getTableType(),
-        getPartitionKeyValues(table, partitionsIterator), now());
+        MessageBuilder.getPartitionKeyValues(table, partitionsIterator), now());
   }

   @Override
@@ -119,14 +111,14 @@ public class JSONMessageFactory extends MessageFactory {
       Long writeId) {
     return new JSONAlterPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL,
         before.getDbName(), before.getTableName(), table.getTableType(),
-        getPartitionKeyValues(table,before), writeId, now());
+        MessageBuilder.getPartitionKeyValues(table,before), writeId, now());
   }

   @Override
   public DropPartitionMessage buildDropPartitionMessage(Table table, Iterator<Partition> partitions) {
     return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL,
         table.getDbName(), table.getTableName(), table.getTableType(),
-        getPartitionKeyValues(table, partitions), now());
+        MessageBuilder.getPartitionKeyValues(table, partitions), now());
   }

   @Override
@@ -159,27 +151,4 @@ public class JSONMessageFactory extends MessageFactory {
     return System.currentTimeMillis() / 1000;
   }

-  private static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
-    Map<String, String> partitionKeys = new LinkedHashMap<String, String>();
-    for (int i=0; i<table.getPartitionKeysSize(); ++i) {
-      partitionKeys.put(table.getPartitionKeys().get(i).getName(),
-          partition.getValues().get(i));
-    }
-    return partitionKeys;
-  }
-
-  private static List<Map<String, String>> getPartitionKeyValues(final Table table, Iterator<Partition> iterator) {
-    return Lists.newArrayList(Iterators.transform(iterator, new com.google.common.base.Function<Partition, Map<String, String>>() {
-      @Override
-      public Map<String, String> apply(@Nullable Partition partition) {
-        return getPartitionKeyValues(table, partition);
-      }
-    }));
-  }
-
-  static String createFunctionObjJson(Function functionObj) throws TException {
-    TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-    return serializer.toString(functionObj, "UTF-8");
-  }
-
 }
\ No newline at end of file
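The getPartitionKeyValues helpers removed here keep their behavior in MessageBuilder: partition messages carry an ordered name-to-value map per partition rather than the full Partition object. A self-contained sketch of that mapping, with hypothetical key and value data:

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class PartitionKeyValuesDemo {
      // Mirrors the removed helper: zip the table's partition-key names with
      // one partition's values, preserving declaration order.
      static Map<String, String> keyValues(List<String> keys, List<String> values) {
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
          m.put(keys.get(i), values.get(i));
        }
        return m;
      }

      public static void main(String[] args) {
        // Hypothetical table partitioned by (ds, hr):
        System.out.println(keyValues(Arrays.asList("ds", "hr"),
            Arrays.asList("2018-10-22", "13")));   // {ds=2018-10-22, hr=13}
      }
    }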
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index a00ea17..dc555a4 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -93,6 +93,7 @@ import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -119,7 +120,16 @@ public class TestDbNotificationListener {
   private static Map<String, String> emptyParameters = new HashMap<String, String>();
   private static IMetaStoreClient msClient;
   private static IDriver driver;
-  private static MessageDeserializer md = null;
+  private static MessageDeserializer md;
+
+  static {
+    try {
+      md = MessageFactory.getInstance(JSONMessageEncoder.FORMAT).getDeserializer();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private int startTime;
   private long firstEventId;
   private final String testTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "testDbNotif").toString();
@@ -267,7 +277,7 @@ public class TestDbNotificationListener {
     SessionState.start(new CliSessionState(conf));
     msClient = new HiveMetaStoreClient(conf);
     driver = DriverFactory.newDriver(conf);
-    md = MessageFactory.getInstance().getDeserializer();
+    md = JSONMessageEncoder.getInstance().getDeserializer();
     bcompat = new ReplicationV1CompatRule(msClient, conf, testsToSkipForReplV1BackwardCompatTesting );
   }
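On the read side, MessageFactory.getInstance(format), used by the test's static initializer above, resolves a codec from a format string, so a consumer can decode notification logs written with mixed encoders by keying off the format stamped on each event. A sketch of that assumed usage; the wrapper method is illustrative, and the per-type getter would be chosen by the event's type in real code:

    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
    import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
    import org.apache.hadoop.hive.metastore.messaging.MessageFactory;

    class ReadPathSketch {
      static DropTableMessage decodeDropTable(NotificationEvent event) throws Exception {
        // Pick the codec matching whatever encoder wrote this event.
        MessageDeserializer md =
            MessageFactory.getInstance(event.getMessageFormat()).getDeserializer();
        return md.getDropTableMessage(event.getMessage());
      }
    }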
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java
new file mode 100644
index 0000000..c16799d
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcidTablesWithJsonMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.Collections;
+
+public class TestReplAcidTablesWithJsonMessage extends TestReplicationScenariosAcidTables {
+
+  @Rule
+  public TestRule replV1BackwardCompat;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    internalBeforeClassSetup(Collections.emptyMap(), TestReplAcidTablesWithJsonMessage.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    replV1BackwardCompat = primary.getReplivationV1CompatRule(Collections.emptyList());
+    super.setup();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
new file mode 100644
index 0000000..0ec0275
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplAcrossInstancesWithJsonMessageFormat.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class TestReplAcrossInstancesWithJsonMessageFormat
+    extends TestReplicationScenariosAcrossInstances {
+
+  @Rule
+  public TestRule replV1BackwardCompat;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    internalBeforeClassSetup(Collections.emptyMap(), TestReplicationScenarios.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
+    super.setup();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java
new file mode 100644
index 0000000..792ec1c
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplIncrementalLoadAcidTablesWithJsonMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.Collections;
+
+public class TestReplIncrementalLoadAcidTablesWithJsonMessage
+    extends TestReplicationScenariosIncrementalLoadAcidTables {
+
+  @Rule
+  public TestRule replV1BackwardCompat;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    internalBeforeClassSetup(Collections.emptyMap(),
+        TestReplIncrementalLoadAcidTablesWithJsonMessage.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    replV1BackwardCompat = primary.getReplivationV1CompatRule(Collections.emptyList());
+    super.setup();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
new file mode 100644
index 0000000..faf1ced
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestRule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class TestReplWithJsonMessageFormat extends TestReplicationScenarios {
+
+  @Rule
+  public TestRule replV1BackwardCompatibleRule =
+      new ReplicationV1CompatRule(metaStoreClient, hconf,
+          new ArrayList<>(Collections.singletonList("testEventFilters")));
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    internalBeforeClassSetup(Collections.emptyMap());
+  }
+
+}


http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 9c35aa6..75cd68a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -18,11 +18,11 @@
 package org.apache.hadoop.hive.ql.parse;

 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
 import org.apache.hadoop.hive.metastore.ObjectStore;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -47,13 +46,18 @@ import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.MoveTask;
@@ -62,45 +66,42 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.ErrorMsg;

 import javax.annotation.Nullable;
-
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;

+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.junit.Assert;
+import static org.junit.Assert.assertTrue;

 public class TestReplicationScenarios {
@@ -115,18 +116,14 @@ public class TestReplicationScenarios {
   private final static String TEST_PATH =
       System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;

-  private static HiveConf hconf;
+  static HiveConf hconf;
+  static HiveMetaStoreClient metaStoreClient;
   private static IDriver driver;
-  private static HiveMetaStoreClient metaStoreClient;
   private static String proxySettingName;
-  static HiveConf hconfMirror;
-  static IDriver driverMirror;
-  static HiveMetaStoreClient metaStoreClientMirror;
+  private static HiveConf hconfMirror;
+  private static IDriver driverMirror;
+  private static HiveMetaStoreClient metaStoreClientMirror;

-  @Rule
-  public TestRule replV1BackwardCompatibleRule =
-      new ReplicationV1CompatRule(metaStoreClient, hconf,
-          new ArrayList<>(Arrays.asList("testEventFilters")));
   // Make sure we skip backward-compat checking for those tests that don't generate events

   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
@@ -141,23 +138,30 @@ public class TestReplicationScenarios {

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    HashMap<String, String> overrideProperties = new HashMap<>();
+    overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    internalBeforeClassSetup(overrideProperties);
+  }
+
+  static void internalBeforeClassSetup(Map<String, String> additionalProperties)
+      throws Exception {
     hconf = new HiveConf(TestReplicationScenarios.class);
-    String metastoreUri = System.getProperty("test."+HiveConf.ConfVars.METASTOREURIS.varname);
+    String metastoreUri = System.getProperty("test."+MetastoreConf.ConfVars.THRIFT_URIS.getHiveName());
     if (metastoreUri != null) {
-      hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUri);
+      hconf.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreUri);
       return;
     }

-    hconf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
+    hconf.set(MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS.getHiveName(),
        DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
     hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
     hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
     hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
     proxySettingName = "hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts";
     hconf.set(proxySettingName, "*");
-    MetaStoreTestUtils.startMetaStoreWithRetry(hconf);
     hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
-    hconf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+    hconf.set(MetastoreConf.ConfVars.THRIFT_CONNECTION_RETRIES.getHiveName(), "3");
     hconf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hconf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
     hconf.set(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
@@ -166,11 +170,17 @@ public class TestReplicationScenarios {
     hconf.set(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
         "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
     hconf.set(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL.varname,
-            "org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore");
+        "org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore");
     hconf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, true);
     System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
     System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");

+    additionalProperties.forEach((key, value) -> {
+      hconf.set(key, value);
+    });
+
+    MetaStoreTestUtils.startMetaStoreWithRetry(hconf);
+
     Path testPath = new Path(TEST_PATH);
     FileSystem fs = FileSystem.get(testPath.toUri(),hconf);
     fs.mkdirs(testPath);
@@ -3077,12 +3087,12 @@ public class TestReplicationScenarios {
     // that match a provided message format

     IMetaStoreClient.NotificationFilter restrictByDefaultMessageFormat =
-        new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat());
+        new MessageFormatFilter(JSONMessageEncoder.FORMAT);
     IMetaStoreClient.NotificationFilter restrictByArbitraryMessageFormat =
-        new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat() + "_bogus");
+        new MessageFormatFilter(JSONMessageEncoder.FORMAT + "_bogus");
     NotificationEvent dummyEvent = createDummyEvent(dbname,tblname,0);

-    assertEquals(MessageFactory.getInstance().getMessageFormat(),dummyEvent.getMessageFormat());
+    assertEquals(JSONMessageEncoder.FORMAT,dummyEvent.getMessageFormat());

     assertFalse(restrictByDefaultMessageFormat.accept(null));
     assertTrue(restrictByDefaultMessageFormat.accept(dummyEvent));
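With this change the main replication suites run against the new gzip default (set in setUpBeforeClass above), while the *WithJsonMessage* subclasses pin the plain JSON codec, which is why the MessageFormatFilter assertions now compare against JSONMessageEncoder.FORMAT explicitly. The gzip codec itself lives in messaging/json/gzip/Serializer.java, which this excerpt does not show; a plausible sketch of the approach, stated as an assumption rather than the actual implementation, is to gzip the JSON body and Base64-encode the result so it remains a legal value for the text message column:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.zip.GZIPOutputStream;

    class GzipSerializeSketch {
      static String serialize(String json) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
          gzip.write(json.getBytes(StandardCharsets.UTF_8));
        }
        // Base64 keeps the compressed bytes safe in a string column.
        return Base64.getEncoder().encodeToString(out.toByteArray());
      }
    }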
t.setTableName(tblname); NotificationEvent event = new NotificationEvent( evid, (int)System.currentTimeMillis(), - MessageFactory.CREATE_TABLE_EVENT, - msgFactory.buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator()).toString() + MessageBuilder.CREATE_TABLE_EVENT, + MessageBuilder.getInstance().buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator()) + .toString() ); event.setDbName(t.getDbName()); event.setTableName(t.getTableName()); - event.setMessageFormat(msgFactory.getMessageFormat()); + event.setMessageFormat(msgEncoder.getMessageFormat()); return event; } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index e043e54..4ceb9fa 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -17,16 +17,16 @@ */ package org.apache.hadoop.hive.ql.parse; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; -import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.Utils; import org.junit.rules.TestName; -import org.junit.rules.TestRule; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -54,13 +53,11 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Collections; -import com.google.common.collect.Lists; -import org.junit.Ignore; +import java.util.Map; import static org.junit.Assert.assertTrue; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; @@ -72,12 +69,10 @@ public class TestReplicationScenariosAcidTables { @Rule public final TestName testName = new TestName(); - @Rule - public TestRule replV1BackwardCompat; - protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); - private static WarehouseInstance primary, replica, replicaNonAcid; - private static HiveConf conf; + static WarehouseInstance primary; + private static WarehouseInstance replica, replicaNonAcid; + static HiveConf conf; private String primaryDbName, replicatedDbName, primaryDbNameExtra; private enum OperationType { REPL_TEST_ACID_INSERT, REPL_TEST_ACID_INSERT_SELECT, 
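Aside for readers of the diff (illustrative only, not part of the patch): the createDummyEvent() hunk above is the clearest picture of the reworked messaging API. Payload construction now goes through MessageBuilder, while MessageFactory.getInstance(format) resolves a MessageEncoder for an explicit format string such as JSONMessageEncoder.FORMAT instead of a process-wide default. A minimal sketch of that flow, modeled on the test code (the class and method names here are hypothetical):

    import java.util.Arrays;
    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
    import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
    import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
    import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;

    public class MessageApiSketch {  // hypothetical helper, not in the patch
      NotificationEvent buildCreateTableEvent(Table t, long eventId) throws Exception {
        // Resolve the encoder for an explicit format; getInstance(format) can
        // throw, which is why the test above wraps it in a try/catch.
        MessageEncoder encoder = MessageFactory.getInstance(JSONMessageEncoder.FORMAT);
        // Message construction lives in MessageBuilder, independent of encoding.
        String payload = MessageBuilder.getInstance()
            .buildCreateTableMessage(t, Arrays.asList("/tmp/").iterator())
            .toString();
        NotificationEvent event = new NotificationEvent(eventId,
            (int) System.currentTimeMillis(), MessageBuilder.CREATE_TABLE_EVENT, payload);
        // Stamp the event with the encoder's format so filters such as
        // MessageFormatFilter (see the hunk above) can match on it.
        event.setMessageFormat(encoder.getMessageFormat());
        return event;
      }
    }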
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index e043e54..4ceb9fa 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -17,16 +17,16 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
 
 import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,13 +53,11 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Collections;
-import com.google.common.collect.Lists;
-import org.junit.Ignore;
+import java.util.Map;
 
 import static org.junit.Assert.assertTrue;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
@@ -72,12 +69,10 @@ public class TestReplicationScenariosAcidTables {
   @Rule
   public final TestName testName = new TestName();
 
-  @Rule
-  public TestRule replV1BackwardCompat;
-
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
-  private static WarehouseInstance primary, replica, replicaNonAcid;
-  private static HiveConf conf;
+  static WarehouseInstance primary;
+  private static WarehouseInstance replica, replicaNonAcid;
+  static HiveConf conf;
   private String primaryDbName, replicatedDbName, primaryDbNameExtra;
   private enum OperationType {
     REPL_TEST_ACID_INSERT, REPL_TEST_ACID_INSERT_SELECT, REPL_TEST_ACID_CTAS,
@@ -87,25 +82,38 @@ public class TestReplicationScenariosAcidTables {
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
-    conf = new HiveConf(TestReplicationScenariosAcidTables.class);
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+
+    internalBeforeClassSetup(overrides, TestReplicationScenariosAcidTables.class);
+  }
+
+  static void internalBeforeClassSetup(Map<String, String> overrides,
+      Class clazz) throws Exception {
+
+    conf = new HiveConf(clazz);
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
     MiniDFSCluster miniDFSCluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
-        put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
-        put("hive.support.concurrency", "true");
-        put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
-        put("hive.metastore.client.capability.check", "false");
-        put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
-        put("hive.exec.dynamic.partition.mode", "nonstrict");
-        put("hive.strict.checks.bucketing", "false");
-        put("hive.mapred.mode", "nonstrict");
-        put("mapred.input.dir.recursive", "true");
-        put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+            new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    HashMap<String, String> acidEnableConf = new HashMap<String, String>() {{
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
     }};
-    primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
-    replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+
+    acidEnableConf.putAll(overrides);
+
+    primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
     HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
         put("hive.support.concurrency", "false");
@@ -123,7 +131,6 @@ public class TestReplicationScenariosAcidTables {
 
   @Before
   public void setup() throws Throwable {
-    replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
     primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
     replicatedDbName = "replicated_" + primaryDbName;
     primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
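The classLevelSetup() hunk above is the hook that lets other test variants reuse this suite: the method now only picks the default encoder (GzipJSONMessageEncoder) and delegates to the package-visible internalBeforeClassSetup(overrides, clazz), so a subclass can rerun the same scenarios under a different encoder just by overriding EVENT_MESSAGE_FACTORY. A sketch of such a subclass (the class name is hypothetical; shadowing the static classLevelSetup() keeps JUnit from also running the parent's setup):

    import java.util.HashMap;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
    import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
    import org.junit.BeforeClass;

    // Hypothetical subclass: reruns the ACID replication scenarios with the
    // plain JSON encoder instead of the gzip'd default chosen above.
    public class TestReplicationScenariosAcidTablesJsonFormat
        extends TestReplicationScenariosAcidTables {

      @BeforeClass
      public static void classLevelSetup() throws Exception {
        HashMap<String, String> overrides = new HashMap<>();
        overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
            JSONMessageEncoder.class.getCanonicalName());
        internalBeforeClassSetup(overrides, TestReplicationScenariosAcidTablesJsonFormat.class);
      }
    }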
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 61473a8..7e8caf0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -19,12 +19,13 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
@@ -43,7 +44,6 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
@@ -53,7 +53,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -72,33 +71,40 @@ import static org.junit.Assert.assertTrue;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.junit.Assert;
 
 public class TestReplicationScenariosAcrossInstances {
   @Rule
   public final TestName testName = new TestName();
 
-  @Rule
-  public TestRule replV1BackwardCompat;
-
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
-  private static WarehouseInstance primary, replica;
+  static WarehouseInstance primary;
+  private static WarehouseInstance replica;
   private String primaryDbName, replicatedDbName;
   private static HiveConf conf;
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
-    conf = new HiveConf(TestReplicationScenarios.class);
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+
+    internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
+  }
+
+  static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz)
+      throws Exception {
+    conf = new HiveConf(clazz);
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
     MiniDFSCluster miniDFSCluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
+    Map<String, String> localOverrides = new HashMap<String, String>() {{
       put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
       put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
     }};
-    primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
-    replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+    localOverrides.putAll(overrides);
+    primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
   }
 
   @AfterClass
@@ -109,7 +115,6 @@ public class TestReplicationScenariosAcrossInstances {
 
   @Before
   public void setup() throws Throwable {
-    replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
     primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
     replicatedDbName = "replicated_" + primaryDbName;
     primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
@@ -323,7 +328,8 @@ public class TestReplicationScenariosAcrossInstances {
             "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
         .run("create table table1 (i int, j int)")
         .run("insert into table1 values (1,2)")
-        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+        .dump(primaryDbName, null,
+            Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     replica.load(replicatedDbName, tuple.dumpLocation)
         .run("use " + replicatedDbName)
@@ -419,7 +425,7 @@ public class TestReplicationScenariosAcrossInstances {
         .run("create table table2 (a int, city string) partitioned by (country string)")
         .run("create table table3 (i int, j int)")
         .run("insert into table1 values (1,2)")
-        .dump(dbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+        .dump(dbName, null, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     replica.load(replicatedDbName, tuple.dumpLocation)
         .run("use " + replicatedDbName)
@@ -433,7 +439,8 @@ public class TestReplicationScenariosAcrossInstances {
         .run("alter table table1 rename to renamed_table1")
         .run("insert into table2 partition(country='india') values (1,'mumbai') ")
         .run("create table table4 (i int, j int)")
-        .dump(dbName, tuple.lastReplicationId, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+        .dump(dbName, tuple.lastReplicationId,
+            Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     replica.load(replicatedDbName, tuple.dumpLocation)
         .run("use " + replicatedDbName)
@@ -467,7 +474,7 @@ public class TestReplicationScenariosAcrossInstances {
             SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("use " + dbTwo)
         .run("create table t1 (i int, j int)")
-        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+        .dump("`*`", null, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     /*
       Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
@@ -526,7 +533,7 @@ public class TestReplicationScenariosAcrossInstances {
         .run("use " + dbOne)
         .run("create table t1 (i int, j int) partitioned by (load_date date) " +
             "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ")
-        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+        .dump("`*`", null, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     String dbTwo = primaryDbName + randomTwo;
     WarehouseInstance.Tuple incrementalTuple = primary
@@ -905,15 +912,20 @@ public class TestReplicationScenariosAcrossInstances {
 
     // Incremental load to non existing db should return database not exist error.
     tuple = primary.dump("someJunkDB", tuple.lastReplicationId);
-    CommandProcessorResponse response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation);
-    response.getErrorMessage().toLowerCase().contains("org.apache.hadoop.hive.ql.metadata.hiveException: " +
-        "database does not exist");
+    CommandProcessorResponse response =
+        replica.runCommand("REPL LOAD someJunkDB from '" + tuple.dumpLocation + "'");
+    assertTrue(response.getErrorMessage().toLowerCase()
+        .contains("org.apache.hadoop.hive.ql.exec.DDLTask. Database does not exist: someJunkDB"
+            .toLowerCase()));
 
     // Bootstrap load from an empty dump directory should return empty load directory error.
     tuple = primary.dump("someJunkDB", null);
-    response = replica.runCommand("REPL LOAD someJunkDB from " + tuple.dumpLocation);
-    response.getErrorMessage().toLowerCase().contains("org.apache.hadoop.hive.ql.parse.semanticException:" +
-        " no data to load in path");
+    response = replica.runCommand("REPL LOAD someJunkDB from '" + tuple.dumpLocation+"'");
+    assertTrue(response.getErrorMessage().toLowerCase()
+        .contains(
+            "semanticException no data to load in path"
+                .toLowerCase())
+    );
 
     primary.run(" drop database if exists " + testDbName + " cascade");
   }
@@ -935,7 +947,8 @@ public class TestReplicationScenariosAcrossInstances {
         .run("insert into table3 partition(country='india') values(3)")
         .dump(primaryDbName, bootstrapTuple.lastReplicationId);
 
-    replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='10'"))
+    replica.load(replicatedDbName, incremental.dumpLocation,
+        Collections.singletonList("'hive.repl.approx.max.load.tasks'='10'"))
         .status(replicatedDbName)
         .verifyResult(incremental.lastReplicationId)
         .run("use " + replicatedDbName)
@@ -959,7 +972,8 @@ public class TestReplicationScenariosAcrossInstances {
     FileStatus[] fileStatus = fs.listStatus(path);
     int numEvents = fileStatus.length - 1; //one is metadata file
 
-    replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"))
+    replica.load(replicatedDbName, incremental.dumpLocation,
+        Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'"))
        .run("use " + replicatedDbName)
        .run("show tables")
        .verifyResults(new String[] {"table1", "table2", "table3", "table4", "table5" })
@@ -1112,7 +1126,7 @@ public class TestReplicationScenariosAcrossInstances {
        .run("show tables")
        .verifyResults(new String[] { "t1", "t2" })
        .run("select id from t1")
-        .verifyResults(Arrays.asList("10"))
+        .verifyResults(Collections.singletonList("10"))
        .run("select country from t2 order by country")
        .verifyResults(Arrays.asList("india", "uk", "us"));
     verifyIfCkptSet(replica, replicatedDbName, tuple.dumpLocation);
@@ -1154,9 +1168,8 @@ public class TestReplicationScenariosAcrossInstances {
     // also not loaded.
     BehaviourInjection<CallerArguments, Boolean> callerVerifier
        = new BehaviourInjection<CallerArguments, Boolean>() {
-      @Nullable
       @Override
-      public Boolean apply(@Nullable CallerArguments args) {
+      public Boolean apply(CallerArguments args) {
         injectionPathCalled = true;
         if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) {
           LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
@@ -1197,9 +1210,8 @@ public class TestReplicationScenariosAcrossInstances {
     // Verify if create table is not called on table t1 but called for t2 and t3.
     // Also, allow constraint creation only on t1 and t3. Foreign key creation on t2 fails.
     callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
-      @Nullable
       @Override
-      public Boolean apply(@Nullable CallerArguments args) {
+      public Boolean apply(CallerArguments args) {
         injectionPathCalled = true;
         if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.funcName != null)) {
           LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " Func: " + String.valueOf(args.funcName));
@@ -1235,9 +1247,8 @@ public class TestReplicationScenariosAcrossInstances {
 
     // Verify if no create table/function calls. Only add foreign key constraints on table t2.
     callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
-      @Nullable
       @Override
-      public Boolean apply(@Nullable CallerArguments args) {
+      public Boolean apply(CallerArguments args) {
         injectionPathCalled = true;
         if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.tblName != null)) {
           LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
@@ -1307,7 +1318,7 @@ public class TestReplicationScenariosAcrossInstances {
     };
     InjectableBehaviourObjectStore.setGetPartitionBehaviour(getPartitionStub);
 
-    List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
+    List<String> withConfigs = Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'");
     replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
     InjectableBehaviourObjectStore.resetGetPartitionBehaviour(); // reset the behaviour
     getPartitionStub.assertInjectionsPerformed(true, false);
@@ -1318,7 +1329,7 @@ public class TestReplicationScenariosAcrossInstances {
        .run("show tables")
        .verifyResults(new String[] {"t2" })
        .run("select country from t2 order by country")
-        .verifyResults(Arrays.asList("india"))
+        .verifyResults(Collections.singletonList("india"))
        .run("show functions like '" + replicatedDbName + "*'")
        .verifyResult(replicatedDbName + ".testFunctionOne");
@@ -1378,7 +1389,8 @@ public class TestReplicationScenariosAcrossInstances {
 
   @Test
   public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Throwable {
-    List<String> withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'");
+    List<String> withConfigs =
+        Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
     String replicatedDbName_CM = replicatedDbName + "_CM";
     WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
         .run("create table t2 (place string) partitioned by (country string)")
@@ -1399,7 +1411,8 @@ public class TestReplicationScenariosAcrossInstances {
 
   @Test
   public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable {
-    List<String> withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'");
+    List<String> withConfigs =
+        Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
     String replicatedDbName_CM = replicatedDbName + "_CM";
     WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
         .run("create table t2 (place string) partitioned by (country string)")
@@ -1417,16 +1430,16 @@ public class TestReplicationScenariosAcrossInstances {
     testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT", tuple);
   }
 
-  private void testMoveOptimization(String primarydb, String replicadb, String replicatedDbName_CM,
+  private void testMoveOptimization(String primaryDb, String replicaDb, String replicatedDbName_CM,
                                     String tbl, String eventType, WarehouseInstance.Tuple tuple) throws Throwable {
-    List<String> withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'");
+    List<String> withConfigs =
+        Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
 
     // fail add notification for given event type.
     BehaviourInjection<NotificationEvent, Boolean> callerVerifier
        = new BehaviourInjection<NotificationEvent, Boolean>() {
-      @Nullable
       @Override
-      public Boolean apply(@Nullable NotificationEvent entry) {
+      public Boolean apply(NotificationEvent entry) {
         if (entry.getEventType().equalsIgnoreCase(eventType) && entry.getTableName().equalsIgnoreCase(tbl)) {
           injectionPathCalled = true;
           LOG.warn("Verifier - DB: " + String.valueOf(entry.getDbName())
@@ -1440,19 +1453,19 @@ public class TestReplicationScenariosAcrossInstances {
     InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
 
     try {
-      replica.loadFailure(replicadb, tuple.dumpLocation, withConfigs);
+      replica.loadFailure(replicaDb, tuple.dumpLocation, withConfigs);
     } finally {
       InjectableBehaviourObjectStore.resetAddNotificationModifier();
     }
 
     callerVerifier.assertInjectionsPerformed(true, false);
-    replica.load(replicadb, tuple.dumpLocation, withConfigs);
+    replica.load(replicaDb, tuple.dumpLocation, withConfigs);
 
-    replica.run("use " + replicadb)
+    replica.run("use " + replicaDb)
        .run("select country from " + tbl + " where country == 'india'")
        .verifyResults(Arrays.asList("india"));
 
-    primary.run("use " + primarydb)
+    primary.run("use " + primaryDb)
        .run("drop table " + tbl);
 
     InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
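The hunks above drop the @Nullable annotations from the BehaviourInjection verifiers but keep the lifecycle every one of these tests follows: install a modifier, drive the operation that should trip it, reset the modifier in a finally block, then assert that the injected path was actually taken. A condensed sketch of that pattern (illustrative only; it assumes BehaviourInjection is the nested helper type in InjectableBehaviourObjectStore, as its usage in this test suggests, with the injectionPathCalled flag seen above):

    import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
    import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
    import org.apache.hadoop.hive.metastore.api.NotificationEvent;

    public class InjectionLifecycleSketch {  // hypothetical helper, not in the patch
      void failInsertNotifications(String table) throws Throwable {
        BehaviourInjection<NotificationEvent, Boolean> verifier =
            new BehaviourInjection<NotificationEvent, Boolean>() {
              @Override
              public Boolean apply(NotificationEvent entry) {
                if ("INSERT".equalsIgnoreCase(entry.getEventType())
                    && table.equalsIgnoreCase(entry.getTableName())) {
                  injectionPathCalled = true;
                  return false;  // false makes the add-notification call fail
                }
                return true;     // let every other event through untouched
              }
            };
        InjectableBehaviourObjectStore.setAddNotificationModifier(verifier);
        try {
          // the operation expected to fail, e.g. replica.loadFailure(...), goes here
        } finally {
          // always reset, or the modifier leaks into later tests
          InjectableBehaviourObjectStore.resetAddNotificationModifier();
        }
        // injected path taken at least once, non-injected path never
        verifier.assertInjectionsPerformed(true, false);
      }
    }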
