Repository: hive Updated Branches: refs/heads/master 699d6ce36 -> 9d4f13afd
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java deleted file mode 100644 index 29f3b42..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandler.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.parse.repl.events; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.parse.ReplicationSpec; - -import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -import org.apache.hadoop.hive.ql.parse.repl.DumpType; - -public interface EventHandler { - void handle(Context withinContext) throws Exception; - - long fromEventId(); - - long toEventId(); - - DumpType dumpType(); - - class Context { - final Path eventRoot, cmRoot; - final Hive db; - final HiveConf hiveConf; - final ReplicationSpec replicationSpec; - - public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, - ReplicationSpec replicationSpec) { - this.eventRoot = eventRoot; - this.cmRoot = cmRoot; - this.db = db; - this.hiveConf = hiveConf; - this.replicationSpec = replicationSpec; - } - - DumpMetaData createDmd(EventHandler eventHandler) { - return new DumpMetaData( - eventRoot, - eventHandler.dumpType(), - eventHandler.fromEventId(), - eventHandler.toEventId(), - cmRoot, hiveConf - ); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java deleted file mode 100644 index 53adea8..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/EventHandlerFactory.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.parse.repl.events; - -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.MessageFactory; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Modifier; -import java.util.HashMap; -import java.util.Map; - -public class EventHandlerFactory { - private EventHandlerFactory() { - } - - private static Map<String, Class<? extends EventHandler>> registeredHandlers = new HashMap<>(); - - static { - register(MessageFactory.ADD_PARTITION_EVENT, AddPartitionHandler.class); - register(MessageFactory.ALTER_PARTITION_EVENT, AlterPartitionHandler.class); - register(MessageFactory.ALTER_TABLE_EVENT, AlterTableHandler.class); - register(MessageFactory.CREATE_TABLE_EVENT, CreateTableHandler.class); - register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class); - register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class); - register(MessageFactory.INSERT_EVENT, InsertHandler.class); - } - - static void register(String event, Class<? extends EventHandler> handlerClazz) { - try { - Constructor<? 
extends EventHandler> constructor = - handlerClazz.getDeclaredConstructor(NotificationEvent.class); - assert constructor != null; - assert !Modifier.isPrivate(constructor.getModifiers()); - registeredHandlers.put(event, handlerClazz); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName() - + " does not have the a constructor with only parameter of type:" - + NotificationEvent.class.getCanonicalName(), e); - } - } - - public static EventHandler handlerFor(NotificationEvent event) { - if (registeredHandlers.containsKey(event.getEventType())) { - Class<? extends EventHandler> handlerClazz = registeredHandlers.get(event.getEventType()); - try { - Constructor<? extends EventHandler> constructor = - handlerClazz.getDeclaredConstructor(NotificationEvent.class); - return constructor.newInstance(event); - } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { - // this should never happen. however we want to make sure we propagate the exception - throw new RuntimeException( - "failed when creating handler for " + event.getEventType() - + " with the responsible class being " + handlerClazz.getCanonicalName(), e); - } - } - return new DefaultHandler(event); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java deleted file mode 100644 index 910b396..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/events/InsertHandler.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.parse.repl.events; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.InsertMessage; -import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.parse.EximUtil; -import org.apache.thrift.TException; - -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hive.ql.parse.repl.DumpType; - -import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; - -public class InsertHandler extends AbstractHandler { - - InsertHandler(NotificationEvent event) { - super(event); - } - - @Override - public void handle(Context withinContext) throws Exception { - InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage()); - org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(withinContext, insertMsg); - Map<String, String> partSpec = insertMsg.getPartitionKeyValues(); - List<Partition> qlPtns = null; - if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) { - qlPtns = 
Collections.singletonList(withinContext.db.getPartition(qlMdTable, partSpec, false)); - } - Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); - - // Mark the replace type based on INSERT-INTO or INSERT_OVERWRITE operation - withinContext.replicationSpec.setIsReplace(insertMsg.isReplace()); - EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath, - qlMdTable, qlPtns, - withinContext.replicationSpec); - Iterable<String> files = insertMsg.getFiles(); - - if (files != null) { - Path dataPath; - if ((null == qlPtns) || qlPtns.isEmpty()) { - dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); - } else { - /* - * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple - * tables. But, Insert event is generated for each partition to which the data is inserted. So, qlPtns list - * will have only one entry. - */ - assert(1 == qlPtns.size()); - dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName()); - } - - // encoded filename/checksum of files, write into _files - try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { - for (String file : files) { - fileListWriter.write(file + "\n"); - } - } - } - - LOG.info("Processing#{} INSERT message : {}", fromEventId(), event.getMessage()); - DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); - dmd.write(); - } - - private org.apache.hadoop.hive.ql.metadata.Table tableObject( - Context withinContext, InsertMessage insertMsg) throws TException { - return new org.apache.hadoop.hive.ql.metadata.Table( - withinContext.db.getMSC().getTable( - insertMsg.getDB(), insertMsg.getTable() - ) - ); - } - - private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException { - Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); - FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf); - return new BufferedWriter(new 
OutputStreamWriter(fs.create(filesPath))); - } - - @Override - public DumpType dumpType() { - return DumpType.EVENT_INSERT; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java new file mode 100644 index 0000000..95e51e4 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbstractMessageHandler.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Common base for {@link MessageHandler} implementations. It owns the entity sets and the
+ * "updated" maps exposed through the interface accessors, plus a message deserializer obtained
+ * from {@link MessageFactory}.
+ */
+abstract class AbstractMessageHandler implements MessageHandler {
+  // Subclasses read and fill these fields directly, so their names and declared types are kept.
+  final HashSet<ReadEntity> readEntitySet = new HashSet<>();
+  final HashSet<WriteEntity> writeEntitySet = new HashSet<>();
+  final Map<String, Long> tablesUpdated = new HashMap<>();
+  final Map<String, Long> databasesUpdated = new HashMap<>();
+  final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer();
+
+  /** @return the handler's own read-entity set (the live instance, not a copy) */
+  @Override
+  public Set<ReadEntity> readEntities() {
+    return readEntitySet;
+  }
+
+  /** @return the handler's own write-entity set (the live instance, not a copy) */
+  @Override
+  public Set<WriteEntity> writeEntities() {
+    return writeEntitySet;
+  }
+
+  /** @return the live map of table name to event id filled in by the concrete handler */
+  @Override
+  public Map<String, Long> tablesUpdated() {
+    return tablesUpdated;
+  }
+
+  /** @return the live map of database name to event id filled in by the concrete handler */
+  @Override
+  public Map<String, Long> databasesUpdated() {
+    return databasesUpdated;
+  }
+
+  /**
+   * Builds a {@link ReplicationSpec} whose from-id and to-id are both the string form of
+   * {@code forContext.dmd.getEventTo()}.
+   *
+   * @param forContext context whose dump metadata supplies the event id
+   * @return the replication spec for that single event id
+   * @throws SemanticException declared for callers; propagated from spec construction if raised
+   */
+  ReplicationSpec eventOnlyReplicationSpec(Context forContext) throws SemanticException {
+    final String lastEventId = forContext.dmd.getEventTo().toString();
+    // The boolean flags are passed through unchanged; their meaning is defined by the
+    // ReplicationSpec constructor, which is not visible from this file.
+    return new ReplicationSpec(true, false, lastEventId, lastEventId, false, true, false);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
new file mode 100644
index 0000000..6d346b6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Handler that produces no work: {@link #handle(Context)} always returns a fresh, empty,
+ * mutable task list and leaves the inherited entity sets and "updated" maps untouched.
+ */
+class DefaultHandler extends AbstractMessageHandler {
+  /**
+   * @param withinContext ignored by this handler
+   * @return a new empty list on every call
+   */
+  @Override
+  public List<Task<? extends Serializable>> handle(Context withinContext)
+      throws SemanticException {
+    List<Task<? extends Serializable>> noTasks = new ArrayList<>();
+    return noTasks;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
new file mode 100644
index 0000000..73f2613
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Turns a dumped drop-partition event into a single DDL task that drops the listed partitions.
+ */
+class DropPartitionHandler extends AbstractMessageHandler {
+  /**
+   * Deserializes the event payload, builds one predicate per dropped partition and wraps them
+   * in a {@link DropTableDesc} / {@link DDLWork} task. The database and table named in the
+   * context win over the ones in the message when the context values are non-empty.
+   *
+   * @param context load context carrying the dump metadata, configuration and logger
+   * @return a singleton list holding the drop-partition task
+   * @throws SemanticException if no partition predicate could be built, or wrapping any other
+   *         exception raised while reading the message
+   */
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    try {
+      DropPartitionMessage dropMsg =
+          deserializer.getDropPartitionMessage(context.dmd.getPayload());
+
+      String dbName;
+      if (context.isDbNameEmpty()) {
+        dbName = dropMsg.getDB();
+      } else {
+        dbName = context.dbName;
+      }
+      String tblName;
+      if (context.isTableNameEmpty()) {
+        tblName = dropMsg.getTable();
+      } else {
+        tblName = context.tableName;
+      }
+
+      Map<Integer, List<ExprNodeGenericFuncDesc>> specsByPrefixLength =
+          genPartSpecs(new Table(dropMsg.getTableObj()), dropMsg.getPartitions());
+      if (specsByPrefixLength.isEmpty()) {
+        throw new SemanticException(
+            "DROP PARTITION EVENT does not return any part descs for event message :"
+                + context.dmd.getPayload());
+      }
+
+      String qualifiedTableName = dbName + "." + tblName;
+      DropTableDesc dropPtnDesc = new DropTableDesc(
+          qualifiedTableName, specsByPrefixLength, null, true,
+          eventOnlyReplicationSpec(context));
+      Task<DDLWork> dropPtnTask = TaskFactory.get(
+          new DDLWork(readEntitySet, writeEntitySet, dropPtnDesc),
+          context.hiveConf
+      );
+      context.log.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
+          dropPtnDesc.getTableName(), dropMsg.getPartitions());
+
+      // Record the event id against both the database and the fully qualified table.
+      databasesUpdated.put(dbName, context.dmd.getEventTo());
+      tablesUpdated.put(qualifiedTableName, context.dmd.getEventTo());
+      return Collections.singletonList(dropPtnTask);
+    } catch (SemanticException e) {
+      // Already the right type: rethrow untouched.
+      throw e;
+    } catch (Exception e) {
+      throw new SemanticException("Error reading message members", e);
+    }
+  }
+
+  /**
+   * Builds the partition-spec map for the drop descriptor: at most one entry, keyed by the
+   * number of key-values in the first partition, whose value is the list of per-partition
+   * predicates. The map is empty when no predicate could be built.
+   */
+  private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table,
+      List<Map<String, String>> partitions) throws SemanticException {
+    // pick the length of the first ptn, we expect all ptns listed to have the same number of
+    // key-vals.
+    int prefixLength = partitions.isEmpty() ? 0 : partitions.get(0).size();
+
+    List<ExprNodeGenericFuncDesc> predicates = new ArrayList<>();
+    for (Map<String, String> partition : partitions) {
+      ExprNodeGenericFuncDesc predicate = partitionPredicate(table, partition);
+      if (predicate != null) {
+        predicates.add(predicate);
+      }
+    }
+
+    Map<Integer, List<ExprNodeGenericFuncDesc>> result = new HashMap<>();
+    if (!predicates.isEmpty()) {
+      result.put(prefixLength, predicates);
+    }
+    return result;
+  }
+
+  /**
+   * Converts one partition's key-value map into {@code k1 = v1 and k2 = v2 ...}, typing each
+   * comparison with the partition column's declared type.
+   *
+   * @return the combined predicate, or {@code null} when the map has no entries
+   */
+  private ExprNodeGenericFuncDesc partitionPredicate(Table table, Map<String, String> partition)
+      throws SemanticException {
+    ExprNodeGenericFuncDesc conjunction = null;
+    for (Map.Entry<String, String> entry : partition.entrySet()) {
+      String columnName = entry.getKey();
+      Object value = entry.getValue();
+      PrimitiveTypeInfo typeInfo =
+          TypeInfoFactory.getPrimitiveTypeInfo(table.getPartColByName(columnName).getType());
+      ExprNodeGenericFuncDesc equality = DDLSemanticAnalyzer.makeBinaryPredicate(
+          "=",
+          new ExprNodeColumnDesc(typeInfo, columnName, null, true),
+          new ExprNodeConstantDesc(typeInfo, value));
+      if (conjunction == null) {
+        conjunction = equality;
+      } else {
+        conjunction = DDLSemanticAnalyzer.makeBinaryPredicate("and", conjunction, equality);
+      }
+    }
+    return conjunction;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
new file mode 100644
index 0000000..b623f2f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Turns a dumped drop-table event into a single DDL task that drops the table.
+ */
+class DropTableHandler extends AbstractMessageHandler {
+  /**
+   * Deserializes the event payload and wraps a {@link DropTableDesc} for the target table in a
+   * {@link DDLWork} task. The database and table named in the context win over the ones in the
+   * message when the context values are non-empty. Only {@code databasesUpdated} is recorded;
+   * this handler does not add an entry to {@code tablesUpdated}.
+   *
+   * @param context load context carrying the dump metadata, configuration and logger
+   * @return a singleton list holding the drop-table task
+   */
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    DropTableMessage dropMsg = deserializer.getDropTableMessage(context.dmd.getPayload());
+
+    String dbName;
+    if (context.isDbNameEmpty()) {
+      dbName = dropMsg.getDB();
+    } else {
+      dbName = context.dbName;
+    }
+    String tblName;
+    if (context.isTableNameEmpty()) {
+      tblName = dropMsg.getTable();
+    } else {
+      tblName = context.tableName;
+    }
+
+    String qualifiedTableName = dbName + "." + tblName;
+    DropTableDesc dropTableDesc = new DropTableDesc(
+        qualifiedTableName, null, true, true, eventOnlyReplicationSpec(context));
+    DDLWork dropWork = new DDLWork(readEntitySet, writeEntitySet, dropTableDesc);
+    Task<DDLWork> dropTableTask = TaskFactory.get(dropWork, context.hiveConf);
+
+    context.log.debug(
+        "Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
+    databasesUpdated.put(dbName, context.dmd.getEventTo());
+    return Collections.singletonList(dropTableTask);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
new file mode 100644
index 0000000..fa63169
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Handles a dumped insert event by delegating to {@link TableHandler} with a context that is
+ * pinned to the resolved database and table names.
+ */
+class InsertHandler extends AbstractMessageHandler {
+  /**
+   * Resolves the target database and table (context values win over the message when they are
+   * non-empty), runs a fresh {@link TableHandler} against a context carrying those names, and
+   * merges that handler's entity sets and "updated" maps into this handler's own.
+   *
+   * @param withinContext load context carrying the dump metadata
+   * @return the tasks produced by the delegate {@link TableHandler}
+   */
+  @Override
+  public List<Task<? extends Serializable>> handle(Context withinContext)
+      throws SemanticException {
+    InsertMessage insertMessage = deserializer.getInsertMessage(withinContext.dmd.getPayload());
+
+    String resolvedDbName;
+    if (withinContext.isDbNameEmpty()) {
+      resolvedDbName = insertMessage.getDB();
+    } else {
+      resolvedDbName = withinContext.dbName;
+    }
+    String resolvedTblName;
+    if (withinContext.isTableNameEmpty()) {
+      resolvedTblName = insertMessage.getTable();
+    } else {
+      resolvedTblName = withinContext.tableName;
+    }
+
+    // Piggybacking in Import logic for now
+    TableHandler delegate = new TableHandler();
+    List<Task<? extends Serializable>> tasks =
+        delegate.handle(new Context(withinContext, resolvedDbName, resolvedTblName));
+
+    // Surface everything the delegate collected through this handler's accessors.
+    readEntitySet.addAll(delegate.readEntities());
+    writeEntitySet.addAll(delegate.writeEntities());
+    databasesUpdated.putAll(delegate.databasesUpdated);
+    tablesUpdated.putAll(delegate.tablesUpdated);
+    return tasks;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
new file mode 100644
index 0000000..840f95e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+/**
+ * Contract for turning one dumped replication event into executable tasks, together with the
+ * bookkeeping (entities touched, tables and databases updated) gathered while doing so.
+ */
+public interface MessageHandler {
+
+  /**
+   * Builds the tasks for the event described by {@code withinContext}.
+   *
+   * @param withinContext everything the handler needs: names, dump metadata, conf, logger
+   * @return the tasks to run for this event
+   * @throws SemanticException if the event cannot be translated into tasks
+   */
+  List<Task<? extends Serializable>> handle(Context withinContext) throws SemanticException;
+
+  /** @return read entities collected by the handler */
+  Set<ReadEntity> readEntities();
+
+  /** @return write entities collected by the handler */
+  Set<WriteEntity> writeEntities();
+
+  /** @return table name to event id entries recorded by the handler */
+  Map<String, Long> tablesUpdated();
+
+  /** @return database name to event id entries recorded by the handler */
+  Map<String, Long> databasesUpdated();
+
+  /**
+   * Bundle of inputs handed to {@link #handle(Context)}. All fields except {@code dmd} are
+   * final.
+   */
+  class Context {
+    final String dbName;
+    final String tableName;
+    final String location;
+    final Task<? extends Serializable> precursor;
+    DumpMetaData dmd;
+    final HiveConf hiveConf;
+    final Hive db;
+    final org.apache.hadoop.hive.ql.Context nestedContext;
+    final Logger log;
+
+    /** Creates a context from explicitly supplied values; each argument is stored as given. */
+    public Context(String dbName, String tableName, String location,
+        Task<? extends Serializable> precursor, DumpMetaData dmd, HiveConf hiveConf,
+        Hive db, org.apache.hadoop.hive.ql.Context nestedContext, Logger log) {
+      this.dbName = dbName;
+      this.tableName = tableName;
+      this.location = location;
+      this.precursor = precursor;
+      this.dmd = dmd;
+      this.hiveConf = hiveConf;
+      this.db = db;
+      this.nestedContext = nestedContext;
+      this.log = log;
+    }
+
+    /**
+     * Creates a copy of {@code other} in which only the database and table names are replaced.
+     */
+    public Context(Context other, String dbName, String tableName) {
+      this(dbName, tableName, other.location, other.precursor, other.dmd, other.hiveConf,
+          other.db, other.nestedContext, other.log);
+    }
+
+    /** @return true when {@code tableName} is null or the empty string */
+    boolean isTableNameEmpty() {
+      return StringUtils.isEmpty(tableName);
+    }
+
+    /** @return true when {@code dbName} is null or the empty string */
+    boolean isDbNameEmpty() {
+      return StringUtils.isEmpty(dbName);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
new file mode 100644
index 0000000..de6ff74
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
+/**
+ * Maps a {@link DumpType} to the {@link MessageHandler} implementation that replays it, and
+ * creates a new handler instance per request through the handler's no-argument constructor.
+ */
+public class MessageHandlerFactory {
+  // Filled once by the static initializer below and only read afterwards.
+  private static final Map<DumpType, Class<? extends MessageHandler>> messageHandlers =
+      new HashMap<>();
+
+  static {
+    register(DumpType.EVENT_DROP_PARTITION, DropPartitionHandler.class);
+    register(DumpType.EVENT_DROP_TABLE, DropTableHandler.class);
+    register(DumpType.EVENT_INSERT, InsertHandler.class);
+    register(DumpType.EVENT_RENAME_PARTITION, RenamePartitionHandler.class);
+    register(DumpType.EVENT_RENAME_TABLE, RenameTableHandler.class);
+
+    register(DumpType.EVENT_CREATE_TABLE, TableHandler.class);
+    register(DumpType.EVENT_ADD_PARTITION, TableHandler.class);
+    register(DumpType.EVENT_ALTER_TABLE, TableHandler.class);
+    register(DumpType.EVENT_ALTER_PARTITION, TableHandler.class);
+
+    register(DumpType.EVENT_TRUNCATE_PARTITION, TruncatePartitionHandler.class);
+    register(DumpType.EVENT_TRUNCATE_TABLE, TruncateTableHandler.class);
+  }
+
+  /**
+   * Registers {@code handlerClazz} for {@code eventType} after checking that it declares a
+   * no-argument constructor, which is what {@link #handlerFor(DumpType)} invokes.
+   *
+   * @throws IllegalArgumentException if the class declares no no-argument constructor
+   */
+  private static void register(DumpType eventType, Class<? extends MessageHandler> handlerClazz) {
+    try {
+      // getDeclaredConstructor never returns null: it throws NoSuchMethodException instead.
+      Constructor<? extends MessageHandler> constructor =
+          handlerClazz.getDeclaredConstructor();
+      assert !Modifier.isPrivate(constructor.getModifiers());
+      messageHandlers.put(eventType, handlerClazz);
+    } catch (NoSuchMethodException e) {
+      // The lookup above is for the no-argument constructor, so say exactly that.
+      throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName()
+          + " does not have a no-argument constructor", e);
+    }
+  }
+
+  /**
+   * Creates the handler registered for {@code eventType}.
+   *
+   * @param eventType the dump type of the event being loaded
+   * @return a new instance of the registered handler, or a new {@link DefaultHandler} when
+   *         nothing is registered for {@code eventType}
+   * @throws RuntimeException wrapping the reflective failure if the handler cannot be created
+   */
+  public static MessageHandler handlerFor(DumpType eventType) {
+    // register() only ever stores non-null classes, so null here means "not registered".
+    Class<? extends MessageHandler> handlerClazz = messageHandlers.get(eventType);
+    if (handlerClazz == null) {
+      return new DefaultHandler();
+    }
+    try {
+      Constructor<? extends MessageHandler> constructor =
+          handlerClazz.getDeclaredConstructor();
+      return constructor.newInstance();
+    } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+      // this should never happen. however we want to make sure we propagate the exception
+      throw new RuntimeException(
+          "failed when creating handler for " + eventType
+              + " with the responsible class being " + handlerClazz.getCanonicalName(), e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
new file mode 100644
index 0000000..658f2ba
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +
/**
 * Builds the DDL task that replays a partition rename described by a dumped
 * alter-partition event.
 */
class RenamePartitionHandler extends AbstractMessageHandler {
  /**
   * Deserializes the event payload, derives the old and new partition specs and wraps a
   * {@link RenamePartitionDesc} in a single DDL task.
   *
   * <p>A non-empty database or table name on {@code context} takes precedence over the name
   * recorded in the event. The value of {@code context.dmd.getEventTo()} is recorded in
   * {@code databasesUpdated} and {@code tablesUpdated} for the affected database and table.
   *
   * @param context load context carrying the dump metadata, target names, config and logger
   * @return a singleton list holding the rename-partition task
   * @throws SemanticException if the table or partition objects cannot be read from the event
   */
  @Override
  public List<Task<? extends Serializable>> handle(Context context)
      throws SemanticException {
    AlterPartitionMessage renameMsg =
        deserializer.getAlterPartitionMessage(context.dmd.getPayload());

    String dbName;
    if (context.isDbNameEmpty()) {
      dbName = renameMsg.getDB();
    } else {
      dbName = context.dbName;
    }
    String tblName;
    if (context.isTableNameEmpty()) {
      tblName = renameMsg.getTable();
    } else {
      tblName = context.tableName;
    }
    String qualifiedTableName = dbName + "." + tblName;

    // LinkedHashMap keeps the specs in partition-key order.
    Map<String, String> specBefore = new LinkedHashMap<>();
    Map<String, String> specAfter = new LinkedHashMap<>();
    try {
      Table table = renameMsg.getTableObj();
      Iterator<String> valuesBefore = renameMsg.getPtnObjBefore().getValuesIterator();
      Iterator<String> valuesAfter = renameMsg.getPtnObjAfter().getValuesIterator();
      // Pair each partition key, in order, with its value before and after the rename.
      for (FieldSchema partitionKey : table.getPartitionKeys()) {
        String keyName = partitionKey.getName();
        specBefore.put(keyName, valuesBefore.next());
        specAfter.put(keyName, valuesAfter.next());
      }
    } catch (Exception e) {
      if (e instanceof SemanticException) {
        throw (SemanticException) e;
      }
      throw new SemanticException("Error reading message members", e);
    }

    RenamePartitionDesc renameDesc =
        new RenamePartitionDesc(qualifiedTableName, specBefore, specAfter);
    DDLWork renameWork = new DDLWork(readEntitySet, writeEntitySet, renameDesc);
    Task<DDLWork> renameTask = TaskFactory.get(renameWork, context.hiveConf);
    context.log.debug(
        "Added rename ptn task : {}:{}->{}", renameTask.getId(), specBefore, specAfter);

    databasesUpdated.put(dbName, context.dmd.getEventTo());
    tablesUpdated.put(qualifiedTableName, context.dmd.getEventTo());
    return Collections.singletonList(renameTask);
  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java new file mode 100644 index 0000000..2c429c1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AlterTableDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +
/**
 * Builds the DDL task that replays a table rename described by a dumped alter-table event.
 */
class RenameTableHandler extends AbstractMessageHandler {
  /**
   * Deserializes the event payload and wraps an {@link AlterTableDesc} rename in a single DDL
   * task.
   *
   * <p>Table-level loads (a non-empty table name on {@code context}) are rejected. For a
   * db-level load (a non-empty database name on {@code context}) the event must not move the
   * table between databases, and both names are re-qualified with the target database.
   *
   * @param context load context carrying the dump metadata, target names, config and logger
   * @return a singleton list holding the rename-table task
   * @throws SemanticException for a table-level load, a cross-database rename into a db-level
   *                           load, or when the table objects cannot be read from the event
   */
  @Override
  public List<Task<? extends Serializable>> handle(Context context)
      throws SemanticException {
    AlterTableMessage renameMsg = deserializer.getAlterTableMessage(context.dmd.getPayload());

    boolean tableLevelLoad = !context.isTableNameEmpty();
    if (tableLevelLoad) {
      throw new SemanticException(
          "RENAMES of tables are not supported for table-level replication");
    }

    try {
      String sourceDb = renameMsg.getTableObjBefore().getDbName();
      String targetDb = renameMsg.getTableObjAfter().getDbName();

      boolean dbLevelLoad = !context.isDbNameEmpty();
      if (dbLevelLoad) {
        // Loading into one db rather than the whole warehouse: the rename has to stay inside
        // a single database.
        if (!sourceDb.equalsIgnoreCase(targetDb)) {
          throw new SemanticException("Cannot replicate an event renaming a table across"
              + " databases into a db level load " + sourceDb + "->" + targetDb);
        }
        // Both sides named the same db, so substitute the db we are loading into.
        sourceDb = context.dbName;
        targetDb = context.dbName;
      }

      String qualifiedOldName = sourceDb + "." + renameMsg.getTableObjBefore().getTableName();
      String qualifiedNewName = targetDb + "." + renameMsg.getTableObjAfter().getTableName();

      AlterTableDesc renameDesc = new AlterTableDesc(qualifiedOldName, qualifiedNewName, false);
      DDLWork renameWork = new DDLWork(readEntitySet, writeEntitySet, renameDesc);
      Task<DDLWork> renameTask = TaskFactory.get(renameWork, context.hiveConf);
      context.log.debug(
          "Added rename table task : {}:{}->{}",
          renameTask.getId(), qualifiedOldName, qualifiedNewName);

      // Only the target db is recorded as updated.
      // NOTE(review): source and target db are only shown to be equal for db-level loads; for
      // a warehouse-level load of a cross-db rename the source db is not recorded - confirm
      // that this is intended.
      databasesUpdated.put(targetDb, context.dmd.getEventTo());
      tablesUpdated.remove(qualifiedOldName);
      tablesUpdated.put(qualifiedNewName, context.dmd.getEventTo());
      // Note : edge-case in the interaction with table-level REPL LOAD, which nukes
      // tablesUpdated. That kind of load is rejected above; if it ever becomes supported,
      // this bookkeeping needs reworking.
      return Collections.singletonList(renameTask);
    } catch (Exception e) {
      if (e instanceof SemanticException) {
        throw (SemanticException) e;
      }
      throw new SemanticException("Error reading message members", e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java new file mode 100644 index 0000000..2db8385 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +
/**
 * Loads a table dump location by delegating to
 * {@link ImportSemanticAnalyzer#prepareImport} with the fixed settings used for replication
 * loads.
 */
class TableHandler extends AbstractMessageHandler {
  /**
   * Prepares the import tasks for the table dump found at {@code context.location}.
   *
   * <p>{@code context.tableName} is passed through as given; when it is empty the import is
   * expected to fall back to the table name stored in the dump's {@code _metadata} (per the
   * original author's note - the fallback itself happens inside {@code prepareImport}).
   *
   * @param context load context; its database name must be non-empty
   * @return the task list handed to the import wrapper context, which {@code prepareImport}
   *         is expected to populate
   * @throws SemanticException if the database name is empty, or if preparing the import
   *                           fails; a {@code SemanticException} raised while preparing is
   *                           rethrown unchanged, any other exception is wrapped
   */
  @Override
  public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
    if (context.isDbNameEmpty()) {
      throw new SemanticException("Database name cannot be null for a table load");
    }
    try {
      // TODO: these never change here; consider making them constants or inlining them.

      // no location set on repl loads
      boolean isLocationSet = false;
      // all repl imports are non-external
      boolean isExternalSet = false;
      // bootstrap loads are not partition level
      boolean isPartSpecSet = false;
      // repl loads are not partition level
      LinkedHashMap<String, String> parsedPartSpec = null;
      // no location for repl imports
      String parsedLocation = null;
      List<Task<? extends Serializable>> importTasks = new ArrayList<>();

      EximUtil.SemanticAnalyzerWrapperContext x =
          new EximUtil.SemanticAnalyzerWrapperContext(
              context.hiveConf, context.db, readEntitySet, writeEntitySet, importTasks, context.log,
              context.nestedContext);
      ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
          (context.precursor != null), parsedLocation, context.tableName, context.dbName,
          parsedPartSpec, context.location, x,
          databasesUpdated, tablesUpdated);

      return importTasks;
    } catch (Exception e) {
      // Match the sibling handlers: let a SemanticException through as-is instead of nesting
      // it inside a second SemanticException, and wrap everything else.
      throw (e instanceof SemanticException)
          ? (SemanticException) e
          : new SemanticException(e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java new file mode 100644 index 0000000..5436f0d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.TruncateTableDesc; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +
/**
 * Builds the DDL task that replays a partition truncate described by a dumped
 * alter-partition event.
 */
class TruncatePartitionHandler extends AbstractMessageHandler {
  /**
   * Deserializes the event payload, derives the partition spec from the "after" partition
   * object and wraps a {@link TruncateTableDesc} for that partition in a single DDL task.
   *
   * <p>A non-empty database or table name on {@code context} takes precedence over the name
   * recorded in the event. The value of {@code context.dmd.getEventTo()} is recorded in
   * {@code databasesUpdated} for the affected database.
   *
   * @param context load context carrying the dump metadata, target names, config and logger
   * @return a singleton list holding the truncate-partition task
   * @throws SemanticException if the table or partition objects cannot be read from the event
   */
  @Override
  public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
    AlterPartitionMessage truncateMsg =
        deserializer.getAlterPartitionMessage(context.dmd.getPayload());

    String dbName;
    if (context.isDbNameEmpty()) {
      dbName = truncateMsg.getDB();
    } else {
      dbName = context.dbName;
    }
    String tblName;
    if (context.isTableNameEmpty()) {
      tblName = truncateMsg.getTable();
    } else {
      tblName = context.tableName;
    }

    // LinkedHashMap keeps the spec in partition-key order.
    Map<String, String> partitionSpec = new LinkedHashMap<>();
    try {
      org.apache.hadoop.hive.metastore.api.Table table = truncateMsg.getTableObj();
      Iterator<String> partitionValues = truncateMsg.getPtnObjAfter().getValuesIterator();
      // Pair each partition key, in order, with the corresponding partition value.
      for (FieldSchema partitionKey : table.getPartitionKeys()) {
        partitionSpec.put(partitionKey.getName(), partitionValues.next());
      }
    } catch (Exception e) {
      throw (e instanceof SemanticException)
          ? (SemanticException) e
          : new SemanticException("Error reading message members", e);
    }

    String qualifiedTableName = dbName + "." + tblName;
    TruncateTableDesc truncateDesc = new TruncateTableDesc(qualifiedTableName, partitionSpec);
    DDLWork truncateWork = new DDLWork(readEntitySet, writeEntitySet, truncateDesc);
    Task<DDLWork> truncateTask = TaskFactory.get(truncateWork, context.hiveConf);
    context.log.debug(
        "Added truncate ptn task : {}:{}", truncateTask.getId(), truncateDesc.getTableName());

    databasesUpdated.put(dbName, context.dmd.getEventTo());
    return Collections.singletonList(truncateTask);
  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java new file mode 100644 index 0000000..731383c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.TruncateTableDesc; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +
/**
 * Builds the DDL task that replays a whole-table truncate described by a dumped alter-table
 * event.
 */
class TruncateTableHandler extends AbstractMessageHandler {
  /**
   * Deserializes the event payload and wraps a {@link TruncateTableDesc} with no partition
   * spec in a single DDL task.
   *
   * <p>A non-empty database or table name on {@code context} takes precedence over the name
   * recorded in the event. The value of {@code context.dmd.getEventTo()} is recorded in
   * {@code databasesUpdated} for the affected database.
   *
   * @param context load context carrying the dump metadata, target names, config and logger
   * @return a singleton list holding the truncate-table task
   * @throws SemanticException declared by the handler contract; no statement in this method
   *                           throws it directly
   */
  @Override
  public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
    AlterTableMessage truncateMsg =
        deserializer.getAlterTableMessage(context.dmd.getPayload());

    String dbName;
    if (context.isDbNameEmpty()) {
      dbName = truncateMsg.getDB();
    } else {
      dbName = context.dbName;
    }
    String tblName;
    if (context.isTableNameEmpty()) {
      tblName = truncateMsg.getTable();
    } else {
      tblName = context.tableName;
    }

    // A null partition spec is passed: the descriptor names the table only.
    String qualifiedTableName = dbName + "." + tblName;
    TruncateTableDesc truncateDesc = new TruncateTableDesc(qualifiedTableName, null);
    DDLWork truncateWork = new DDLWork(readEntitySet, writeEntitySet, truncateDesc);
    Task<DDLWork> truncateTask = TaskFactory.get(truncateWork, context.hiveConf);

    context.log.debug(
        "Added truncate tbl task : {}:{}", truncateTask.getId(), truncateDesc.getTableName());
    databasesUpdated.put(dbName, context.dmd.getEventTo());
    return Collections.singletonList(truncateTask);
  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java new file mode 100644 index 0000000..c689e6f --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/events/TestEventHandlerFactory.java @@ -0,0 +1,62 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +
/**
 * Unit tests for {@code EventHandlerFactory}: rejection of an unusable handler class at
 * registration time, and the default-handler fallback for an unregistered event type.
 */
public class TestEventHandlerFactory {

  /**
   * Registering the local handler class below must fail with
   * {@link IllegalArgumentException}.
   * NOTE(review): presumably the factory rejects it because a method-local class does not
   * expose the constructor signature the factory looks up - confirm against
   * {@code EventHandlerFactory.register}.
   */
  @Test(expected = IllegalArgumentException.class)
  public void shouldNotAllowRegisteringEventsWhichCannotBeInstantiated() {
    // Minimal handler stub; only its class object matters to this test.
    class NonCompatibleEventHandler implements EventHandler {
      @Override
      public DumpType dumpType() {
        return null;
      }

      @Override
      public long fromEventId() {
        return 0;
      }

      @Override
      public long toEventId() {
        return 0;
      }

      @Override
      public void handle(Context withinContext) throws Exception {
        // intentionally empty
      }
    }

    EventHandlerFactory.register("anyEvent", NonCompatibleEventHandler.class);
  }

  /**
   * An event whose type string has no registered handler must be served by
   * {@code DefaultHandler}.
   */
  @Test
  public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() {
    NotificationEvent unregisteredEvent = new NotificationEvent(
        Long.MAX_VALUE, Integer.MAX_VALUE, "shouldGiveDefaultHandler", "s");

    EventHandler resolvedHandler = EventHandlerFactory.handlerFor(unregisteredEvent);

    assertTrue(resolvedHandler instanceof DefaultHandler);
  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9d4f13af/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java deleted file mode 100644 index 4b802c4..0000000 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/events/TestEventHandlerFactory.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.parse.repl.events; - -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.ql.parse.repl.DumpType; -import org.junit.Test; - -import static org.junit.Assert.assertTrue; - -public class TestEventHandlerFactory { - @Test(expected = IllegalArgumentException.class) - public void shouldNotAllowRegisteringEventsWhichCannotBeInstantiated() { - class NonCompatibleEventHandler implements EventHandler { - @Override - public void handle(Context withinContext) throws Exception { - - } - - @Override - public long fromEventId() { - return 0; - } - - @Override - public long toEventId() { - return 0; - } - - @Override - public DumpType dumpType() { - return null; - } - } - EventHandlerFactory.register("anyEvent", NonCompatibleEventHandler.class); - } - - @Test - public void shouldProvideDefaultHandlerWhenNothingRegisteredForThatEvent() { - EventHandler eventHandler = - EventHandlerFactory.handlerFor(new NotificationEvent(Long.MAX_VALUE, Integer.MAX_VALUE, - "shouldGiveDefaultHandler", "s")); - assertTrue(eventHandler instanceof DefaultHandler); - } - -} \ No newline at end of file
