HIVE-16268 : enable incremental repl dump to handle functions metadata (Anishek Agarwal, reviewed by Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9d4f13af Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9d4f13af Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9d4f13af Branch: refs/heads/master Commit: 9d4f13afda34250b7cf722287a557426e90ff24d Parents: 699d6ce Author: Sushanth Sowmyan <[email protected]> Authored: Thu May 4 02:48:27 2017 -0700 Committer: Sushanth Sowmyan <[email protected]> Committed: Sun May 7 15:43:21 2017 -0700 ---------------------------------------------------------------------- .../ql/parse/ReplicationSemanticAnalyzer.java | 306 ++----------------- .../hadoop/hive/ql/parse/repl/DumpType.java | 1 + .../parse/repl/dump/events/AbstractHandler.java | 46 +++ .../repl/dump/events/AddPartitionHandler.java | 114 +++++++ .../repl/dump/events/AlterPartitionHandler.java | 112 +++++++ .../repl/dump/events/AlterTableHandler.java | 102 +++++++ .../repl/dump/events/CreateFunctionHandler.java | 36 +++ .../repl/dump/events/CreateTableHandler.java | 86 ++++++ .../parse/repl/dump/events/DefaultHandler.java | 44 +++ .../repl/dump/events/DropPartitionHandler.java | 44 +++ .../repl/dump/events/DropTableHandler.java | 44 +++ .../ql/parse/repl/dump/events/EventHandler.java | 62 ++++ .../repl/dump/events/EventHandlerFactory.java | 76 +++++ .../parse/repl/dump/events/InsertHandler.java | 110 +++++++ .../ql/parse/repl/events/AbstractHandler.java | 46 --- .../parse/repl/events/AddPartitionHandler.java | 114 ------- .../repl/events/AlterPartitionHandler.java | 112 ------- .../ql/parse/repl/events/AlterTableHandler.java | 102 ------- .../parse/repl/events/CreateTableHandler.java | 86 ------ .../ql/parse/repl/events/DefaultHandler.java | 44 --- .../parse/repl/events/DropPartitionHandler.java | 44 --- .../ql/parse/repl/events/DropTableHandler.java | 44 --- .../hive/ql/parse/repl/events/EventHandler.java | 62 ---- .../parse/repl/events/EventHandlerFactory.java | 75 ----- 
.../ql/parse/repl/events/InsertHandler.java | 110 ------- .../load/message/AbstractMessageHandler.java | 67 ++++ .../parse/repl/load/message/DefaultHandler.java | 33 ++ .../repl/load/message/DropPartitionHandler.java | 108 +++++++ .../repl/load/message/DropTableHandler.java | 51 ++++ .../parse/repl/load/message/InsertHandler.java | 47 +++ .../parse/repl/load/message/MessageHandler.java | 91 ++++++ .../load/message/MessageHandlerFactory.java | 79 +++++ .../load/message/RenamePartitionHandler.java | 74 +++++ .../repl/load/message/RenameTableHandler.java | 81 +++++ .../parse/repl/load/message/TableHandler.java | 68 +++++ .../load/message/TruncatePartitionHandler.java | 69 +++++ .../repl/load/message/TruncateTableHandler.java | 50 +++ .../dump/events/TestEventHandlerFactory.java | 62 ++++ .../repl/events/TestEventHandlerFactory.java | 62 ---- 39 files changed, 1781 insertions(+), 1183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 2daa123..5d1d2fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -27,22 +27,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import 
org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.EventUtils; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter; -import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; -import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; -import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; -import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; -import org.apache.hadoop.hive.metastore.messaging.EventUtils; -import org.apache.hadoop.hive.metastore.messaging.InsertMessage; -import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; -import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; @@ -53,29 +46,23 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; +import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler; +import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; -import org.apache.hadoop.hive.ql.parse.repl.events.EventHandler; -import org.apache.hadoop.hive.ql.parse.repl.events.EventHandlerFactory; import 
org.apache.hadoop.hive.ql.parse.repl.load.MetaData; +import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandlerFactory; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; -import org.apache.hadoop.hive.ql.plan.DropTableDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FunctionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; -import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc; -import org.apache.hadoop.hive.ql.plan.TruncateTableDesc; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,7 +74,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -681,270 +667,26 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } private List<Task<? extends Serializable>> analyzeEventLoad( - String dbName, String tblName, String locn, - Task<? 
extends Serializable> precursor, - Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated, - DumpMetaData dmd) throws SemanticException { - MessageDeserializer md = MessageFactory.getInstance().getDeserializer(); - switch (dmd.getDumpType()) { - case EVENT_CREATE_TABLE: { - return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); - } - case EVENT_ADD_PARTITION: { - return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); - } - case EVENT_DROP_TABLE: { - DropTableMessage dropTableMessage = md.getDropTableMessage(dmd.getPayload()); - String actualDbName = ((dbName == null) || dbName.isEmpty() ? dropTableMessage.getDB() : dbName); - String actualTblName = ((tblName == null) || tblName.isEmpty() ? dropTableMessage.getTable() : tblName); - DropTableDesc dropTableDesc = new DropTableDesc( - actualDbName + "." + actualTblName, - null, true, true, - getNewEventOnlyReplicationSpec(dmd.getEventFrom())); - Task<DDLWork> dropTableTask = TaskFactory.get(new DDLWork(inputs, outputs, dropTableDesc), conf); - if (precursor != null){ - precursor.addDependentTask(dropTableTask); - } - List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); - tasks.add(dropTableTask); - LOG.debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName()); - dbsUpdated.put(actualDbName,dmd.getEventTo()); - return tasks; - } - case EVENT_DROP_PARTITION: { - try { - DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(dmd.getPayload()); - String actualDbName = ((dbName == null) || dbName.isEmpty() ? dropPartitionMessage.getDB() : dbName); - String actualTblName = ((tblName == null) || tblName.isEmpty() ? 
dropPartitionMessage.getTable() : tblName); - Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs; - partSpecs = - genPartSpecs(new Table(dropPartitionMessage.getTableObj()), - dropPartitionMessage.getPartitions()); - if (partSpecs.size() > 0) { - DropTableDesc dropPtnDesc = new DropTableDesc( - actualDbName + "." + actualTblName, - partSpecs, null, true, - getNewEventOnlyReplicationSpec(dmd.getEventFrom())); - Task<DDLWork> dropPtnTask = - TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf); - if (precursor != null) { - precursor.addDependentTask(dropPtnTask); - } - List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); - tasks.add(dropPtnTask); - LOG.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(), - dropPtnDesc.getTableName(), dropPartitionMessage.getPartitions()); - dbsUpdated.put(actualDbName, dmd.getEventTo()); - tablesUpdated.put(actualDbName + "." + actualTblName, dmd.getEventTo()); - return tasks; - } else { - throw new SemanticException( - "DROP PARTITION EVENT does not return any part descs for event message :" - + dmd.getPayload()); - } - } catch (Exception e) { - if (!(e instanceof SemanticException)){ - throw new SemanticException("Error reading message members", e); - } else { - throw (SemanticException)e; - } - } - } - case EVENT_ALTER_TABLE: { - return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); - } - case EVENT_RENAME_TABLE: { - AlterTableMessage renameTableMessage = md.getAlterTableMessage(dmd.getPayload()); - if ((tblName != null) && (!tblName.isEmpty())){ - throw new SemanticException("RENAMES of tables are not supported for table-level replication"); - } - try { - String oldDbName = renameTableMessage.getTableObjBefore().getDbName(); - String newDbName = renameTableMessage.getTableObjAfter().getDbName(); - - if ((dbName != null) && (!dbName.isEmpty())){ - // If we're loading into a db, instead of into the warehouse, then the oldDbName and - // 
newDbName must be the same - if (!oldDbName.equalsIgnoreCase(newDbName)){ - throw new SemanticException("Cannot replicate an event renaming a table across" - + " databases into a db level load " + oldDbName +"->" + newDbName); - } else { - // both were the same, and can be replaced by the new db we're loading into. - oldDbName = dbName; - newDbName = dbName; - } - } - - String oldName = oldDbName + "." + renameTableMessage.getTableObjBefore().getTableName(); - String newName = newDbName + "." + renameTableMessage.getTableObjAfter().getTableName(); - AlterTableDesc renameTableDesc = new AlterTableDesc(oldName, newName, false); - Task<DDLWork> renameTableTask = TaskFactory.get(new DDLWork(inputs, outputs, renameTableDesc), conf); - if (precursor != null){ - precursor.addDependentTask(renameTableTask); - } - List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); - tasks.add(renameTableTask); - LOG.debug("Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName); - dbsUpdated.put(newDbName, dmd.getEventTo()); // oldDbName and newDbName *will* be the same if we're here - tablesUpdated.remove(oldName); - tablesUpdated.put(newName, dmd.getEventTo()); - // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out tablesUpdated - // However, we explicitly don't support repl of that sort, and error out above if so. If that should - // ever change, this will need reworking. - return tasks; - } catch (Exception e) { - if (!(e instanceof SemanticException)){ - throw new SemanticException("Error reading message members", e); - } else { - throw (SemanticException)e; - } - } - } - case EVENT_TRUNCATE_TABLE: { - AlterTableMessage truncateTableMessage = md.getAlterTableMessage(dmd.getPayload()); - String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncateTableMessage.getDB() : dbName); - String actualTblName = ((tblName == null) || tblName.isEmpty() ? 
truncateTableMessage.getTable() : tblName); - - TruncateTableDesc truncateTableDesc = new TruncateTableDesc( - actualDbName + "." + actualTblName, null); - Task<DDLWork> truncateTableTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf); - if (precursor != null) { - precursor.addDependentTask(truncateTableTask); - } - - List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); - tasks.add(truncateTableTask); - LOG.debug("Added truncate tbl task : {}:{}", truncateTableTask.getId(), truncateTableDesc.getTableName()); - dbsUpdated.put(actualDbName,dmd.getEventTo()); - return tasks; - } - case EVENT_ALTER_PARTITION: { - return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); - } - case EVENT_RENAME_PARTITION: { - AlterPartitionMessage renamePtnMessage = md.getAlterPartitionMessage(dmd.getPayload()); - String actualDbName = ((dbName == null) || dbName.isEmpty() ? renamePtnMessage.getDB() : dbName); - String actualTblName = ((tblName == null) || tblName.isEmpty() ? renamePtnMessage.getTable() : tblName); - - Map<String, String> newPartSpec = new LinkedHashMap<String,String>(); - Map<String, String> oldPartSpec = new LinkedHashMap<String,String>(); - String tableName = actualDbName + "." 
+ actualTblName; - try { - org.apache.hadoop.hive.metastore.api.Table tblObj = renamePtnMessage.getTableObj(); - org.apache.hadoop.hive.metastore.api.Partition pobjBefore = renamePtnMessage.getPtnObjBefore(); - org.apache.hadoop.hive.metastore.api.Partition pobjAfter = renamePtnMessage.getPtnObjAfter(); - Iterator<String> beforeValIter = pobjBefore.getValuesIterator(); - Iterator<String> afterValIter = pobjAfter.getValuesIterator(); - for (FieldSchema fs : tblObj.getPartitionKeys()){ - oldPartSpec.put(fs.getName(), beforeValIter.next()); - newPartSpec.put(fs.getName(), afterValIter.next()); - } - } catch (Exception e) { - if (!(e instanceof SemanticException)){ - throw new SemanticException("Error reading message members", e); - } else { - throw (SemanticException)e; - } - } - - RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(tableName, oldPartSpec, newPartSpec); - Task<DDLWork> renamePtnTask = TaskFactory.get(new DDLWork(inputs, outputs, renamePtnDesc), conf); - if (precursor != null){ - precursor.addDependentTask(renamePtnTask); - } - List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); - tasks.add(renamePtnTask); - LOG.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec); - dbsUpdated.put(actualDbName, dmd.getEventTo()); - tablesUpdated.put(tableName, dmd.getEventTo()); - return tasks; - } - case EVENT_TRUNCATE_PARTITION: { - AlterPartitionMessage truncatePtnMessage = md.getAlterPartitionMessage(dmd.getPayload()); - String actualDbName = ((dbName == null) || dbName.isEmpty() ? truncatePtnMessage.getDB() : dbName); - String actualTblName = ((tblName == null) || tblName.isEmpty() ? 
truncatePtnMessage.getTable() : tblName); - - Map<String, String> partSpec = new LinkedHashMap<String,String>(); - try { - org.apache.hadoop.hive.metastore.api.Table tblObj = truncatePtnMessage.getTableObj(); - org.apache.hadoop.hive.metastore.api.Partition pobjAfter = truncatePtnMessage.getPtnObjAfter(); - Iterator<String> afterValIter = pobjAfter.getValuesIterator(); - for (FieldSchema fs : tblObj.getPartitionKeys()){ - partSpec.put(fs.getName(), afterValIter.next()); - } - } catch (Exception e) { - if (!(e instanceof SemanticException)){ - throw new SemanticException("Error reading message members", e); - } else { - throw (SemanticException)e; - } - } - - TruncateTableDesc truncateTableDesc = new TruncateTableDesc( - actualDbName + "." + actualTblName, partSpec); - Task<DDLWork> truncatePtnTask = TaskFactory.get(new DDLWork(inputs, outputs, truncateTableDesc), conf); - if (precursor != null) { - precursor.addDependentTask(truncatePtnTask); - } - - List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); - tasks.add(truncatePtnTask); - LOG.debug("Added truncate ptn task : {}:{}", truncatePtnTask.getId(), truncateTableDesc.getTableName()); - dbsUpdated.put(actualDbName,dmd.getEventTo()); - return tasks; - } - case EVENT_INSERT: { - md = MessageFactory.getInstance().getDeserializer(); - InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload()); - String actualDbName = ((dbName == null) || dbName.isEmpty() ? insertMessage.getDB() : dbName); - String actualTblName = ((tblName == null) || tblName.isEmpty() ? 
insertMessage.getTable() : tblName); - - // Piggybacking in Import logic for now - return analyzeTableLoad(actualDbName, actualTblName, locn, precursor, dbsUpdated, tablesUpdated); - } - case EVENT_UNKNOWN: { - break; - } - default: { - break; - } - } - return null; - } - - private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table, - List<Map<String, String>> partitions) throws SemanticException { - Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = - new HashMap<Integer, List<ExprNodeGenericFuncDesc>>(); - int partPrefixLength = 0; - if ((partitions != null) && (partitions.size() > 0)) { - partPrefixLength = partitions.get(0).size(); - // pick the length of the first ptn, we expect all ptns listed to have the same number of - // key-vals. - } - List<ExprNodeGenericFuncDesc> ptnDescs = new ArrayList<ExprNodeGenericFuncDesc>(); - for (Map<String, String> ptn : partitions) { - // convert each key-value-map to appropriate expression. - ExprNodeGenericFuncDesc expr = null; - for (Map.Entry<String, String> kvp : ptn.entrySet()) { - String key = kvp.getKey(); - Object val = kvp.getValue(); - String type = table.getPartColByName(key).getType(); - ; - PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type); - ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true); - ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate( - "=", column, new ExprNodeConstantDesc(pti, val)); - expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op); - } - if (expr != null) { - ptnDescs.add(expr); + String dbName, String tblName, String locn, Task<? 
extends Serializable> precursor, + Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated, DumpMetaData dmd) + throws SemanticException { + MessageHandler.Context context = + new MessageHandler.Context(dbName, tblName, locn, precursor, dmd, conf, db, ctx, LOG); + MessageHandler messageHandler = MessageHandlerFactory.handlerFor(dmd.getDumpType()); + List<Task<? extends Serializable>> tasks = messageHandler.handle(context); + + if (precursor != null) { + for (Task<? extends Serializable> t : tasks) { + precursor.addDependentTask(t); + LOG.debug("Added {}:{} as a precursor of {}:{}", + precursor.getClass(), precursor.getId(), t.getClass(), t.getId()); } } - if (ptnDescs.size() > 0) { - partSpecs.put(partPrefixLength, ptnDescs); - } - return partSpecs; + dbsUpdated.putAll(messageHandler.databasesUpdated()); + tablesUpdated.putAll(messageHandler.tablesUpdated()); + inputs.addAll(messageHandler.readEntities()); + outputs.addAll(messageHandler.writeEntities()); + return tasks; } private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException { http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java index b1df5a3..c2cffaa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java @@ -31,6 +31,7 @@ public enum DumpType { EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"), EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION"), EVENT_INSERT("EVENT_INSERT"), + EVENT_CREATE_FUNCTION("EVENT_CREATE_FUNCTION"), EVENT_UNKNOWN("EVENT_UNKNOWN"); String type = null; 
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java new file mode 100644 index 0000000..ba699e3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class AbstractHandler implements EventHandler { + static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class); + + final NotificationEvent event; + final MessageDeserializer deserializer; + + AbstractHandler(NotificationEvent event) { + this.event = event; + deserializer = MessageFactory.getInstance().getDeserializer(); + } + + @Override + public long fromEventId() { + return event.getEventId(); + } + + @Override + public long toEventId() { + return event.getEventId(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java new file mode 100644 index 0000000..f4239e5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; + +import javax.annotation.Nullable; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.Iterator; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; + +/** Dumps an ADD_PARTITION event: exports table and partition metadata to eventRoot/METADATA_NAME, then writes each partition's file list from the message to eventRoot/partition-name/FILES_NAME; events with no partitions or no table object are skipped. */ class AddPartitionHandler extends AbstractHandler { + protected AddPartitionHandler(NotificationEvent notificationEvent) { + super(notificationEvent); + } + + @Override + public void handle(Context withinContext) throws Exception { + AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage()); + LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage()); + Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs(); + if ((ptns == null) || (!ptns.iterator().hasNext())) { + LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions", fromEventId()); + return; + } + org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj(); + if
(tobj == null) { + LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed", fromEventId()); + return; + } + + final Table qlMdTable = new Table(tobj); + Iterable<Partition> qlPtns = Iterables.transform( + ptns, + new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() { + @Nullable + @Override + public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) { + if (input == null) { + return null; + } + try { + return new Partition(qlMdTable, input); + } catch (HiveException e) { + throw new IllegalArgumentException(e); + } + } + } + ); + + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTable, + qlPtns, + withinContext.replicationSpec); + + Iterator<PartitionFiles> partitionFilesIter = apm.getPartitionFilesIter().iterator(); + for (Partition qlPtn : qlPtns) { + Iterable<String> files = partitionFilesIter.next().getFiles(); + if (files != null) { + // encoded filename/checksum of files, write into _files + try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) { + for (String file : files) { + fileListWriter.write(file + "\n"); + } + } + } + } + withinContext.createDmd(this).write(); + } + + private BufferedWriter writer(Context withinContext, Partition qlPtn) + throws IOException { + Path ptnDataPath = new Path(withinContext.eventRoot, qlPtn.getName()); + FileSystem fs = ptnDataPath.getFileSystem(withinContext.hiveConf); + Path filesPath = new Path(ptnDataPath, EximUtil.FILES_NAME); + return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_ADD_PARTITION; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java ---------------------------------------------------------------------- diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java new file mode 100644 index 0000000..8a7e742 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.ql.parse.repl.DumpType; + +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +class AlterPartitionHandler extends AbstractHandler { + private final org.apache.hadoop.hive.metastore.api.Partition after; + private final org.apache.hadoop.hive.metastore.api.Table tableObject; + private final boolean isTruncateOp; + private final Scenario scenario; + + AlterPartitionHandler(NotificationEvent event) throws Exception { + super(event); + AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage()); + tableObject = apm.getTableObj(); + org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore(); + after = apm.getPtnObjAfter(); + isTruncateOp = apm.getIsTruncateOp(); + scenario = scenarioType(before, after); + } + + private enum Scenario { + ALTER { + @Override + DumpType dumpType() { + return DumpType.EVENT_ALTER_PARTITION; + } + }, + RENAME { + @Override + DumpType dumpType() { + return DumpType.EVENT_RENAME_PARTITION; + } + }, + TRUNCATE { + @Override + DumpType dumpType() { + return DumpType.EVENT_TRUNCATE_PARTITION; + } + }; + + abstract DumpType dumpType(); + } + + private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before, + org.apache.hadoop.hive.metastore.api.Partition after) { + Iterator<String> beforeValIter = before.getValuesIterator(); + Iterator<String> afterValIter = after.getValuesIterator(); + while(beforeValIter.hasNext()) { + if 
(!beforeValIter.next().equals(afterValIter.next())) { + return Scenario.RENAME; + } + } + return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER; + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage()); + + if (Scenario.ALTER == scenario) { + withinContext.replicationSpec.setIsMetadataOnly(true); + Table qlMdTable = new Table(tableObject); + List<Partition> partitions = new ArrayList<>(); + partitions.add(new Partition(qlMdTable, after)); + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTable, + partitions, + withinContext.replicationSpec); + } + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return scenario.dumpType(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java new file mode 100644 index 0000000..f457f23 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+/** Dumps an ALTER_TABLE event as an alter, a rename or a truncate of the table. */
+class AlterTableHandler extends AbstractHandler {
+  private final org.apache.hadoop.hive.metastore.api.Table after;
+  private final boolean isTruncateOp;
+  private final Scenario scenario;
+
+  private enum Scenario {
+    ALTER {
+      @Override
+      DumpType dumpType() {
+        return DumpType.EVENT_ALTER_TABLE;
+      }
+    },
+    RENAME {
+      @Override
+      DumpType dumpType() {
+        return DumpType.EVENT_RENAME_TABLE;
+      }
+    },
+    TRUNCATE {
+      @Override
+      DumpType dumpType() {
+        return DumpType.EVENT_TRUNCATE_TABLE;
+      }
+    };
+
+    abstract DumpType dumpType();
+  }
+
+  AlterTableHandler(NotificationEvent event) throws Exception {
+    super(event);
+    AlterTableMessage message = deserializer.getAlterTableMessage(event.getMessage());
+    org.apache.hadoop.hive.metastore.api.Table tableBefore = message.getTableObjBefore();
+    after = message.getTableObjAfter();
+    isTruncateOp = message.getIsTruncateOp();
+    scenario = scenarioType(tableBefore, after);
+  }
+
+  private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before,
+                                org.apache.hadoop.hive.metastore.api.Table after) {
+    boolean sameName = before.getDbName().equals(after.getDbName())
+        && before.getTableName().equals(after.getTableName());
+    if (!sameName) {
+      return Scenario.RENAME;
+    }
+    return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER;
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage());
+    // Only a plain ALTER exports the altered table definition (metadata only, no partitions);
+    // RENAME and TRUNCATE dump just the event message as the payload below.
+    if (scenario == Scenario.ALTER) {
+      withinContext.replicationSpec.setIsMetadataOnly(true);
+      Table tableAfter = new Table(after);
+      Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+      EximUtil.createExportDump(
+          metadataPath.getFileSystem(withinContext.hiveConf),
+          metadataPath,
+          tableAfter,
+          null,
+          withinContext.replicationSpec);
+    }
+    DumpMetaData dumpMetaData = withinContext.createDmd(this);
+    dumpMetaData.setPayload(event.getMessage());
+    dumpMetaData.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return scenario.dumpType();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
new file mode 100644
index 0000000..bebf035
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -0,0 +1,36 @@
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+
+import
org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+/** Dumps a CREATE_FUNCTION event by serializing the function as JSON into the metadata file. */
+class CreateFunctionHandler extends AbstractHandler {
+  CreateFunctionHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    CreateFunctionMessage message = deserializer.getCreateFunctionMessage(event.getMessage());
+    LOG.info("Processing#{} CREATE_FUNCTION message : {}", fromEventId(), event.getMessage());
+    Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf);
+    try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) {
+      new FunctionSerializer(message.getFunctionObj())
+          .writeTo(jsonWriter, withinContext.replicationSpec);
+    }
+    withinContext.createDmd(this).write(); // event dump metadata, as the other handlers write
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_CREATE_FUNCTION;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
new file mode 100644
index 0000000..ca3607f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+/** Dumps a CREATE_TABLE event: export metadata for the table plus any listed data files. */
+class CreateTableHandler extends AbstractHandler {
+  CreateTableHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage());
+    LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage());
+    org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj();
+
+    if (tobj == null) {
+      LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed", fromEventId());
+      return;
+    }
+
+    Table qlMdTable = new Table(tobj);
+    if (qlMdTable.isView()) {
+      withinContext.replicationSpec.setIsMetadataOnly(true);
+    }
+
+    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    EximUtil.createExportDump(
+        metaDataPath.getFileSystem(withinContext.hiveConf),
+        metaDataPath,
+        qlMdTable,
+        null,
+        withinContext.replicationSpec);
+
+    Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+    Iterable<String> files = ctm.getFiles();
+    if (files != null) {
+      // encoded filename/checksum of files, write into _files
+      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
+        for (String file : files) {
+          fileListWriter.write(file + "\n");
+        }
+      }
+    }
+    withinContext.createDmd(this).write();
+  }
+
+  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
+    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_CREATE_TABLE;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
new file mode 100644
index 0000000..0d4665a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+/** Dumps any event it is given as EVENT_UNKNOWN, with the raw event message as payload. */
+class DefaultHandler extends AbstractHandler {
+
+  DefaultHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage());
+    DumpMetaData unknownEventDmd = withinContext.createDmd(this);
+    unknownEventDmd.setPayload(event.getMessage());
+    unknownEventDmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_UNKNOWN;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
new file mode 100644
index 0000000..a4eacc4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+/** Dumps a DROP_PARTITION event; only the raw event message is recorded, as the payload. */
+class DropPartitionHandler extends AbstractHandler {
+
+  DropPartitionHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dumpMetaData = withinContext.createDmd(this);
+    dumpMetaData.setPayload(event.getMessage());
+    dumpMetaData.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_DROP_PARTITION;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
new file mode 100644
index 0000000..40cd5cb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+/** Dumps a DROP_TABLE event; only the raw event message is recorded, as the payload. */
+class DropTableHandler extends AbstractHandler {
+
+  DropTableHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dropTableDmd = withinContext.createDmd(this);
+    dropTableDmd.setPayload(event.getMessage());
+    dropTableDmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_DROP_TABLE;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
new file mode 100644
index 0000000..c0fa7b2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+/** Dumps a single metastore notification event. */
+public interface EventHandler {
+  /** Dumps the event; the handlers in this package write under {@code withinContext.eventRoot}. */
+  void handle(Context withinContext) throws Exception;
+
+  long fromEventId();
+
+  long toEventId();
+
+  /** The dump type recorded for this event in the {@link DumpMetaData} built by the context. */
+  DumpType dumpType();
+
+  /** State a handler dumps into; also builds the handler's {@link DumpMetaData}. */
+  class Context {
+    final Path eventRoot;
+    final Path cmRoot;
+    final Hive db;
+    final HiveConf hiveConf;
+    final ReplicationSpec replicationSpec;
+
+    public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
+        ReplicationSpec replicationSpec) {
+      this.eventRoot = eventRoot;
+      this.cmRoot = cmRoot;
+      this.db = db;
+      this.hiveConf = hiveConf;
+      this.replicationSpec = replicationSpec;
+    }
+
+    /** Dump metadata for {@code eventHandler}'s event, rooted at this context's event root. */
+    DumpMetaData createDmd(EventHandler eventHandler) {
+      return new DumpMetaData(eventRoot, eventHandler.dumpType(), eventHandler.fromEventId(),
+          eventHandler.toEventId(), cmRoot, hiveConf);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
new file mode 100644
index 0000000..08dbd13
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Creates the {@link EventHandler} that dumps a given {@link NotificationEvent}, falling back to
+ * {@link DefaultHandler} for event types that have no registered handler.
+ */
+public class EventHandlerFactory {
+  private EventHandlerFactory() {
+  }
+
+  /** Event type (one of the {@link MessageFactory} event constants) to its handler class. */
+  private static final Map<String, Class<? extends EventHandler>> registeredHandlers =
+      new HashMap<>();
+
+  static {
+    register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class);
+    register(MessageFactory.ALTER_PARTITION_EVENT, AlterPartitionHandler.class);
+    register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class);
+    register(MessageFactory.CREATE_FUNCTION_EVENT, CreateFunctionHandler.class);
+    register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class);
+    register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
+    register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
+    register(MessageFactory.INSERT_EVENT, InsertHandler.class);
+  }
+
+  /**
+   * Registers {@code handlerClazz} as the handler for events of type {@code event}.
+   *
+   * @throws IllegalArgumentException if the class lacks a non-private constructor taking a single
+   *     {@link NotificationEvent}, which {@link #handlerFor} needs to instantiate it
+   */
+  static void register(String event, Class<? extends EventHandler> handlerClazz) {
+    try {
+      Constructor<? extends EventHandler> constructor =
+          handlerClazz.getDeclaredConstructor(NotificationEvent.class);
+      // Checked explicitly rather than with assert so a bad registration fails fast even when
+      // assertions are disabled, instead of as an IllegalAccessException in handlerFor().
+      if (Modifier.isPrivate(constructor.getModifiers())) {
+        throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName()
+            + " must not declare its " + NotificationEvent.class.getCanonicalName()
+            + " constructor as private");
+      }
+      registeredHandlers.put(event, handlerClazz);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName()
+          + " does not have a constructor with only a parameter of type: "
+          + NotificationEvent.class.getCanonicalName(), e);
+    }
+  }
+
+  /**
+   * Returns a new handler for {@code event}: an instance of the class registered for its event
+   * type, or a {@link DefaultHandler} when no handler is registered for that type.
+   *
+   * @throws RuntimeException if the registered handler class cannot be instantiated
+   */
+  public static EventHandler handlerFor(NotificationEvent event) {
+    Class<? extends EventHandler> handlerClazz = registeredHandlers.get(event.getEventType());
+    if (handlerClazz == null) {
+      return new DefaultHandler(event);
+    }
+    try {
+      Constructor<? extends EventHandler> constructor =
+          handlerClazz.getDeclaredConstructor(NotificationEvent.class);
+      return constructor.newInstance(event);
+    } catch (NoSuchMethodException | IllegalAccessException | InstantiationException
+        | InvocationTargetException e) {
+      // register() checked that the constructor exists and is not private, so this most likely
+      // means the handler's constructor threw; propagate it with the event type for context.
+      throw new RuntimeException(
+          "failed when creating handler for " + event.getEventType()
+              + " with the responsible class being " + handlerClazz.getCanonicalName(), e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
new file mode 100644
index 0000000..0393701
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.thrift.TException;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+/** Dumps an INSERT event: export metadata for the table/partition plus the event's file list. */
+class InsertHandler extends AbstractHandler {
+
+  InsertHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} INSERT message : {}", fromEventId(), event.getMessage());
+    InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
+    org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg);
+    Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
+    List<Partition> qlPtns = null;
+    if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
+      qlPtns = Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false));
+    }
+    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+
+    // Mark the replace type based on INSERT-INTO or INSERT_OVERWRITE operation
+    withinContext.replicationSpec.setIsReplace(insertMsg.isReplace());
+    EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
+        qlMdTable, qlPtns,
+        withinContext.replicationSpec);
+    Iterable<String> files = insertMsg.getFiles();
+
+    if (files != null) {
+      Path dataPath;
+      if ((null == qlPtns) || qlPtns.isEmpty()) {
+        dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+      } else {
+        /*
+         * An INSERT INTO/OVERWRITE may write to several partitions, even of several tables, but
+         * one INSERT event is generated per partition written to, so qlPtns has exactly one
+         * entry and the file list goes under a directory named after that partition.
+         */
+        assert(1 == qlPtns.size());
+        dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName());
+      }
+
+      // encoded filename/checksum of files, write into _files
+      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
+        for (String file : files) {
+          fileListWriter.write(file + "\n");
+        }
+      }
+    }
+
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  private org.apache.hadoop.hive.ql.metadata.Table tableObject(
+      Context withinContext, InsertMessage insertMsg) throws TException {
+    return new org.apache.hadoop.hive.ql.metadata.Table(
+        withinContext.db.getMSC().getTable(
+            insertMsg.getDB(), insertMsg.getTable()
+        )
+    );
+  }
+
+  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
+    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_INSERT;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
deleted file mode 100644
index ab059c2..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AbstractHandler.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more
contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractHandler implements EventHandler {
-  static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class);
-
-  final NotificationEvent event;
-  final MessageDeserializer deserializer;
-
-  AbstractHandler(NotificationEvent event) {
-    this.event = event;
-    deserializer = MessageFactory.getInstance().getDeserializer();
-  }
-
-  @Override
-  public long fromEventId() {
-    return event.getEventId();
-  }
-
-  @Override
-  public long toEventId() {
-    return event.getEventId();
-  }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
deleted file mode 100644
index
1616ab9..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AddPartitionHandler.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.events;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
-
-import javax.annotation.Nullable;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Iterator;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-public class AddPartitionHandler extends AbstractHandler {
-  protected AddPartitionHandler(NotificationEvent notificationEvent) {
-    super(notificationEvent);
-  }
-
-  @Override
-  public void handle(Context withinContext) throws Exception {
-    AddPartitionMessage apm = deserializer.getAddPartitionMessage(event.getMessage());
-    LOG.info("Processing#{} ADD_PARTITION message : {}", fromEventId(), event.getMessage());
-    Iterable<org.apache.hadoop.hive.metastore.api.Partition> ptns = apm.getPartitionObjs();
-    if ((ptns == null) || (!ptns.iterator().hasNext())) {
-      LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions");
-      return;
-    }
-    org.apache.hadoop.hive.metastore.api.Table tobj = apm.getTableObj();
-    if (tobj == null) {
-      LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed");
-      return;
-    }
-
-    final Table qlMdTable = new Table(tobj);
-    Iterable<Partition> qlPtns = Iterables.transform(
-        ptns,
-        new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() {
-          @Nullable
-          @Override
-          public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) {
-            if (input == null) {
-              return null;
-            }
-            try {
-              return new Partition(qlMdTable, input);
-            } catch (HiveException e) {
-              throw new IllegalArgumentException(e);
-            }
-          }
-        }
-    );
-
-    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
-    EximUtil.createExportDump(
-        metaDataPath.getFileSystem(withinContext.hiveConf),
-        metaDataPath,
-        qlMdTable,
-        qlPtns,
-        withinContext.replicationSpec);
-
-    Iterator<PartitionFiles> partitionFilesIter = apm.getPartitionFilesIter().iterator();
-    for (Partition qlPtn : qlPtns) {
-      Iterable<String> files = partitionFilesIter.next().getFiles();
-      if (files != null) {
-        // encoded filename/checksum of files, write into _files
-        try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
-          for (String file : files) {
-            fileListWriter.write(file + "\n");
-          }
-        }
-      }
-    }
-    withinContext.createDmd(this).write();
-  }
-
-  private BufferedWriter writer(Context withinContext, Partition qlPtn)
-      throws IOException {
-    Path ptnDataPath = new Path(withinContext.eventRoot, qlPtn.getName());
-    FileSystem fs = ptnDataPath.getFileSystem(withinContext.hiveConf);
-    Path filesPath = new Path(ptnDataPath, EximUtil.FILES_NAME);
-    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
-  }
-
-  @Override
-  public DumpType dumpType() {
-    return DumpType.EVENT_ADD_PARTITION;
-  }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
deleted file mode 100644
index b6c3496..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterPartitionHandler.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.hadoop.hive.ql.parse.repl.events; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; -import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.EximUtil; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.hadoop.hive.ql.parse.repl.DumpType; - -import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; - -public class AlterPartitionHandler extends AbstractHandler { - private final org.apache.hadoop.hive.metastore.api.Partition after; - private final org.apache.hadoop.hive.metastore.api.Table tableObject; - private final boolean isTruncateOp; - private final Scenario scenario; - - AlterPartitionHandler(NotificationEvent event) throws Exception { - super(event); - AlterPartitionMessage apm = deserializer.getAlterPartitionMessage(event.getMessage()); - tableObject = apm.getTableObj(); - org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore(); - after = apm.getPtnObjAfter(); - isTruncateOp = apm.getIsTruncateOp(); - scenario = scenarioType(before, after); - } - - private enum Scenario { - ALTER { - @Override - DumpType dumpType() { - return DumpType.EVENT_ALTER_PARTITION; - } - }, - RENAME { - @Override - DumpType dumpType() { - return DumpType.EVENT_RENAME_PARTITION; - } - }, - TRUNCATE { - @Override - DumpType dumpType() { - return DumpType.EVENT_TRUNCATE_PARTITION; - } - }; - - abstract DumpType dumpType(); - } - - private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Partition before, - org.apache.hadoop.hive.metastore.api.Partition after) { - Iterator<String> beforeValIter = before.getValuesIterator(); - Iterator<String> afterValIter = after.getValuesIterator(); - while(beforeValIter.hasNext()) { - if 
(!beforeValIter.next().equals(afterValIter.next())) { - return Scenario.RENAME; - } - } - return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER; - } - - @Override - public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage()); - - if (Scenario.ALTER == scenario) { - withinContext.replicationSpec.setIsMetadataOnly(true); - Table qlMdTable = new Table(tableObject); - List<Partition> partitions = new ArrayList<>(); - partitions.add(new Partition(qlMdTable, after)); - Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); - EximUtil.createExportDump( - metaDataPath.getFileSystem(withinContext.hiveConf), - metaDataPath, - qlMdTable, - partitions, - withinContext.replicationSpec); - } - DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); - dmd.write(); - } - - @Override - public DumpType dumpType() { - return scenario.dumpType(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java deleted file mode 100644 index d553240..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/AlterTableHandler.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.parse.repl.events; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.EximUtil; - -import org.apache.hadoop.hive.ql.parse.repl.DumpType; - -import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; - -public class AlterTableHandler extends AbstractHandler { - private final org.apache.hadoop.hive.metastore.api.Table after; - private final boolean isTruncateOp; - private final Scenario scenario; - - private enum Scenario { - ALTER { - @Override - DumpType dumpType() { - return DumpType.EVENT_ALTER_TABLE; - } - }, - RENAME { - @Override - DumpType dumpType() { - return DumpType.EVENT_RENAME_TABLE; - } - }, - TRUNCATE { - @Override - DumpType dumpType() { - return DumpType.EVENT_TRUNCATE_TABLE; - } - }; - - abstract DumpType dumpType(); - } - - AlterTableHandler(NotificationEvent event) throws Exception { - super(event); - AlterTableMessage atm = deserializer.getAlterTableMessage(event.getMessage()); - org.apache.hadoop.hive.metastore.api.Table before = atm.getTableObjBefore(); - after = atm.getTableObjAfter(); - isTruncateOp = atm.getIsTruncateOp(); - scenario = scenarioType(before, after); - } - - private Scenario scenarioType(org.apache.hadoop.hive.metastore.api.Table before, - org.apache.hadoop.hive.metastore.api.Table after) { - if (before.getDbName().equals(after.getDbName()) - && 
before.getTableName().equals(after.getTableName())) { - return isTruncateOp ? Scenario.TRUNCATE : Scenario.ALTER; - } else { - return Scenario.RENAME; - } - } - - @Override - public void handle(Context withinContext) throws Exception { - { - LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage()); - if (Scenario.ALTER == scenario) { - withinContext.replicationSpec.setIsMetadataOnly(true); - Table qlMdTableAfter = new Table(after); - Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); - EximUtil.createExportDump( - metaDataPath.getFileSystem(withinContext.hiveConf), - metaDataPath, - qlMdTableAfter, - null, - withinContext.replicationSpec); - } - DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); - dmd.write(); - } - } - - @Override - public DumpType dumpType() { - return scenario.dumpType(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java deleted file mode 100644 index 88600fd..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/CreateTableHandler.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.parse.repl.events; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.EximUtil; - -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; - -import org.apache.hadoop.hive.ql.parse.repl.DumpType; - -public class CreateTableHandler extends AbstractHandler { - - CreateTableHandler(NotificationEvent event) { - super(event); - } - - @Override - public void handle(Context withinContext) throws Exception { - CreateTableMessage ctm = deserializer.getCreateTableMessage(event.getMessage()); - LOG.info("Processing#{} CREATE_TABLE message : {}", fromEventId(), event.getMessage()); - org.apache.hadoop.hive.metastore.api.Table tobj = ctm.getTableObj(); - - if (tobj == null) { - LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed"); - return; - } - - Table qlMdTable = new Table(tobj); - if (qlMdTable.isView()) { - withinContext.replicationSpec.setIsMetadataOnly(true); - } - - Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); - EximUtil.createExportDump( - metaDataPath.getFileSystem(withinContext.hiveConf), - metaDataPath, - qlMdTable, - null, - withinContext.replicationSpec); - - Path dataPath = new Path(withinContext.eventRoot, "data"); - Iterable<String> files = ctm.getFiles(); - if (files != 
null) { - // encoded filename/checksum of files, write into _files - try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { - for (String file : files) { - fileListWriter.write(file + "\n"); - } - } - } - withinContext.createDmd(this).write(); - } - - private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException { - FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf); - Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); - return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); - } - - @Override - public DumpType dumpType() { - return DumpType.EVENT_CREATE_TABLE; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java deleted file mode 100644 index 78cd74f..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DefaultHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.parse.repl.events; - -import org.apache.hadoop.hive.metastore.api.NotificationEvent; - -import org.apache.hadoop.hive.ql.parse.repl.DumpType; - -import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; - -public class DefaultHandler extends AbstractHandler { - - DefaultHandler(NotificationEvent event) { - super(event); - } - - @Override - public void handle(Context withinContext) throws Exception { - LOG.info("Dummy processing#{} message : {}", fromEventId(), event.getMessage()); - DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); - dmd.write(); - } - - @Override - public DumpType dumpType() { - return DumpType.EVENT_UNKNOWN; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java deleted file mode 100644 index c4a0908..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropPartitionHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.parse.repl.events; - -import org.apache.hadoop.hive.metastore.api.NotificationEvent; - -import org.apache.hadoop.hive.ql.parse.repl.DumpType; - -import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; - -public class DropPartitionHandler extends AbstractHandler { - - DropPartitionHandler(NotificationEvent event) { - super(event); - } - - @Override - public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} DROP_PARTITION message : {}", fromEventId(), event.getMessage()); - DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); - dmd.write(); - } - - @Override - public DumpType dumpType() { - return DumpType.EVENT_DROP_PARTITION; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java deleted file mode 100644 index e3addaf..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/DropTableHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.parse.repl.events; - -import org.apache.hadoop.hive.metastore.api.NotificationEvent; - -import org.apache.hadoop.hive.ql.parse.repl.DumpType; - -import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; - -public class DropTableHandler extends AbstractHandler { - - DropTableHandler(NotificationEvent event) { - super(event); - } - - @Override - public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} DROP_TABLE message : {}", fromEventId(), event.getMessage()); - DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); - dmd.write(); - } - - @Override - public DumpType dumpType() { - return DumpType.EVENT_DROP_TABLE; - } -}
