http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java index 3fe8b58..314ca48 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java @@ -17,32 +17,19 @@ */ package org.apache.hadoop.hive.ql.parse; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; -import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; -import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; -import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; -import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; -import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; -import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; -import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; + import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; -import org.apache.hadoop.fs.FileSystem; -import 
org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; + import org.junit.rules.TestName; -import org.junit.rules.TestRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -50,13 +37,11 @@ import org.junit.BeforeClass; import org.junit.AfterClass; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; -import javax.annotation.Nullable; -import java.util.Collections; +import java.util.Map; + import com.google.common.collect.Lists; -import org.junit.Ignore; /** * TestReplicationScenariosAcidTables - test replication for ACID tables @@ -65,11 +50,9 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { @Rule public final TestName testName = new TestName(); - @Rule - public TestRule replV1BackwardCompat; - protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenariosIncrementalLoadAcidTables.class); - private static WarehouseInstance primary, replica, replicaNonAcid; + static WarehouseInstance primary; + private static WarehouseInstance replica, replicaNonAcid; private static HiveConf conf; private String primaryDbName, replicatedDbName, primaryDbNameExtra; private enum OperationType { @@ -80,12 +63,21 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { @BeforeClass public static void classLevelSetup() throws Exception { - conf = new HiveConf(TestReplicationScenariosAcidTables.class); + HashMap<String, String> overrides = new HashMap<>(); + overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + + internalBeforeClassSetup(overrides, TestReplicationScenariosAcidTables.class); + } + + static void internalBeforeClassSetup(Map<String, String> overrides, Class 
clazz) + throws Exception { + conf = new HiveConf(clazz); conf.set("dfs.client.use.datanode.hostname", "true"); conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); - HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{ + HashMap<String, String> acidConfs = new HashMap<String, String>() {{ put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); put("hive.support.concurrency", "true"); put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); @@ -97,9 +89,11 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { put("mapred.input.dir.recursive", "true"); put("hive.metastore.disallow.incompatible.col.type.changes", "false"); }}; - primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); - replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); - HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{ + + acidConfs.putAll(overrides); + primary = new WarehouseInstance(LOG, miniDFSCluster, acidConfs); + replica = new WarehouseInstance(LOG, miniDFSCluster, acidConfs); + Map<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{ put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); put("hive.support.concurrency", "false"); put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); @@ -116,7 +110,6 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { @Before public void setup() throws Throwable { - replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); replicatedDbName = "replicated_" + primaryDbName; primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 1e3478d..aae7bd7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -100,7 +100,7 @@ public class WarehouseInstance implements Closeable { initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf); } - public WarehouseInstance(Logger logger, MiniDFSCluster cluster, + WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf) throws Exception { this(logger, cluster, overridesForHiveConf, null); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java index 66f3b78..a51b7e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java @@ -60,7 +60,7 @@ import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.hadoop.hive.ql.exec.Utilities; 
import org.apache.hadoop.hive.ql.hooks.Entity.Type; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -997,12 +997,12 @@ public final class QueryResultsCache { String tableName; switch (event.getEventType()) { - case MessageFactory.ADD_PARTITION_EVENT: - case MessageFactory.ALTER_PARTITION_EVENT: - case MessageFactory.DROP_PARTITION_EVENT: - case MessageFactory.ALTER_TABLE_EVENT: - case MessageFactory.DROP_TABLE_EVENT: - case MessageFactory.INSERT_EVENT: + case MessageBuilder.ADD_PARTITION_EVENT: + case MessageBuilder.ALTER_PARTITION_EVENT: + case MessageBuilder.DROP_PARTITION_EVENT: + case MessageBuilder.ALTER_TABLE_EVENT: + case MessageBuilder.DROP_TABLE_EVENT: + case MessageBuilder.INSERT_EVENT: dbName = event.getDbName(); tableName = event.getTableName(); break; http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 4872080..c75bde5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -31,11 +31,9 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; -import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; -import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter; import 
org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; @@ -156,8 +154,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { IMetaStoreClient.NotificationFilter evFilter = new AndFilter( new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern), - new EventBoundaryFilter(work.eventFrom, work.eventTo), - new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat())); + new EventBoundaryFilter(work.eventFrom, work.eventTo)); EventUtils.MSClientNotificationFetcher evFetcher = new EventUtils.MSClientNotificationFetcher(hiveDb); http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java index d09b98c..8747727 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage; import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage; import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; -import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; @@ -63,7 +63,7 @@ public class LoadConstraint { private final ConstraintEvent event; private final String 
dbNameToLoadIn; private final TaskTracker tracker; - private final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer(); + private final MessageDeserializer deserializer = JSONMessageEncoder.getInstance().getDeserializer(); public LoadConstraint(Context context, ConstraintEvent event, String dbNameToLoadIn, TaskTracker existingTracker) { http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java index b9a5d21..5db3f26 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java @@ -18,20 +18,26 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -class AbortTxnHandler extends AbstractEventHandler { +class AbortTxnHandler extends AbstractEventHandler<AbortTxnMessage> { AbortTxnHandler(NotificationEvent event) { super(event); } @Override + AbortTxnMessage eventMessage(String stringRepresentation) { + return deserializer.getAbortTxnMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} ABORT_TXN message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} ABORT_TXN message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); 
dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java index 3ed005c..672f402 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java @@ -18,9 +18,10 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; -abstract class AbstractConstraintEventHandler extends AbstractEventHandler { +abstract class AbstractConstraintEventHandler<T extends EventMessage> extends AbstractEventHandler<T> { AbstractConstraintEventHandler(NotificationEvent event) { super(event); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java index a70c673..b996703 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java @@ -18,20 +18,48 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import 
org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.MessageEncoder; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -abstract class AbstractEventHandler implements EventHandler { +abstract class AbstractEventHandler<T extends EventMessage> implements EventHandler { static final Logger LOG = LoggerFactory.getLogger(AbstractEventHandler.class); + static final MessageEncoder jsonMessageEncoder = JSONMessageEncoder.getInstance(); final NotificationEvent event; final MessageDeserializer deserializer; + final String eventMessageAsJSON; + final T eventMessage; AbstractEventHandler(NotificationEvent event) { this.event = event; - deserializer = MessageFactory.getInstance().getDeserializer(); + try { + deserializer = MessageFactory.getInstance(event.getMessageFormat()).getDeserializer(); + } catch (Exception e) { + String message = + "could not create appropriate messageFactory for format " + event.getMessageFormat(); + LOG.error(message, e); + throw new IllegalStateException(message, e); + } + eventMessage = eventMessage(event.getMessage()); + eventMessageAsJSON = eventMessageAsJSON(eventMessage); + } + + /** + * This takes in the string representation of the message in the format as specified in rdbms backing metastore. 
+ */ + abstract T eventMessage(String stringRepresentation); + + private String eventMessageAsJSON(T eventMessage) { + if (eventMessage == null) { + // this will only happen in case DefaultHandler is invoked + return null; + } + return jsonMessageEncoder.getSerializer().serialize(eventMessage); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java index 8fdf2f1..736a162 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java @@ -18,21 +18,27 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -public class AddForeignKeyHandler extends AbstractConstraintEventHandler { +public class AddForeignKeyHandler extends AbstractConstraintEventHandler<AddForeignKeyMessage> { AddForeignKeyHandler(NotificationEvent event) { super(event); } @Override + AddForeignKeyMessage eventMessage(String stringRepresentation) { + return deserializer.getAddForeignKeyMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { LOG.debug("Processing#{} ADD_FOREIGNKEY_MESSAGE message : {}", fromEventId(), - event.getMessage()); + eventMessageAsJSON); if (shouldReplicate(withinContext)) { DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + 
dmd.setPayload(eventMessageAsJSON); dmd.write(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java index 335d4e6..c778198 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java @@ -18,22 +18,28 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -public class AddNotNullConstraintHandler extends AbstractConstraintEventHandler { +public class AddNotNullConstraintHandler extends AbstractConstraintEventHandler<AddNotNullConstraintMessage> { AddNotNullConstraintHandler(NotificationEvent event) { super(event); } @Override + AddNotNullConstraintMessage eventMessage(String stringRepresentation) { + return deserializer.getAddNotNullConstraintMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { LOG.debug("Processing#{} ADD_NOTNULLCONSTRAINT_MESSAGE message : {}", fromEventId(), - event.getMessage()); + eventMessageAsJSON); if (shouldReplicate(withinContext)) { DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } } 
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index 973a65b..5c16887 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -42,10 +43,15 @@ class AddPartitionHandler extends AbstractEventHandler { } @Override + EventMessage eventMessage(String stringRepresentation) { + return deserializer.getAddPartitionMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), eventMessageAsJSON); - AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage()); + AddPartitionMessage apm = (AddPartitionMessage) eventMessage; org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj(); if (tobj == null) { LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed"); 
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java index cf45c68..f9c08c2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java @@ -18,22 +18,28 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -public class AddPrimaryKeyHandler extends AbstractConstraintEventHandler { +public class AddPrimaryKeyHandler extends AbstractConstraintEventHandler<AddPrimaryKeyMessage> { AddPrimaryKeyHandler(NotificationEvent event) { super(event); } @Override + AddPrimaryKeyMessage eventMessage(String stringRepresentation) { + return deserializer.getAddPrimaryKeyMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { LOG.debug("Processing#{} ADD_PRIMARYKEY_MESSAGE message : {}", fromEventId(), - event.getMessage()); + eventMessageAsJSON); if (shouldReplicate(withinContext)) { DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java index 58835a0..69caf08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java @@ -18,22 +18,29 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -public class AddUniqueConstraintHandler extends AbstractConstraintEventHandler { +public class AddUniqueConstraintHandler + extends AbstractConstraintEventHandler<AddUniqueConstraintMessage> { AddUniqueConstraintHandler(NotificationEvent event) { super(event); } @Override + AddUniqueConstraintMessage eventMessage(String stringRepresentation) { + return deserializer.getAddUniqueConstraintMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { LOG.debug("Processing#{} ADD_UNIQUECONSTRAINT_MESSAGE message : {}", fromEventId(), - event.getMessage()); + eventMessageAsJSON); if (shouldReplicate(withinContext)) { DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java index 38efbd7..7602d1f 100644 
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java @@ -18,19 +18,25 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -class AllocWriteIdHandler extends AbstractEventHandler { +class AllocWriteIdHandler extends AbstractEventHandler<AllocWriteIdMessage> { AllocWriteIdHandler(NotificationEvent event) { super(event); } @Override + AllocWriteIdMessage eventMessage(String stringRepresentation) { + return deserializer.getAllocWriteIdMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} ALLOC_WRITE_ID message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} ALLOC_WRITE_ID message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java index 3863c59..a31d1b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterDatabaseHandler.java @@ -18,20 +18,26 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import 
org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -class AlterDatabaseHandler extends AbstractEventHandler { +class AlterDatabaseHandler extends AbstractEventHandler<AlterDatabaseMessage> { AlterDatabaseHandler(NotificationEvent event) { super(event); } @Override + AlterDatabaseMessage eventMessage(String stringRepresentation) { + return deserializer.getAlterDatabaseMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} ALTER_DATABASE message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} ALTER_DATABASE message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java index cde4eed..d81408e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java @@ -23,17 +23,15 @@ import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import 
org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import org.apache.hadoop.hive.ql.parse.repl.DumpType; - -import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; -import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; - -class AlterPartitionHandler extends AbstractEventHandler { +class AlterPartitionHandler extends AbstractEventHandler<AlterPartitionMessage> { private final org.apache.hadoop.hive.metastore.api.Partition after; private final org.apache.hadoop.hive.metastore.api.Table tableObject; private final boolean isTruncateOp; @@ -41,7 +39,7 @@ class AlterPartitionHandler extends AbstractEventHandler { AlterPartitionHandler(NotificationEvent event) throws Exception { super(event); - AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage()); + AlterPartitionMessage apm = eventMessage; tableObject = apm.getTableObj(); org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore(); after = apm.getPtnObjAfter(); @@ -49,6 +47,11 @@ class AlterPartitionHandler extends AbstractEventHandler { scenario = scenarioType(before, after); } + @Override + AlterPartitionMessage eventMessage(String stringRepresentation) { + return deserializer.getAlterPartitionMessage(stringRepresentation); + } + private enum Scenario { ALTER { @Override @@ -86,7 +89,7 @@ class AlterPartitionHandler extends AbstractEventHandler { @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), eventMessageAsJSON); Table qlMdTable = new Table(tableObject); if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) { @@ -107,7 +110,7 @@ class AlterPartitionHandler extends AbstractEventHandler { withinContext.hiveConf); } DumpMetaData dmd = 
withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java index 5f582b3..00fa370 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -class AlterTableHandler extends AbstractEventHandler { +class AlterTableHandler extends AbstractEventHandler<AlterTableMessage> { private final org.apache.hadoop.hive.metastore.api.Table before; private final org.apache.hadoop.hive.metastore.api.Table after; private final boolean isTruncateOp; @@ -59,13 +59,17 @@ class AlterTableHandler extends AbstractEventHandler { AlterTableHandler(NotificationEvent event) throws Exception { super(event); - AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage()); - before = atm.getTableObjBefore(); - after = atm.getTableObjAfter(); - isTruncateOp = atm.getIsTruncateOp(); + before = eventMessage.getTableObjBefore(); + after = eventMessage.getTableObjAfter(); + isTruncateOp = eventMessage.getIsTruncateOp(); scenario = scenarioType(before, after); } + @Override + AlterTableMessage eventMessage(String stringRepresentation) { + return deserializer.getAlterTableMessage(stringRepresentation); + } + private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before, 
org.apache.hadoop.hive.metastore.api.Table after) { if (before.getDbName().equals(after.getDbName()) @@ -78,7 +82,7 @@ class AlterTableHandler extends AbstractEventHandler { @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), eventMessageAsJSON); Table qlMdTableBefore = new Table(before); if (!Utils @@ -100,7 +104,7 @@ class AlterTableHandler extends AbstractEventHandler { } DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index 82a722f..620263f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -40,12 +40,17 @@ import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.List; -class CommitTxnHandler extends AbstractEventHandler { +class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> { CommitTxnHandler(NotificationEvent event) { super(event); } + @Override + CommitTxnMessage eventMessage(String stringRepresentation) { + return deserializer.getCommitTxnMessage(stringRepresentation); + } + private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException { Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf); @@ -97,23 +102,22 @@ class CommitTxnHandler extends 
AbstractEventHandler { @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), event.getMessage()); - String payload = event.getMessage(); + LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), eventMessageAsJSON); + String payload = eventMessageAsJSON; if (!withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { - CommitTxnMessage commitTxnMessage = deserializer.getCommitTxnMessage(event.getMessage()); String contextDbName = withinContext.dbName == null ? null : StringUtils.normalizeIdentifier(withinContext.dbName); String contextTableName = withinContext.tableName == null ? null : StringUtils.normalizeIdentifier(withinContext.tableName); List<WriteEventInfo> writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf). - getAllWriteEventInfo(commitTxnMessage.getTxnId(), contextDbName, contextTableName); + getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, contextTableName); int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0); if (numEntry != 0) { - commitTxnMessage.addWriteEventInfo(writeEventInfoList); - payload = commitTxnMessage.toString(); - LOG.debug("payload for commit txn event : " + payload); + eventMessage.addWriteEventInfo(writeEventInfoList); + payload = jsonMessageEncoder.getSerializer().serialize(eventMessage); + LOG.debug("payload for commit txn event : " + eventMessageAsJSON); } org.apache.hadoop.hive.ql.metadata.Table qlMdTablePrev = null; @@ -128,7 +132,7 @@ class CommitTxnHandler extends AbstractEventHandler { // combination as primary key, so the entries with same table will come together. Only basic table metadata is // used during import, so we need not dump the latest table metadata. 
for (int idx = 0; idx < numEntry; idx++) { - qlMdTable = new org.apache.hadoop.hive.ql.metadata.Table(commitTxnMessage.getTableObj(idx)); + qlMdTable = new org.apache.hadoop.hive.ql.metadata.Table(eventMessage.getTableObj(idx)); if (qlMdTablePrev == null) { qlMdTablePrev = qlMdTable; } @@ -141,13 +145,13 @@ class CommitTxnHandler extends AbstractEventHandler { qlMdTablePrev = qlMdTable; } - if (qlMdTable.isPartitioned() && (null != commitTxnMessage.getPartitionObj(idx))) { + if (qlMdTable.isPartitioned() && (null != eventMessage.getPartitionObj(idx))) { qlPtns.add(new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable, - commitTxnMessage.getPartitionObj(idx))); + eventMessage.getPartitionObj(idx))); } filesTobeAdded.add(Lists.newArrayList( - ReplChangeManager.getListFromSeparatedString(commitTxnMessage.getFiles(idx)))); + ReplChangeManager.getListFromSeparatedString(eventMessage.getFiles(idx)))); } //Dump last table in the list http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java index 21eb74b..7d64e49 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java @@ -24,19 +24,22 @@ import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.DumpType; -class CreateDatabaseHandler extends AbstractEventHandler { +class CreateDatabaseHandler extends AbstractEventHandler<CreateDatabaseMessage> { CreateDatabaseHandler(NotificationEvent event) { super(event); } @Override + 
CreateDatabaseMessage eventMessage(String stringRepresentation) { + return deserializer.getCreateDatabaseMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} CREATE_DATABASE message : {}", fromEventId(), event.getMessage()); - CreateDatabaseMessage createDatabaseMsg = - deserializer.getCreateDatabaseMessage(event.getMessage()); + LOG.info("Processing#{} CREATE_DATABASE message : {}", fromEventId(), eventMessageAsJSON); Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); FileSystem fileSystem = metaDataPath.getFileSystem(withinContext.hiveConf); - EximUtil.createDbExportDump(fileSystem, metaDataPath, createDatabaseMsg.getDatabaseObject(), + EximUtil.createDbExportDump(fileSystem, metaDataPath, eventMessage.getDatabaseObject(), withinContext.replicationSpec); withinContext.createDmd(this).write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java index 5f0338e..5954e15 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java @@ -27,21 +27,24 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; -class CreateFunctionHandler extends AbstractEventHandler { +class CreateFunctionHandler extends AbstractEventHandler<CreateFunctionMessage> { CreateFunctionHandler(NotificationEvent event) { super(event); } @Override + CreateFunctionMessage 
eventMessage(String stringRepresentation) { + return deserializer.getCreateFunctionMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - CreateFunctionMessage createFunctionMessage = - deserializer.getCreateFunctionMessage(event.getMessage()); - LOG.info("Processing#{} CREATE_MESSAGE message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} CREATE_MESSAGE message : {}", fromEventId(), eventMessageAsJSON); Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf); try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) { - new FunctionSerializer(createFunctionMessage.getFunctionObj(), withinContext.hiveConf) + new FunctionSerializer(eventMessage.getFunctionObj(), withinContext.hiveConf) .writeTo(jsonWriter, withinContext.replicationSpec); } withinContext.createDmd(this).write(); http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index 897ea7f..550a82d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -25,23 +25,26 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; -import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; -class CreateTableHandler 
extends AbstractEventHandler { +class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> { CreateTableHandler(NotificationEvent event) { super(event); } @Override + CreateTableMessage eventMessage(String stringRepresentation) { + return deserializer.getCreateTableMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage()); - LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage()); - org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj(); + LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), eventMessageAsJSON); + org.apache.hadoop.hive.metastore.api.Table tobj = eventMessage.getTableObj(); if (tobj == null) { LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed"); @@ -68,7 +71,7 @@ class CreateTableHandler extends AbstractEventHandler { withinContext.hiveConf); Path dataPath = new Path(withinContext.eventRoot, "data"); - Iterable<String> files = ctm.getFiles(); + Iterable<String> files = eventMessage.getFiles(); if (files != null) { // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java index 8977f62..864cb98 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import 
org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; @@ -30,7 +31,15 @@ class DefaultHandler extends AbstractEventHandler { } @Override + EventMessage eventMessage(String stringRepresentation) { + return null; + } + + @Override public void handle(Context withinContext) throws Exception { + // We specifically use the message string from the original event since we don't know what type of message + // to convert this message to. This handler should not be called, since with different message formats we need + // the ability to convert messages to a given message type. LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage()); DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(event.getMessage()); http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java index 979e9a1..4c239e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java @@ -18,19 +18,26 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -class DropConstraintHandler extends AbstractEventHandler { +class DropConstraintHandler extends 
AbstractEventHandler<DropConstraintMessage> { DropConstraintHandler(NotificationEvent event) { super(event); } @Override + DropConstraintMessage eventMessage(String stringRepresentation) { + return deserializer.getDropConstraintMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} DROP_CONSTRAINT_MESSAGE message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} DROP_CONSTRAINT_MESSAGE message : {}", fromEventId(), + eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java index 4eae778..f09f77d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java @@ -18,19 +18,25 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -class DropDatabaseHandler extends AbstractEventHandler { +class DropDatabaseHandler extends AbstractEventHandler<DropDatabaseMessage> { DropDatabaseHandler(NotificationEvent event) { super(event); } @Override + DropDatabaseMessage eventMessage(String stringRepresentation) { + return deserializer.getDropDatabaseMessage(stringRepresentation); + } + + @Override public 
void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} DROP_DATABASE message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} DROP_DATABASE message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java index 352b0cc..6140c0c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java @@ -18,20 +18,26 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -class DropFunctionHandler extends AbstractEventHandler { +class DropFunctionHandler extends AbstractEventHandler<DropFunctionMessage> { DropFunctionHandler(NotificationEvent event) { super(event); } @Override + DropFunctionMessage eventMessage(String stringRepresentation) { + return deserializer.getDropFunctionMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); - 
dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java index 19b7044..e2a40d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java @@ -19,21 +19,27 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -class DropPartitionHandler extends AbstractEventHandler { +class DropPartitionHandler extends AbstractEventHandler<DropPartitionMessage> { DropPartitionHandler(NotificationEvent event) { super(event); } @Override + DropPartitionMessage eventMessage(String stringRepresentation) { + return deserializer.getDropPartitionMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java index cce0192..7d17de2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java @@ -19,21 +19,27 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -class DropTableHandler extends AbstractEventHandler { +class DropTableHandler extends AbstractEventHandler<DropTableMessage> { DropTableHandler(NotificationEvent event) { super(event); } @Override + DropTableMessage eventMessage(String stringRepresentation) { + return deserializer.getDropTableMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java index a1d61f9..2a0379e 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -33,27 +33,27 @@ public class EventHandlerFactory { private static Map<String, Class<? extends EventHandler>> registeredHandlers = new HashMap<>(); static { - register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class); - register(MessageFactory.ALTER_DATABASE_EVENT, AlterDatabaseHandler.class); - register(MessageFactory.ALTER_PARTITION_EVENT, AlterPartitionHandler.class); - register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class); - register(MessageFactory.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class); - register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class); - register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class); - register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class); - register(MessageFactory.INSERT_EVENT, InsertHandler.class); - register(MessageFactory.DROP_FUNCTION_EVENT, DropFunctionHandler.class); - register(MessageFactory.ADD_PRIMARYKEY_EVENT, AddPrimaryKeyHandler.class); - register(MessageFactory.ADD_FOREIGNKEY_EVENT, AddForeignKeyHandler.class); - register(MessageFactory.ADD_UNIQUECONSTRAINT_EVENT, AddUniqueConstraintHandler.class); - register(MessageFactory.ADD_NOTNULLCONSTRAINT_EVENT, AddNotNullConstraintHandler.class); - register(MessageFactory.DROP_CONSTRAINT_EVENT, DropConstraintHandler.class); - register(MessageFactory.CREATE_DATABASE_EVENT, CreateDatabaseHandler.class); - register(MessageFactory.DROP_DATABASE_EVENT, 
DropDatabaseHandler.class); - register(MessageFactory.OPEN_TXN_EVENT, OpenTxnHandler.class); - register(MessageFactory.COMMIT_TXN_EVENT, CommitTxnHandler.class); - register(MessageFactory.ABORT_TXN_EVENT, AbortTxnHandler.class); - register(MessageFactory.ALLOC_WRITE_ID_EVENT, AllocWriteIdHandler.class); + register(MessageBuilder.ADD_PARTITION_EVENT, AddPartitionHandler.class); + register(MessageBuilder.ALTER_DATABASE_EVENT, AlterDatabaseHandler.class); + register(MessageBuilder.ALTER_PARTITION_EVENT, AlterPartitionHandler.class); + register(MessageBuilder.ALTER_TABLE_EVENT, AlterTableHandler.class); + register(MessageBuilder.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class); + register(MessageBuilder.CREATE_TABLE_EVENT, CreateTableHandler.class); + register(MessageBuilder.DROP_PARTITION_EVENT, DropPartitionHandler.class); + register(MessageBuilder.DROP_TABLE_EVENT, DropTableHandler.class); + register(MessageBuilder.INSERT_EVENT, InsertHandler.class); + register(MessageBuilder.DROP_FUNCTION_EVENT, DropFunctionHandler.class); + register(MessageBuilder.ADD_PRIMARYKEY_EVENT, AddPrimaryKeyHandler.class); + register(MessageBuilder.ADD_FOREIGNKEY_EVENT, AddForeignKeyHandler.class); + register(MessageBuilder.ADD_UNIQUECONSTRAINT_EVENT, AddUniqueConstraintHandler.class); + register(MessageBuilder.ADD_NOTNULLCONSTRAINT_EVENT, AddNotNullConstraintHandler.class); + register(MessageBuilder.DROP_CONSTRAINT_EVENT, DropConstraintHandler.class); + register(MessageBuilder.CREATE_DATABASE_EVENT, CreateDatabaseHandler.class); + register(MessageBuilder.DROP_DATABASE_EVENT, DropDatabaseHandler.class); + register(MessageBuilder.OPEN_TXN_EVENT, OpenTxnHandler.class); + register(MessageBuilder.COMMIT_TXN_EVENT, CommitTxnHandler.class); + register(MessageBuilder.ABORT_TXN_EVENT, AbortTxnHandler.class); + register(MessageBuilder.ALLOC_WRITE_ID_EVENT, AllocWriteIdHandler.class); } static void register(String event, Class<? 
extends EventHandler> handlerClazz) { http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index cf3822a..842e20a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -36,19 +36,23 @@ import java.util.Collections; import java.util.List; -class InsertHandler extends AbstractEventHandler { +class InsertHandler extends AbstractEventHandler<InsertMessage> { InsertHandler(NotificationEvent event) { super(event); } @Override + InsertMessage eventMessage(String stringRepresentation) { + return deserializer.getInsertMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { return; } - InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage()); - org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(insertMsg); + org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(eventMessage); if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) { return; @@ -58,18 +62,18 @@ class InsertHandler extends AbstractEventHandler { assert(!AcidUtils.isTransactionalTable(qlMdTable)); List<Partition> qlPtns = null; - if (qlMdTable.isPartitioned() && (null != insertMsg.getPtnObj())) { - qlPtns = Collections.singletonList(partitionObject(qlMdTable, insertMsg)); + if (qlMdTable.isPartitioned() && (null != eventMessage.getPtnObj())) { + qlPtns = Collections.singletonList(partitionObject(qlMdTable, eventMessage)); } Path metaDataPath = 
new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); // Mark the replace type based on INSERT-INTO or INSERT_OVERWRITE operation - withinContext.replicationSpec.setIsReplace(insertMsg.isReplace()); + withinContext.replicationSpec.setIsReplace(eventMessage.isReplace()); EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath, qlMdTable, qlPtns, withinContext.replicationSpec, withinContext.hiveConf); - Iterable<String> files = insertMsg.getFiles(); + Iterable<String> files = eventMessage.getFiles(); if (files != null) { Path dataPath; @@ -93,9 +97,9 @@ class InsertHandler extends AbstractEventHandler { } } - LOG.info("Processing#{} INSERT message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} INSERT message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java index fe81fe1..215e726 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java @@ -18,20 +18,26 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -class OpenTxnHandler extends AbstractEventHandler { +class OpenTxnHandler extends AbstractEventHandler<OpenTxnMessage> { 
OpenTxnHandler(NotificationEvent event) { super(event); } @Override + OpenTxnMessage eventMessage(String stringRepresentation) { + return deserializer.getOpenTxnMessage(stringRepresentation); + } + + @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} OPEN_TXN message : {}", fromEventId(), event.getMessage()); + LOG.info("Processing#{} OPEN_TXN message : {}", fromEventId(), eventMessageAsJSON); DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(eventMessageAsJSON); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java index 2848212..ae3db9c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/ConstraintsSerializer.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; -import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -52,16 +52,16 @@ public class ConstraintsSerializer implements JsonWriter.Serializer { String pksString, fksString, uksString, nnsString; pksString = fksString = uksString = nnsString = ""; if (pks != null) { - pksString = 
MessageFactory.getInstance().buildAddPrimaryKeyMessage(pks).toString(); + pksString = MessageBuilder.getInstance().buildAddPrimaryKeyMessage(pks).toString(); } if (fks != null) { - fksString = MessageFactory.getInstance().buildAddForeignKeyMessage(fks).toString(); + fksString = MessageBuilder.getInstance().buildAddForeignKeyMessage(fks).toString(); } if (uks != null) { - uksString = MessageFactory.getInstance().buildAddUniqueConstraintMessage(uks).toString(); + uksString = MessageBuilder.getInstance().buildAddUniqueConstraintMessage(uks).toString(); } if (nns != null) { - nnsString = MessageFactory.getInstance().buildAddNotNullConstraintMessage(nns).toString(); + nnsString = MessageBuilder.getInstance().buildAddNotNullConstraintMessage(nns).toString(); } writer.jsonGenerator.writeStringField("pks", pksString); writer.jsonGenerator.writeStringField("uks", uksString); http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java index 5b26681..32ac6ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.ql.parse.repl.load.message; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; -import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; @@ -30,7 +30,7 @@ 
abstract class AbstractMessageHandler implements MessageHandler { final HashSet<ReadEntity> readEntitySet = new HashSet<>(); final HashSet<WriteEntity> writeEntitySet = new HashSet<>(); final UpdatedMetaDataTracker updatedMetadata = new UpdatedMetaDataTracker(); - final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer(); + final MessageDeserializer deserializer = JSONMessageEncoder.getInstance().getDeserializer(); @Override public Set<ReadEntity> readEntities() { http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java index 7057890..6a3c563 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.junit.Test; @@ -53,9 +54,11 @@ public class TestEventHandlerFactory { @Test public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() { + NotificationEvent event = new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE, + "shouldGiveDefaultHandler", "s"); + event.setMessageFormat(JSONMessageEncoder.FORMAT); EventHandler eventHandler = - EventHandlerFactory.handlerFor(new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE, - "shouldGiveDefaultHandler", "s")); + EventHandlerFactory.handlerFor(event); assertTrue(eventHandler instanceof DefaultHandler); } 
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 1795ef7..1d64cce 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -514,7 +514,7 @@ public class MetastoreConf { "Alternatively, configure hive.metastore.transactional.event.listeners to ensure both are invoked in same JDO transaction."), EVENT_MESSAGE_FACTORY("metastore.event.message.factory", "hive.metastore.event.message.factory", - "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory", + "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder", "Factory class for making encoding and decoding messages in the events generated."), EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS("metastore.notification.parameters.exclude.patterns", "hive.metastore.notification.parameters.exclude.patterns", "", http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java index f24b419..1262c12 100644 --- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java @@ -30,38 +30,38 @@ public abstract class EventMessage { */ public enum EventType { - CREATE_DATABASE(MessageFactory.CREATE_DATABASE_EVENT), - DROP_DATABASE(MessageFactory.DROP_DATABASE_EVENT), - CREATE_TABLE(MessageFactory.CREATE_TABLE_EVENT), - DROP_TABLE(MessageFactory.DROP_TABLE_EVENT), - ADD_PARTITION(MessageFactory.ADD_PARTITION_EVENT), - DROP_PARTITION(MessageFactory.DROP_PARTITION_EVENT), - ALTER_DATABASE(MessageFactory.ALTER_DATABASE_EVENT), - ALTER_TABLE(MessageFactory.ALTER_TABLE_EVENT), - ALTER_PARTITION(MessageFactory.ALTER_PARTITION_EVENT), - INSERT(MessageFactory.INSERT_EVENT), - CREATE_FUNCTION(MessageFactory.CREATE_FUNCTION_EVENT), - DROP_FUNCTION(MessageFactory.DROP_FUNCTION_EVENT), - - ADD_PRIMARYKEY(MessageFactory.ADD_PRIMARYKEY_EVENT), - ADD_FOREIGNKEY(MessageFactory.ADD_FOREIGNKEY_EVENT), - ADD_UNIQUECONSTRAINT(MessageFactory.ADD_UNIQUECONSTRAINT_EVENT), - ADD_NOTNULLCONSTRAINT(MessageFactory.ADD_NOTNULLCONSTRAINT_EVENT), - DROP_CONSTRAINT(MessageFactory.DROP_CONSTRAINT_EVENT), - CREATE_ISCHEMA(MessageFactory.CREATE_ISCHEMA_EVENT), - ALTER_ISCHEMA(MessageFactory.ALTER_ISCHEMA_EVENT), - DROP_ISCHEMA(MessageFactory.DROP_ISCHEMA_EVENT), - ADD_SCHEMA_VERSION(MessageFactory.ADD_SCHEMA_VERSION_EVENT), - ALTER_SCHEMA_VERSION(MessageFactory.ALTER_SCHEMA_VERSION_EVENT), - DROP_SCHEMA_VERSION(MessageFactory.DROP_SCHEMA_VERSION_EVENT), - CREATE_CATALOG(MessageFactory.CREATE_CATALOG_EVENT), - DROP_CATALOG(MessageFactory.DROP_CATALOG_EVENT), - OPEN_TXN(MessageFactory.OPEN_TXN_EVENT), - COMMIT_TXN(MessageFactory.COMMIT_TXN_EVENT), - ABORT_TXN(MessageFactory.ABORT_TXN_EVENT), - ALLOC_WRITE_ID(MessageFactory.ALLOC_WRITE_ID_EVENT), - ALTER_CATALOG(MessageFactory.ALTER_CATALOG_EVENT), - ACID_WRITE(MessageFactory.ACID_WRITE_EVENT); 
+ CREATE_DATABASE(MessageBuilder.CREATE_DATABASE_EVENT), + DROP_DATABASE(MessageBuilder.DROP_DATABASE_EVENT), + CREATE_TABLE(MessageBuilder.CREATE_TABLE_EVENT), + DROP_TABLE(MessageBuilder.DROP_TABLE_EVENT), + ADD_PARTITION(MessageBuilder.ADD_PARTITION_EVENT), + DROP_PARTITION(MessageBuilder.DROP_PARTITION_EVENT), + ALTER_DATABASE(MessageBuilder.ALTER_DATABASE_EVENT), + ALTER_TABLE(MessageBuilder.ALTER_TABLE_EVENT), + ALTER_PARTITION(MessageBuilder.ALTER_PARTITION_EVENT), + INSERT(MessageBuilder.INSERT_EVENT), + CREATE_FUNCTION(MessageBuilder.CREATE_FUNCTION_EVENT), + DROP_FUNCTION(MessageBuilder.DROP_FUNCTION_EVENT), + + ADD_PRIMARYKEY(MessageBuilder.ADD_PRIMARYKEY_EVENT), + ADD_FOREIGNKEY(MessageBuilder.ADD_FOREIGNKEY_EVENT), + ADD_UNIQUECONSTRAINT(MessageBuilder.ADD_UNIQUECONSTRAINT_EVENT), + ADD_NOTNULLCONSTRAINT(MessageBuilder.ADD_NOTNULLCONSTRAINT_EVENT), + DROP_CONSTRAINT(MessageBuilder.DROP_CONSTRAINT_EVENT), + CREATE_ISCHEMA(MessageBuilder.CREATE_ISCHEMA_EVENT), + ALTER_ISCHEMA(MessageBuilder.ALTER_ISCHEMA_EVENT), + DROP_ISCHEMA(MessageBuilder.DROP_ISCHEMA_EVENT), + ADD_SCHEMA_VERSION(MessageBuilder.ADD_SCHEMA_VERSION_EVENT), + ALTER_SCHEMA_VERSION(MessageBuilder.ALTER_SCHEMA_VERSION_EVENT), + DROP_SCHEMA_VERSION(MessageBuilder.DROP_SCHEMA_VERSION_EVENT), + CREATE_CATALOG(MessageBuilder.CREATE_CATALOG_EVENT), + DROP_CATALOG(MessageBuilder.DROP_CATALOG_EVENT), + OPEN_TXN(MessageBuilder.OPEN_TXN_EVENT), + COMMIT_TXN(MessageBuilder.COMMIT_TXN_EVENT), + ABORT_TXN(MessageBuilder.ABORT_TXN_EVENT), + ALLOC_WRITE_ID(MessageBuilder.ALLOC_WRITE_ID_EVENT), + ALTER_CATALOG(MessageBuilder.ALTER_CATALOG_EVENT), + ACID_WRITE(MessageBuilder.ACID_WRITE_EVENT); private String typeString;
