Repository: hive Updated Branches: refs/heads/branch-3 c191ea5e9 -> 150ef3ba5
http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java new file mode 100644 index 0000000..4b37c8d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.incremental; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * IncrementalLoadEventsIterator + * Helper class to iterate through event dump directory. 
+ */ +public class IncrementalLoadEventsIterator implements Iterator<FileStatus> { + private FileStatus[] eventDirs; + private int currentIndex; + private int numEvents; + + public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws IOException { + Path eventPath = new Path(loadPath); + FileSystem fs = eventPath.getFileSystem(conf); + eventDirs = fs.listStatus(eventPath, EximUtil.getDirectoryFilter(fs)); + if ((eventDirs == null) || (eventDirs.length == 0)) { + throw new IllegalArgumentException("No data to load in path " + loadPath); + } + // For event dump, each sub-dir is an individual event dump. + // We need to guarantee that the directory listing we got is in order of event id. + Arrays.sort(eventDirs, new EventDumpDirComparator()); + currentIndex = 0; + numEvents = eventDirs.length; + } + + @Override + public boolean hasNext() { + return (eventDirs != null && currentIndex < numEvents); + } + + @Override + public FileStatus next() { + if (hasNext()) { + return eventDirs[currentIndex++]; + } else { + throw new NoSuchElementException("no more events"); + } + } + + public int getNumEvents() { + return numEvents; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java new file mode 100644 index 0000000..94e6425 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl.incremental; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; +import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; +import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; +import 
org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger; +import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; +import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.AlterTableDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; +import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.slf4j.Logger; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.HashSet; + +/** + * IncrementalLoad + * Iterate through the dump directory and create tasks to load the events. + */ +public class IncrementalLoadTasksBuilder { + private final String dbName, tableName; + private final IncrementalLoadEventsIterator iterator; + private HashSet<ReadEntity> inputs; + private HashSet<WriteEntity> outputs; + private Logger log; + private final HiveConf conf; + private final ReplLogger replLogger; + private static long numIteration; + + public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadPath, + IncrementalLoadEventsIterator iterator, HiveConf conf) { + this.dbName = dbName; + this.tableName = tableName; + this.iterator = iterator; + inputs = new HashSet<>(); + outputs = new HashSet<>(); + log = null; + this.conf = conf; + replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents()); + numIteration = 0; + replLogger.startLog(); + } + + public Task<? extends Serializable> build(DriverContext driverContext, Hive hive, Logger log) throws Exception { + Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork()); + Task<? 
extends Serializable> taskChainTail = evTaskRoot; + Long lastReplayedEvent = null; + this.log = log; + numIteration++; + this.log.debug("Iteration num " + numIteration); + TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); + + while (iterator.hasNext() && tracker.canAddMoreTasks()) { + FileStatus dir = iterator.next(); + String location = dir.getPath().toUri().toString(); + DumpMetaData eventDmd = new DumpMetaData(new Path(location), conf); + + if (!shouldReplayEvent(dir, eventDmd.getDumpType(), dbName, tableName)) { + this.log.debug("Skipping event {} from {} for table {}.{} maxTasks: {}", + eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks()); + continue; + } + + this.log.debug("Loading event {} from {} for table {}.{} maxTasks: {}", + eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks()); + + // Event loads behave similarly to table loads, with one crucial difference: + // precursor order is strict, and each event must be processed after the previous one. + // The way we handle this strict order is as follows: + // First, we start with a taskChainTail which is a dummy noop task (a DependencyCollectionTask) + // at the head of our event chain. For each event we process, we tell analyzeEventLoad to + // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks + // and introduce a new barrier task (a ReplStateLogWork task) which depends on all + // these tasks. Then, this barrier task becomes our new taskChainTail. 
Thus, we get a set of + // tasks as follows: + // + // --->ev1.task1-- --->ev2.task1-- + // / \ / \ + // evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail + // \ / + // --->ev1.task3-- + // + // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the + // entire chain + + MessageHandler.Context context = new MessageHandler.Context(dbName, tableName, location, + taskChainTail, eventDmd, conf, hive, driverContext.getCtx(), this.log); + List<Task<? extends Serializable>> evTasks = analyzeEventLoad(context); + + if ((evTasks != null) && (!evTasks.isEmpty())) { + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, + dir.getPath().getName(), + eventDmd.getDumpType().toString()); + Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork); + AddDependencyToLeaves function = new AddDependencyToLeaves(barrierTask); + DAGTraversal.traverse(evTasks, function); + this.log.debug("Updated taskChainTail from {}:{} to {}:{}", + taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); + tracker.addTaskList(taskChainTail.getChildTasks()); + taskChainTail = barrierTask; + } + lastReplayedEvent = eventDmd.getEventTo(); + } + + if (!evTaskRoot.equals(taskChainTail) && !iterator.hasNext()) { + Map<String, String> dbProps = new HashMap<>(); + dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent)); + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps); + Task<? 
extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf); + taskChainTail.addDependentTask(barrierTask); + this.log.debug("Added {}:{} as a precursor of barrier task {}:{}", + taskChainTail.getClass(), taskChainTail.getId(), + barrierTask.getClass(), barrierTask.getId()); + } + log.info("Iteration " + numIteration + " done with num task : " + + tracker.numberOfTasks() + ", lastReplayedEvent : " + lastReplayedEvent); + return evTaskRoot; + } + + private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) { + if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { + String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) { + log.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName()) + + " is already replayed. LastReplId - " + Long.parseLong(replLastId)); + return false; + } + } + return true; + } + + private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbName, String tableName) { + // if database itself is null then we can not filter out anything. + if (dbName == null || dbName.isEmpty()) { + return true; + } else if ((tableName == null) || (tableName.isEmpty())) { + Database database; + try { + database = Hive.get().getDatabase(dbName); + return isEventNotReplayed(database.getParameters(), dir, dumpType); + } catch (HiveException e) { + //may be the db is getting created in this load + log.debug("failed to get the database " + dbName); + return true; + } + } else { + Table tbl; + try { + tbl = Hive.get().getTable(dbName, tableName); + return isEventNotReplayed(tbl.getParameters(), dir, dumpType); + } catch (HiveException e) { + // may be the table is getting created in this load + log.debug("failed to get the table " + dbName + "." + tableName); + return true; + } + } + } + + private List<Task<? 
extends Serializable>> analyzeEventLoad(MessageHandler.Context context) throws SemanticException { + MessageHandler messageHandler = context.dmd.getDumpType().handler(); + List<Task<? extends Serializable>> tasks = messageHandler.handle(context); + + if (context.precursor != null) { + for (Task<? extends Serializable> t : tasks) { + context.precursor.addDependentTask(t); + log.debug("Added {}:{} as a precursor of {}:{}", + context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId()); + } + } + + inputs.addAll(messageHandler.readEntities()); + outputs.addAll(messageHandler.writeEntities()); + return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName), messageHandler.getUpdatedMetadata(), tasks); + } + + private Task<? extends Serializable> tableUpdateReplStateTask(String dbName, String tableName, + Map<String, String> partSpec, String replState, + Task<? extends Serializable> preCursor) throws SemanticException { + HashMap<String, String> mapProp = new HashMap<>(); + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); + + AlterTableDesc alterTblDesc = new AlterTableDesc( + AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState)); + alterTblDesc.setProps(mapProp); + alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName)); + alterTblDesc.setPartSpec((HashMap<String, String>)partSpec); + + Task<? extends Serializable> updateReplIdTask = TaskFactory.get(new DDLWork(inputs, outputs, alterTblDesc), conf); + + // Link the update repl state task with dependency collection task + if (preCursor != null) { + preCursor.addDependentTask(updateReplIdTask); + log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(), + updateReplIdTask.getClass(), updateReplIdTask.getId()); + } + return updateReplIdTask; + } + + private Task<? extends Serializable> dbUpdateReplStateTask(String dbName, String replState, + Task<? 
extends Serializable> preCursor) { + HashMap<String, String> mapProp = new HashMap<>(); + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); + + AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(replState, replState)); + Task<? extends Serializable> updateReplIdTask = TaskFactory.get(new DDLWork(inputs, outputs, alterDbDesc), conf); + + // Link the update repl state task with dependency collection task + if (preCursor != null) { + preCursor.addDependentTask(updateReplIdTask); + log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(), + updateReplIdTask.getClass(), updateReplIdTask.getId()); + } + return updateReplIdTask; + } + + private List<Task<? extends Serializable>> addUpdateReplStateTasks( + boolean isDatabaseLoad, + UpdatedMetaDataTracker updatedMetaDataTracker, + List<Task<? extends Serializable>> importTasks) throws SemanticException { + // If no import tasks generated by the event then no need to update the repl state to any object. + if (importTasks.isEmpty()) { + log.debug("No objects need update of repl state: 0 import tasks"); + return importTasks; + } + + // Create a barrier task for dependency collection of import tasks + Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf); + + List<Task<? extends Serializable>> tasks = new ArrayList<>(); + Task<? 
extends Serializable> updateReplIdTask; + + // For each updated-metadata entry recorded by the tracker, add the repl state update tasks + for (UpdatedMetaDataTracker.UpdateMetaData updateMetaData : updatedMetaDataTracker.getUpdateMetaDataList()) { + String replState = updateMetaData.getReplState(); + String dbName = updateMetaData.getDbName(); + String tableName = updateMetaData.getTableName(); + // If any partition is updated, then update repl state in partition object + for (final Map<String, String> partSpec : updateMetaData.getPartitionsList()) { + updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask); + tasks.add(updateReplIdTask); + } + + if (tableName != null) { + // If any table/partition is updated, then update repl state in table object + updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask); + tasks.add(updateReplIdTask); + } + + // For table level load, need not update replication state for the database + if (isDatabaseLoad) { + // If any table/partition is updated, then update repl state in db object + updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask); + tasks.add(updateReplIdTask); + } + } + + if (tasks.isEmpty()) { + log.debug("No objects need update of repl state: 0 update tracker tasks"); + return importTasks; + } + + // Link import tasks to the barrier task, which will in turn be linked with the repl state update tasks + for (Task<? 
extends Serializable> t : importTasks){ + t.addDependentTask(barrierTask); + log.debug("Added {}:{} as a precursor of barrier task {}:{}", + t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId()); + } + + // At least one task would have been added to update the repl state + return tasks; + } + + public static long getNumIteration() { + return numIteration; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java new file mode 100644 index 0000000..284796f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.repl.util; + +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +public class AddDependencyToLeaves implements DAGTraversal.Function { + private List<Task<? extends Serializable>> postDependencyCollectionTasks; + + public AddDependencyToLeaves(List<Task<? extends Serializable>> postDependencyCollectionTasks) { + this.postDependencyCollectionTasks = postDependencyCollectionTasks; + } + + public AddDependencyToLeaves(Task<? extends Serializable> postDependencyTask) { + this(Collections.singletonList(postDependencyTask)); + } + + + @Override + public void process(Task<? extends Serializable> task) { + if (task.getChildTasks() == null) { + postDependencyCollectionTasks.forEach(task::addDependentTask); + } + } + + @Override + public boolean skipProcessing(Task<? extends Serializable> task) { + return postDependencyCollectionTasks.contains(task); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java new file mode 100644 index 0000000..00a8317 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl.util; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.plan.AlterTableDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.ImportTableDesc; +import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + + +public class ReplUtils { + + public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key"; + + /** + * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. 
+ */ + public enum ReplLoadOpType { + LOAD_NEW, LOAD_SKIP, LOAD_REPLACE + } + + public static Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs( + Table table, List<Map<String, String>> partitions) throws SemanticException { + Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>(); + int partPrefixLength = 0; + if (partitions.size() > 0) { + partPrefixLength = partitions.get(0).size(); + // pick the length of the first ptn, we expect all ptns listed to have the same number of + // key-vals. + } + List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>(); + for (Map<String, String> ptn : partitions) { + // convert each key-value-map to appropriate expression. + ExprNodeGenericFuncDesc expr = null; + for (Map.Entry<String, String> kvp : ptn.entrySet()) { + String key = kvp.getKey(); + Object val = kvp.getValue(); + String type = table.getPartColByName(key).getType(); + PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type); + ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true); + ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate( + "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val)); + expr = (expr == null) ? 
op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op); + } + if (expr != null) { + partitionDesc.add(expr); + } + } + if (partitionDesc.size() > 0) { + partSpecs.put(partPrefixLength, partitionDesc); + } + return partSpecs; + } + + public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf) + throws SemanticException { + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType()); + return TaskFactory.get(replLogWork, conf); + } + + public static Task<?> getTableCheckpointTask(ImportTableDesc tableDesc, HashMap<String, String> partSpec, + String dumpRoot, HiveConf conf) throws SemanticException { + HashMap<String, String> mapProp = new HashMap<>(); + mapProp.put(REPL_CHECKPOINT_KEY, dumpRoot); + + AlterTableDesc alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS); + alterTblDesc.setProps(mapProp); + alterTblDesc.setOldName( + StatsUtils.getFullyQualifiedTableName(tableDesc.getDatabaseName(), tableDesc.getTableName())); + if (partSpec != null) { + alterTblDesc.setPartSpec(partSpec); + } + return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf); + } + + public static boolean replCkptStatus(String dbName, Map<String, String> props, String dumpRoot) + throws InvalidOperationException { + // If the ckpt property is not set or is empty, bootstrap has not been run on this object. 
+ if ((props != null) && props.containsKey(REPL_CHECKPOINT_KEY) && !props.get(REPL_CHECKPOINT_KEY).isEmpty()) { + if (props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot)) { + return true; + } + throw new InvalidOperationException(ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.format(dumpRoot, + props.get(REPL_CHECKPOINT_KEY))); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java new file mode 100644 index 0000000..1d01bc9 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.repl.util; + +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; +import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * This class will be responsible to track how many tasks have been created, + * organization of tasks such that after the number of tasks for next execution are created + * we create a dependency collection task(DCT) -> another bootstrap task, + * and then add DCT as dependent to all existing tasks that are created so the cycle can continue. + */ +public class TaskTracker { + private static Logger LOG = LoggerFactory.getLogger(TaskTracker.class); + /** + * used to identify the list of tasks at root level for a given level like table / db / partition. + * this does not include the task dependency notion of "table tasks < ---- partition task" + */ + private final List<Task<? extends Serializable>> tasks = new ArrayList<>(); + private ReplicationState replicationState = null; + // since tasks themselves can be graphs we want to limit the number of created + // tasks including all of dependencies. + private int numberOfTasks = 0; + private final int maxTasksAllowed; + + public TaskTracker(int defaultMaxTasks) { + maxTasksAllowed = defaultMaxTasks; + } + + public TaskTracker(TaskTracker existing) { + maxTasksAllowed = existing.maxTasksAllowed - existing.numberOfTasks; + } + + /** + * this method is used to identify all the tasks in a graph. + * the graph however might get created in a disjoint fashion, in which case we can just update + * the number of tasks using the "update" method. + */ + public void addTask(Task<? extends Serializable> task) { + tasks.add(task); + + List <Task<? 
extends Serializable>> visited = new ArrayList<>(); + updateTaskCount(task, visited); + } + + public void addTaskList(List <Task<? extends Serializable>> taskList) { + List <Task<? extends Serializable>> visited = new ArrayList<>(); + for (Task<? extends Serializable> task : taskList) { + if (!visited.contains(task)) { + tasks.add(task); + updateTaskCount(task, visited); + } + } + } + + // This method is used to traverse the DAG created in tasks list and add the dependent task to + // the tail of each task chain. + public void addDependentTask(Task<? extends Serializable> dependent) { + if (tasks.isEmpty()) { + addTask(dependent); + } else { + DAGTraversal.traverse(tasks, new AddDependencyToLeaves(dependent)); + + List<Task<? extends Serializable>> visited = new ArrayList<>(); + updateTaskCount(dependent, visited); + } + } + + private void updateTaskCount(Task<? extends Serializable> task, + List <Task<? extends Serializable>> visited) { + numberOfTasks += 1; + visited.add(task); + if (task.getChildTasks() != null) { + for (Task<? extends Serializable> childTask : task.getChildTasks()) { + if (visited.contains(childTask)) { + continue; + } + updateTaskCount(childTask, visited); + } + } + } + + public boolean canAddMoreTasks() { + return numberOfTasks < maxTasksAllowed; + } + + public boolean hasTasks() { + return numberOfTasks != 0; + } + + public void update(TaskTracker withAnother) { + numberOfTasks += withAnother.numberOfTasks; + if (withAnother.hasReplicationState()) { + this.replicationState = withAnother.replicationState; + } + } + + public void setReplicationState(ReplicationState state) { + this.replicationState = state; + } + + public boolean hasReplicationState() { + return replicationState != null; + } + + public ReplicationState replicationState() { + return replicationState; + } + + public List<Task<? 
extends Serializable>> tasks() { + return tasks; + } + + public void debugLog(String forEventType) { + LOG.debug("{} event with total / root number of tasks:{}/{}", forEventType, numberOfTasks, + tasks.size()); + } + + public int numberOfTasks() { + return numberOfTasks; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java index a91f45e..cf54aa3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.parse.GenTezProcContext; import org.apache.hadoop.hive.ql.parse.GenTezWork; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 0a5ecf9..0a535d1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.Context; import 
org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index c1b98ae..d746cd6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.parse; import org.antlr.runtime.tree.Tree; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -30,34 +29,18 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; -import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.repl.DumpType; -import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import 
org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; -import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; -import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger; -import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; -import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; -import org.apache.hadoop.hive.ql.plan.AlterTableDesc; -import org.apache.hadoop.hive.ql.plan.DDLWork; -import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.stats.StatsUtils; import java.io.FileNotFoundException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; + import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -267,45 +250,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } } - private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) { - if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { - String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); - if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) { - LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName()) - + " is already replayed. LastReplId - " + Long.parseLong(replLastId)); - return false; - } - } - return true; - } - - private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws SemanticException { - // if database itself is null then we can not filter out anything. 
- if (dbNameOrPattern == null || dbNameOrPattern.isEmpty()) { - return true; - } else if ((tblNameOrPattern == null) || (tblNameOrPattern.isEmpty())) { - Database database; - try { - database = Hive.get().getDatabase(dbNameOrPattern); - return isEventNotReplayed(database.getParameters(), dir, dumpType); - } catch (HiveException e) { - //may be the db is getting created in this load - LOG.debug("failed to get the database " + dbNameOrPattern); - return true; - } - } else { - Table tbl; - try { - tbl = Hive.get().getTable(dbNameOrPattern, tblNameOrPattern); - return isEventNotReplayed(tbl.getParameters(), dir, dumpType); - } catch (HiveException e) { - // may be the table is getting created in this load - LOG.debug("failed to get the table " + dbNameOrPattern + "." + tblNameOrPattern); - return true; - } - } - } - /* * Example dump dirs we need to be able to handle : * @@ -397,7 +341,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - tblNameOrPattern, queryState.getLineageState()); + tblNameOrPattern, queryState.getLineageState(), false); rootTasks.add(TaskFactory.get(replLoadWork, conf)); return; } @@ -408,242 +352,15 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { return; } - FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs)); - - if ((dirsInLoadPath == null) || (dirsInLoadPath.length == 0)) { - throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString()); - } - - if (!evDump){ - // not an event dump, not a table dump - thus, a db dump - if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) { - LOG.debug("Found multiple dirs when we expected 1:"); - for (FileStatus d : dirsInLoadPath) { - LOG.debug("> " + d.getPath().toUri().toString()); - } - throw new 
IllegalArgumentException( - "Multiple dirs in " - + loadPath.toUri().toString() - + " does not correspond to REPL LOAD expecting to load to a singular destination point."); - } - - ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - queryState.getLineageState()); - rootTasks.add(TaskFactory.get(replLoadWork, conf)); - // - // for (FileStatus dir : dirsInLoadPath) { - // analyzeDatabaseLoad(dbNameOrPattern, fs, dir); - // } - } else { - // Event dump, each sub-dir is an individual event dump. - // We need to guarantee that the directory listing we got is in order of evid. - Arrays.sort(dirsInLoadPath, new EventDumpDirComparator()); - - Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork()); - Task<? extends Serializable> taskChainTail = evTaskRoot; - - ReplLogger replLogger = new IncrementalLoadLogger(dbNameOrPattern, - loadPath.toString(), dirsInLoadPath.length); - - for (FileStatus dir : dirsInLoadPath){ - String locn = dir.getPath().toUri().toString(); - DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf); - - if (!shouldReplayEvent(dir, eventDmd.getDumpType())) { - continue; - } - - LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern); - - // event loads will behave similar to table loads, with one crucial difference - // precursor order is strict, and each event must be processed after the previous one. - // The way we handle this strict order is as follows: - // First, we start with a taskChainTail which is a dummy noop task (a DependecyCollectionTask) - // at the head of our event chain. For each event we process, we tell analyzeTableLoad to - // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks - // and introduce a new barrier task(also a DependencyCollectionTask) which depends on all - // these tasks. Then, this barrier task becomes our new taskChainTail. 
Thus, we get a set of - // tasks as follows: - // - // --->ev1.task1-- --->ev2.task1-- - // / \ / \ - // evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail - // \ / - // --->ev1.task3-- - // - // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the - // entire chain - - MessageHandler.Context context = new MessageHandler.Context(dbNameOrPattern, - tblNameOrPattern, locn, taskChainTail, - eventDmd, conf, db, ctx, LOG); - List<Task<? extends Serializable>> evTasks = analyzeEventLoad(context); - - if ((evTasks != null) && (!evTasks.isEmpty())){ - ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, - dir.getPath().getName(), - eventDmd.getDumpType().toString()); - Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork); - for (Task<? extends Serializable> t : evTasks){ - t.addDependentTask(barrierTask); - LOG.debug("Added {}:{} as a precursor of barrier task {}:{}", - t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId()); - } - LOG.debug("Updated taskChainTail from {}:{} to {}:{}", - taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); - taskChainTail = barrierTask; - } - } - - // If any event is there and db name is known, then dump the start and end logs - if (!evTaskRoot.equals(taskChainTail)) { - Map<String, String> dbProps = new HashMap<>(); - dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(dmd.getEventTo())); - ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps); - Task<? 
extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf); - taskChainTail.addDependentTask(barrierTask); - LOG.debug("Added {}:{} as a precursor of barrier task {}:{}", - taskChainTail.getClass(), taskChainTail.getId(), - barrierTask.getClass(), barrierTask.getId()); - - replLogger.startLog(); - } - rootTasks.add(evTaskRoot); - } - + ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, + tblNameOrPattern, queryState.getLineageState(), evDump); + rootTasks.add(TaskFactory.get(replLoadWork, conf)); } catch (Exception e) { // TODO : simple wrap & rethrow for now, clean up with error codes throw new SemanticException(e.getMessage(), e); } } - private List<Task<? extends Serializable>> analyzeEventLoad( - MessageHandler.Context context) - throws SemanticException { - MessageHandler messageHandler = context.dmd.getDumpType().handler(); - List<Task<? extends Serializable>> tasks = messageHandler.handle(context); - - if (context.precursor != null) { - for (Task<? extends Serializable> t : tasks) { - context.precursor.addDependentTask(t); - LOG.debug("Added {}:{} as a precursor of {}:{}", - context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId()); - } - } - - inputs.addAll(messageHandler.readEntities()); - outputs.addAll(messageHandler.writeEntities()); - return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName), - messageHandler.getUpdatedMetadata(), tasks); - } - - private Task<? extends Serializable> tableUpdateReplStateTask( - String dbName, - String tableName, - Map<String, String> partSpec, - String replState, - Task<? 
extends Serializable> preCursor) throws SemanticException { - HashMap<String, String> mapProp = new HashMap<>(); - mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); - - AlterTableDesc alterTblDesc = new AlterTableDesc( - AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState)); - alterTblDesc.setProps(mapProp); - alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName)); - alterTblDesc.setPartSpec((HashMap<String, String>)partSpec); - - Task<? extends Serializable> updateReplIdTask = TaskFactory.get( - new DDLWork(inputs, outputs, alterTblDesc), conf); - - // Link the update repl state task with dependency collection task - if (preCursor != null) { - preCursor.addDependentTask(updateReplIdTask); - LOG.debug("Added {}:{} as a precursor of {}:{}", - preCursor.getClass(), preCursor.getId(), - updateReplIdTask.getClass(), updateReplIdTask.getId()); - } - return updateReplIdTask; - } - - private Task<? extends Serializable> dbUpdateReplStateTask( - String dbName, - String replState, - Task<? extends Serializable> preCursor) { - HashMap<String, String> mapProp = new HashMap<>(); - mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); - - AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc( - dbName, mapProp, new ReplicationSpec(replState, replState)); - Task<? extends Serializable> updateReplIdTask = TaskFactory.get( - new DDLWork(inputs, outputs, alterDbDesc), conf); - - // Link the update repl state task with dependency collection task - if (preCursor != null) { - preCursor.addDependentTask(updateReplIdTask); - LOG.debug("Added {}:{} as a precursor of {}:{}", - preCursor.getClass(), preCursor.getId(), - updateReplIdTask.getClass(), updateReplIdTask.getId()); - } - return updateReplIdTask; - } - - private List<Task<? extends Serializable>> addUpdateReplStateTasks( - boolean isDatabaseLoad, - UpdatedMetaDataTracker updatedMetaDataTracker, - List<Task<? 
extends Serializable>> importTasks) throws SemanticException { - // If no import tasks generated by the event then no need to update the repl state to any object. - if (importTasks.isEmpty()) { - LOG.debug("No objects need update of repl state: 0 import tasks"); - return importTasks; - } - - // Create a barrier task for dependency collection of import tasks - Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf); - - List<Task<? extends Serializable>> tasks = new ArrayList<>(); - Task<? extends Serializable> updateReplIdTask; - - // If any partition is updated, then update repl state in partition object - for (UpdatedMetaDataTracker.UpdateMetaData updateMetaData : updatedMetaDataTracker.getUpdateMetaDataList()) { - String replState = updateMetaData.getReplState(); - String dbName = updateMetaData.getDbName(); - String tableName = updateMetaData.getTableName(); - // If any partition is updated, then update repl state in partition object - for (final Map<String, String> partSpec : updateMetaData.getPartitionsList()) { - updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask); - tasks.add(updateReplIdTask); - } - - if (tableName != null) { - // If any table/partition is updated, then update repl state in table object - updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask); - tasks.add(updateReplIdTask); - } - - // For table level load, need not update replication state for the database - if (isDatabaseLoad) { - // If any table/partition is updated, then update repl state in db object - updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask); - tasks.add(updateReplIdTask); - } - } - - if (tasks.isEmpty()) { - LOG.debug("No objects need update of repl state: 0 update tracker tasks"); - return importTasks; - } - - // Link import tasks to the barrier task which will in-turn linked with repl state update tasks - for (Task<? 
extends Serializable> t : importTasks){ - t.addDependentTask(barrierTask); - LOG.debug("Added {}:{} as a precursor of barrier task {}:{}", - t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId()); - } - - // At least one task would have been added to update the repl state - return tasks; - } - // REPL STATUS private void initReplStatus(ASTNode ast) throws SemanticException{ dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java index 9fdf742..ecde3ce 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.thrift.TException; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index 70f4fed..f05c231 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ -21,7 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java index b59cdf2..e68e055 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java index 87a6ff6..0619bd3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.load.message; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; -import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.ReplTxnWork; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java index 939884d..4a2fdd2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.load.message; import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; -import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.metadata.Table; import 
org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DDLWork; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java index 309debe..166cf87 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java index 32b4a72..41ab447 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.junit.Test; import 
org.junit.runner.RunWith; import org.mockito.Mock;
