HIVE-14636 : pass information from FSOP/TezTask to commit to take care of speculative execution and failed tasks (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/87dcab47 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/87dcab47 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/87dcab47 Branch: refs/heads/hive-14535 Commit: 87dcab470f33ace818c775da6b0a9f18b10f66ac Parents: 2cef25d Author: Sergey Shelukhin <[email protected]> Authored: Wed Aug 31 16:06:03 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Wed Aug 31 16:06:03 2016 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/common/FileUtils.java | 6 +- .../hadoop/hive/common/HiveStatsUtils.java | 14 +- .../java/org/apache/hadoop/hive/ql/Context.java | 9 +- .../hive/ql/exec/AbstractFileMergeOperator.java | 4 +- .../hadoop/hive/ql/exec/FileSinkOperator.java | 194 +++++++++--- .../apache/hadoop/hive/ql/exec/MoveTask.java | 315 ++++++++++--------- .../apache/hadoop/hive/ql/exec/Utilities.java | 10 +- .../apache/hadoop/hive/ql/metadata/Hive.java | 2 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 5 +- .../hadoop/hive/ql/parse/TypeCheckCtx.java | 2 +- .../hadoop/hive/ql/plan/FileSinkDesc.java | 9 + .../hadoop/hive/ql/plan/LoadFileDesc.java | 2 +- .../hadoop/hive/ql/plan/LoadTableDesc.java | 19 +- ql/src/test/queries/clientpositive/mm_current.q | 18 +- .../clientpositive/llap/mm_current.q.out | 133 +++++++- 15 files changed, 517 insertions(+), 225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/common/src/java/org/apache/hadoop/hive/common/FileUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index 3ed2d08..ad43610 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -329,9 +329,13 @@ public final class FileUtils { */ public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus, List<FileStatus> results) throws IOException { + listStatusRecursively(fs, fileStatus, HIDDEN_FILES_PATH_FILTER, results); + } + public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus, + PathFilter filter, List<FileStatus> results) throws IOException { if (fileStatus.isDir()) { - for (FileStatus stat : fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_PATH_FILTER)) { + for (FileStatus stat : fs.listStatus(fileStatus.getPath(), filter)) { listStatusRecursively(fs, stat, results); } } else { http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java b/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java index 7c9d72f..111d99c 100644 --- a/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,15 +51,20 @@ public class HiveStatsUtils { * @return array of FileStatus * @throws IOException */ - public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs) + public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs) throws IOException { + return getFileStatusRecurse(path, level, fs, FileUtils.HIDDEN_FILES_PATH_FILTER); + } + + public static FileStatus[] getFileStatusRecurse( + Path path, int level, FileSystem fs, PathFilter filter) throws IOException { // if level is <0, the return all files/directories under the specified path - if ( level < 0) { + if (level < 0) { List<FileStatus> result = new ArrayList<FileStatus>(); try { FileStatus fileStatus = fs.getFileStatus(path); - FileUtils.listStatusRecursively(fs, fileStatus, result); + FileUtils.listStatusRecursively(fs, fileStatus, filter, result); } catch (IOException e) { // globStatus() API returns empty FileStatus[] when the specified path // does not exist. But getFileStatus() throw IOException. To mimic the @@ -75,7 +81,7 @@ public class HiveStatsUtils { sb.append(Path.SEPARATOR).append("*"); } Path pathPattern = new Path(path, sb.toString()); - return fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER); + return fs.globStatus(pathPattern, filter); } public static int getNumBitVectorsForNDVEstimation(Configuration conf) throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index ceb257c..1013f7c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -233,7 +233,8 @@ public class Context { // Append task specific info to stagingPathName, instead of creating a sub-directory. // This way we don't have to worry about deleting the stagingPathName separately at // end of query execution. - dir = fs.makeQualified(new Path(stagingPathName + "_" + this.executionId + "-" + TaskRunner.getTaskRunnerID())); + // TODO# HERE + dir = fs.makeQualified(new Path(stagingPathName + "_" + getExecutionPrefix())); LOG.debug("Created staging dir = " + dir + " for path = " + inputPath); @@ -819,6 +820,10 @@ public class Context { this.skipTableMasking = skipTableMasking; } + public String getExecutionPrefix() { + return this.executionId + "-" + TaskRunner.getTaskRunnerID(); + } + public ExplainConfiguration getExplainConfig() { return explainConfig; } @@ -827,7 +832,7 @@ public class Context { this.explainConfig = explainConfig; } - public void resetOpContext(){ + public void resetOpContext() { opContext = new CompilationOpContext(); sequencer = new AtomicInteger(); } http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index dfad6c1..40c784b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -254,8 +254,8 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> Path outputDir = conf.getOutputPath(); FileSystem fs = outputDir.getFileSystem(hconf); Path backupPath = backupOutputPath(fs, outputDir); - Utilities - .mvFileToFinalPath(outputDir, hconf, success, LOG, conf.getDpCtx(), + // TODO# merge-related move + Utilities.mvFileToFinalPath(outputDir, hconf, success, LOG, conf.getDpCtx(), null, reporter); if (success) { LOG.info("jobCloseOp moved merged files to output dir: " + outputDir); http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 1f5dfea..b8a2c5a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -27,16 +27,22 @@ import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -79,6 +85,7 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.common.util.HiveStringUtils; import org.slf4j.Logger; @@ -92,6 +99,7 @@ import com.google.common.collect.Lists; public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements Serializable { + private static final String MANIFEST_EXTENSION = ".manifest"; public static final Logger LOG = LoggerFactory.getLogger(FileSinkOperator.class); private static final boolean isInfoEnabled = LOG.isInfoEnabled(); private static final boolean isDebugEnabled = LOG.isDebugEnabled(); @@ -165,7 +173,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } Utilities.LOG14535.info("new FSPaths for " + numFiles + " files, dynParts = " + bDynParts + ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath - + " (spec path " + specPath + ")", new Exception()); + + " (spec path " + specPath + ")"/*, new Exception()*/); outPaths = new Path[numFiles]; finalPaths = new Path[numFiles]; @@ -187,7 +195,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements /** * Update the final paths according to tmpPath. */ - public Path getFinalPath(String taskId, Path tmpPath, String extension) { + private Path getFinalPath(String taskId, Path tmpPath, String extension) { if (extension != null) { return new Path(tmpPath, taskId + extension); } else { @@ -218,41 +226,64 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } private void commit(FileSystem fs) throws HiveException { - if (isMmTable) return; // TODO#: need to propagate to MoveTask instead + List<Path> commitPaths = null; + if (isMmTable) { + commitPaths = new ArrayList<>(); + } for (int idx = 0; idx < outPaths.length; ++idx) { try { - if ((bDynParts || isSkewedStoredAsSubDirectories) - && !fs.exists(finalPaths[idx].getParent())) { - Utilities.LOG14535.info("commit making path for dyn/skew: " + finalPaths[idx].getParent()); - fs.mkdirs(finalPaths[idx].getParent()); - } - boolean needToRename = true; - if (conf.getWriteType() == AcidUtils.Operation.UPDATE || - conf.getWriteType() == AcidUtils.Operation.DELETE) { - // If we're updating or deleting there may be no file to close. This can happen - // because the where clause strained out all of the records for a given bucket. So - // before attempting the rename below, check if our file exists. If it doesn't, - // then skip the rename. If it does try it. We could just blindly try the rename - // and avoid the extra stat, but that would mask other errors. - try { - if (outPaths[idx] != null) { - FileStatus stat = fs.getFileStatus(outPaths[idx]); - } - } catch (FileNotFoundException fnfe) { - needToRename = false; - } - } - Utilities.LOG14535.info("commit potentially moving " + outPaths[idx] + " to " + finalPaths[idx]); - if (needToRename && outPaths[idx] != null && !fs.rename(outPaths[idx], finalPaths[idx])) { - throw new HiveException("Unable to rename output from: " + - outPaths[idx] + " to: " + finalPaths[idx]); - } - updateProgress(); + commitOneOutPath(idx, fs, commitPaths); } catch (IOException e) { throw new HiveException("Unable to rename output from: " + outPaths[idx] + " to: " + finalPaths[idx], e); } } + if (isMmTable) { + Path manifestPath = new Path(specPath, "_tmp." + getPrefixedTaskId() + MANIFEST_EXTENSION); + Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths); + try { + try (FSDataOutputStream out = fs.create(manifestPath)) { + out.writeInt(commitPaths.size()); + for (Path path : commitPaths) { + out.writeUTF(path.toString()); + } + } + } catch (IOException e) { + throw new HiveException(e); + } + } + } + + private String getPrefixedTaskId() { + return conf.getExecutionPrefix() + "_" + taskId; + } + + private void commitOneOutPath(int idx, FileSystem fs, List<Path> commitPaths) + throws IOException, HiveException { + if ((bDynParts || isSkewedStoredAsSubDirectories) + && !fs.exists(finalPaths[idx].getParent())) { + Utilities.LOG14535.info("commit making path for dyn/skew: " + finalPaths[idx].getParent()); + fs.mkdirs(finalPaths[idx].getParent()); + } + // If we're updating or deleting there may be no file to close. This can happen + // because the where clause strained out all of the records for a given bucket. So + // before attempting the rename below, check if our file exists. If it doesn't, + // then skip the rename. If it does try it. We could just blindly try the rename + // and avoid the extra stat, but that would mask other errors. + boolean needToRename = (conf.getWriteType() != AcidUtils.Operation.UPDATE && + conf.getWriteType() != AcidUtils.Operation.DELETE) || fs.exists(outPaths[idx]); + if (needToRename && outPaths[idx] != null) { + Utilities.LOG14535.info("committing " + outPaths[idx] + " to " + finalPaths[idx] + " (" + isMmTable + ")"); + if (isMmTable) { + assert outPaths[idx].equals(finalPaths[idx]); + commitPaths.add(outPaths[idx]); + } else if (!fs.rename(outPaths[idx], finalPaths[idx])) { + throw new HiveException("Unable to rename output from: " + + outPaths[idx] + " to: " + finalPaths[idx]); + } + } + + updateProgress(); } public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException { @@ -297,10 +328,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements outPaths[filesIdx] = getTaskOutPath(taskId); } else { if (!bDynParts && !isSkewedStoredAsSubDirectories) { - finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension); + finalPaths[filesIdx] = getFinalPath(getPrefixedTaskId(), specPath, extension); } else { // TODO# wrong! - finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension); + finalPaths[filesIdx] = getFinalPath(getPrefixedTaskId(), specPath, extension); } outPaths[filesIdx] = finalPaths[filesIdx]; } @@ -638,7 +669,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable, isSkewedStoredAsSubDirectories); Utilities.LOG14535.info("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx] + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path " - + fsp.getTmpPath() + ", task " + taskId + ")", new Exception()); + + fsp.getTmpPath() + ", task " + taskId + ")"/*, new Exception()*/); if (isInfoEnabled) { LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]); @@ -1150,9 +1181,13 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements DynamicPartitionCtx dpCtx = conf.getDynPartCtx(); if (conf.isLinkedFileSink() && (dpCtx != null)) { specPath = conf.getParentDir(); + Utilities.LOG14535.info("Setting specPath to " + specPath + " for dynparts"); + } + if (!conf.isMmTable()) { + Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter); // TODO# other callers + } else { + handleMmTable(specPath, hconf, success, dpCtx, conf, reporter); } - Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, - reporter); } } catch (IOException e) { throw new HiveException(e); @@ -1160,6 +1195,95 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements super.jobCloseOp(hconf, success); } + private static class ExecPrefixPathFilter implements PathFilter { + private final String prefix, tmpPrefix; + public ExecPrefixPathFilter(String prefix) { + this.prefix = prefix; + this.tmpPrefix = "_tmp." + prefix; + } + + @Override + public boolean accept(Path path) { + String name = path.getName(); + return name.startsWith(prefix) || name.startsWith(tmpPrefix); + } + } + + + private void handleMmTable(Path specPath, Configuration hconf, boolean success, + DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter) + throws IOException, HiveException { + FileSystem fs = specPath.getFileSystem(hconf); + int targetLevel = (dpCtx == null) ? 1 : dpCtx.getNumDPCols(); + if (!success) { + FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse( + specPath, targetLevel, fs, new ExecPrefixPathFilter(conf.getExecutionPrefix())); + for (FileStatus status : statuses) { + Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure"); + tryDelete(fs, status.getPath()); + } + return; + } + FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse( + specPath, targetLevel, fs, new ExecPrefixPathFilter(conf.getExecutionPrefix())); + if (statuses == null) return; + LinkedList<FileStatus> results = new LinkedList<>(); + List<Path> manifests = new ArrayList<>(statuses.length); + for (FileStatus status : statuses) { + if (status.getPath().getName().endsWith(MANIFEST_EXTENSION)) { + manifests.add(status.getPath()); + } else { + results.add(status); + } + } + HashSet<String> committed = new HashSet<>(); + for (Path mfp : manifests) { + try (FSDataInputStream mdis = fs.open(mfp)) { + int fileCount = mdis.readInt(); + for (int i = 0; i < fileCount; ++i) { + String nextFile = mdis.readUTF(); + if (!committed.add(nextFile)) { + throw new HiveException(nextFile + " was specified in multiple manifests"); + } + } + } + } + Iterator<FileStatus> iter = results.iterator(); + while (iter.hasNext()) { + FileStatus rfs = iter.next(); + if (!committed.remove(rfs.getPath().toString())) { + iter.remove(); + Utilities.LOG14535.info("Deleting " + rfs.getPath() + " that was not committed"); + tryDelete(fs, rfs.getPath()); + } + } + if (!committed.isEmpty()) { + throw new HiveException("The following files were committed but not found: " + committed); + } + for (Path mfp : manifests) { + Utilities.LOG14535.info("Deleting manifest " + mfp); + tryDelete(fs, mfp); + } + + if (results.isEmpty()) return; + FileStatus[] finalResults = results.toArray(new FileStatus[results.size()]); + + List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles( + fs, finalResults, dpCtx, conf, hconf); + // create empty buckets if necessary + if (emptyBuckets.size() > 0) { + Utilities.createEmptyBuckets(hconf, emptyBuckets, conf, reporter); + } + } + + private void tryDelete(FileSystem fs, Path path) { + try { + fs.delete(path, false); + } catch (IOException ex) { + LOG.error("Failed to delete " + path, ex); + } + } + @Override public OperatorType getType() { return OperatorType.FILESINK; http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 2ab97f7..e3646da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -241,6 +241,18 @@ public class MoveTask extends Task<MoveWork> implements Serializable { return false; } + private final static class TaskInformation { + public List<BucketCol> bucketCols = null; + public List<SortCol> sortCols = null; + public int numBuckets = -1; + public Task task; + public String path; + public TaskInformation(Task task, String path) { + this.task = task; + this.path = path; + } + } + @Override public int execute(DriverContext driverContext) { @@ -318,155 +330,15 @@ public class MoveTask extends Task<MoveWork> implements Serializable { LOG.info("Partition is: " + tbd.getPartitionSpec().toString()); // Check if the bucketing and/or sorting columns were inferred - List<BucketCol> bucketCols = null; - List<SortCol> sortCols = null; - int numBuckets = -1; - Task task = this; - String path = tbd.getSourcePath().toUri().toString(); - // Find the first ancestor of this MoveTask which is some form of map reduce task - // (Either standard, local, or a merge) - while (task.getParentTasks() != null && task.getParentTasks().size() == 1) { - task = (Task)task.getParentTasks().get(0); - // If it was a merge task or a local map reduce task, nothing can be inferred - if (task instanceof MergeFileTask || task instanceof MapredLocalTask) { - break; - } - - // If it's a standard map reduce task, check what, if anything, it inferred about - // the directory this move task is moving - if (task instanceof MapRedTask) { - MapredWork work = (MapredWork)task.getWork(); - MapWork mapWork = work.getMapWork(); - bucketCols = mapWork.getBucketedColsByDirectory().get(path); - sortCols = mapWork.getSortedColsByDirectory().get(path); - if (work.getReduceWork() != null) { - numBuckets = work.getReduceWork().getNumReduceTasks(); - } - - if (bucketCols != null || sortCols != null) { - // This must be a final map reduce task (the task containing the file sink - // operator that writes the final output) - assert work.isFinalMapRed(); - } - break; - } - - // If it's a move task, get the path the files were moved from, this is what any - // preceding map reduce task inferred information about, and moving does not invalidate - // those assumptions - // This can happen when a conditional merge is added before the final MoveTask, but the - // condition for merging is not met, see GenMRFileSink1. - if (task instanceof MoveTask) { - if (((MoveTask)task).getWork().getLoadFileWork() != null) { - path = ((MoveTask)task).getWork().getLoadFileWork().getSourcePath().toUri().toString(); - } - } - } + TaskInformation ti = new TaskInformation(this, tbd.getSourcePath().toUri().toString()); + inferTaskInformation(ti); // deal with dynamic partitions DynamicPartitionCtx dpCtx = tbd.getDPCtx(); if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions - - List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx); - - // publish DP columns to its subscribers - if (dps != null && dps.size() > 0) { - pushFeed(FeedType.DYNAMIC_PARTITIONS, dps); - } - console.printInfo(System.getProperty("line.separator")); - long startTime = System.currentTimeMillis(); - // load the list of DP partitions and return the list of partition specs - // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions - // to use Utilities.getFullDPSpecs() to get the list of full partSpecs. - // After that check the number of DPs created to not exceed the limit and - // iterate over it and call loadPartition() here. - // The reason we don't do inside HIVE-1361 is the latter is large and we - // want to isolate any potential issue it may introduce. - Map<Map<String, String>, Partition> dp = - db.loadDynamicPartitions( - tbd.getSourcePath(), - tbd.getTable().getTableName(), - tbd.getPartitionSpec(), - tbd.getReplace(), - dpCtx.getNumDPCols(), - isSkewedStoredAsDirs(tbd), - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, - SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(), - work.getLoadTableWork().getWriteType()); - - console.printInfo("\t Time taken to load dynamic partitions: " + - (System.currentTimeMillis() - startTime)/1000.0 + " seconds"); - - if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) { - throw new HiveException("This query creates no partitions." + - " To turn off this error, set hive.error.on.empty.partition=false."); - } - - startTime = System.currentTimeMillis(); - // for each partition spec, get the partition - // and put it to WriteEntity for post-exec hook - for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) { - Partition partn = entry.getValue(); - - if (bucketCols != null || sortCols != null) { - updatePartitionBucketSortColumns( - db, table, partn, bucketCols, numBuckets, sortCols); - } - - WriteEntity enty = new WriteEntity(partn, - (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : - WriteEntity.WriteType.INSERT)); - if (work.getOutputs() != null) { - work.getOutputs().add(enty); - } - // Need to update the queryPlan's output as well so that post-exec hook get executed. - // This is only needed for dynamic partitioning since for SP the the WriteEntity is - // constructed at compile time and the queryPlan already contains that. - // For DP, WriteEntity creation is deferred at this stage so we need to update - // queryPlan here. - if (queryPlan.getOutputs() == null) { - queryPlan.setOutputs(new LinkedHashSet<WriteEntity>()); - } - queryPlan.getOutputs().add(enty); - - // update columnar lineage for each partition - dc = new DataContainer(table.getTTable(), partn.getTPartition()); - - // Don't set lineage on delete as we don't have all the columns - if (SessionState.get() != null && - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) { - SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, - table.getCols()); - } - LOG.info("\tLoading partition " + entry.getKey()); - } - console.printInfo("\t Time taken for adding to write entity : " + - (System.currentTimeMillis() - startTime)/1000.0 + " seconds"); - dc = null; // reset data container to prevent it being added again. + dc = handleDynParts(db, table, tbd, ti, dpCtx); } else { // static partitions - List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), - tbd.getPartitionSpec()); - db.validatePartitionNameCharacters(partVals); - Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + " into " + tbd.getTable()); - db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(), - tbd.getPartitionSpec(), tbd.getReplace(), - tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(), - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask(), tbd.isMmTable()); - Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); - - if (bucketCols != null || sortCols != null) { - updatePartitionBucketSortColumns(db, table, partn, bucketCols, - numBuckets, sortCols); - } - - dc = new DataContainer(table.getTTable(), partn.getTPartition()); - // add this partition to post-execution hook - if (work.getOutputs() != null) { - work.getOutputs().add(new WriteEntity(partn, - (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE - : WriteEntity.WriteType.INSERT))); - } - } + dc = handleStaticParts(db, table, tbd, ti); + } } if (SessionState.get() != null && dc != null) { // If we are doing an update or a delete the number of columns in the table will not @@ -500,6 +372,159 @@ public class MoveTask extends Task<MoveWork> implements Serializable { } } + private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd, + TaskInformation ti) throws HiveException, IOException, InvalidOperationException { + List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec()); + db.validatePartitionNameCharacters(partVals); + Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + " into " + tbd.getTable()); + db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(), + tbd.getPartitionSpec(), tbd.getReplace(), + tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(), + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask(), tbd.isMmTable()); + Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); + + if (ti.bucketCols != null || ti.sortCols != null) { + updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols, + ti.numBuckets, ti.sortCols); + } + + DataContainer dc = new DataContainer(table.getTTable(), partn.getTPartition()); + // add this partition to post-execution hook + if (work.getOutputs() != null) { + work.getOutputs().add(new WriteEntity(partn, + (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE + : WriteEntity.WriteType.INSERT))); + } + return dc; + } + + private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, + TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException, + IOException, InvalidOperationException { + DataContainer dc; + List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx); + + // publish DP columns to its subscribers + if (dps != null && dps.size() > 0) { + pushFeed(FeedType.DYNAMIC_PARTITIONS, dps); + } + console.printInfo(System.getProperty("line.separator")); + long startTime = System.currentTimeMillis(); + // load the list of DP partitions and return the list of partition specs + // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions + // to use Utilities.getFullDPSpecs() to get the list of full partSpecs. + // After that check the number of DPs created to not exceed the limit and + // iterate over it and call loadPartition() here. + // The reason we don't do inside HIVE-1361 is the latter is large and we + // want to isolate any potential issue it may introduce. + Map<Map<String, String>, Partition> dp = + db.loadDynamicPartitions( + tbd.getSourcePath(), + tbd.getTable().getTableName(), + tbd.getPartitionSpec(), + tbd.getReplace(), + dpCtx.getNumDPCols(), + isSkewedStoredAsDirs(tbd), + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, + SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(), + work.getLoadTableWork().getWriteType()); + + console.printInfo("\t Time taken to load dynamic partitions: " + + (System.currentTimeMillis() - startTime)/1000.0 + " seconds"); + + if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) { + throw new HiveException("This query creates no partitions." + + " To turn off this error, set hive.error.on.empty.partition=false."); + } + + startTime = System.currentTimeMillis(); + // for each partition spec, get the partition + // and put it to WriteEntity for post-exec hook + for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) { + Partition partn = entry.getValue(); + + if (ti.bucketCols != null || ti.sortCols != null) { + updatePartitionBucketSortColumns( + db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols); + } + + WriteEntity enty = new WriteEntity(partn, + (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : + WriteEntity.WriteType.INSERT)); + if (work.getOutputs() != null) { + work.getOutputs().add(enty); + } + // Need to update the queryPlan's output as well so that post-exec hook get executed. + // This is only needed for dynamic partitioning since for SP the the WriteEntity is + // constructed at compile time and the queryPlan already contains that. + // For DP, WriteEntity creation is deferred at this stage so we need to update + // queryPlan here. + if (queryPlan.getOutputs() == null) { + queryPlan.setOutputs(new LinkedHashSet<WriteEntity>()); + } + queryPlan.getOutputs().add(enty); + + // update columnar lineage for each partition + dc = new DataContainer(table.getTTable(), partn.getTPartition()); + + // Don't set lineage on delete as we don't have all the columns + if (SessionState.get() != null && + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) { + SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, + table.getCols()); + } + LOG.info("\tLoading partition " + entry.getKey()); + } + console.printInfo("\t Time taken for adding to write entity : " + + (System.currentTimeMillis() - startTime)/1000.0 + " seconds"); + dc = null; // reset data container to prevent it being added again. + return dc; + } + + private void inferTaskInformation(TaskInformation ti) { + // Find the first ancestor of this MoveTask which is some form of map reduce task + // (Either standard, local, or a merge) + while (ti.task.getParentTasks() != null && ti.task.getParentTasks().size() == 1) { + ti.task = (Task)ti.task.getParentTasks().get(0); + // If it was a merge task or a local map reduce task, nothing can be inferred + if (ti.task instanceof MergeFileTask || ti.task instanceof MapredLocalTask) { + break; + } + + // If it's a standard map reduce task, check what, if anything, it inferred about + // the directory this move task is moving + if (ti.task instanceof MapRedTask) { + MapredWork work = (MapredWork)ti.task.getWork(); + MapWork mapWork = work.getMapWork(); + ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path); + ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path); + if (work.getReduceWork() != null) { + ti.numBuckets = work.getReduceWork().getNumReduceTasks(); + } + + if (ti.bucketCols != null || ti.sortCols != null) { + // This must be a final map reduce task (the task containing the file sink + // operator that writes the final output) + assert work.isFinalMapRed(); + } + break; + } + + // If it's a move task, get the path the files were moved from, this is what any + // preceding map reduce task inferred information about, and moving does not invalidate + // those assumptions + // This can happen when a conditional merge is added before the final MoveTask, but the + // condition for merging is not met, see GenMRFileSink1. + if (ti.task instanceof MoveTask) { + MoveTask mt = (MoveTask)ti.task; + if (mt.getWork().getLoadFileWork() != null) { + ti.path = mt.getWork().getLoadFileWork().getSourcePath().toUri().toString(); + } + } + } + } + private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table) throws HiveException { if (work.getCheckFileFormat()) { http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index a7f7b9f..427f067 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1409,7 +1409,6 @@ public final class Utilities { Path tmpPath = Utilities.toTempPath(specPath); Path taskTmpPath = Utilities.toTaskTempPath(specPath); if (success) { - // TODO# specPath instead of tmpPath FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse( tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs); if(statuses != null && statuses.length > 0) { @@ -1423,8 +1422,6 @@ public final class Utilities { Utilities.LOG14535.info("Moving tmp dir: " + tmpPath + " to: " + specPath); Utilities.renameOrMoveFiles(fs, tmpPath, specPath); } - List<Path> paths = new ArrayList<>(); - // TODO#: HERE listFilesToCommit(specPath, fs, paths); } else { Utilities.LOG14535.info("deleting tmpPath " + tmpPath); fs.delete(tmpPath, true); @@ -1445,7 +1442,7 @@ public final class Utilities { * @throws HiveException * @throws IOException */ - private static void createEmptyBuckets(Configuration hconf, List<Path> paths, + static void createEmptyBuckets(Configuration hconf, List<Path> paths, FileSinkDesc conf, Reporter reporter) throws HiveException, IOException { @@ -1586,19 +1583,18 @@ public final class Utilities { for (FileStatus one : items) { if (isTempPath(one)) { - Utilities.LOG14535.info("removeTempOrDuplicateFiles deleting " + one.getPath(), new Exception()); + Utilities.LOG14535.info("removeTempOrDuplicateFiles deleting " + one.getPath()/*, new Exception()*/); if (!fs.delete(one.getPath(), true)) { throw new IOException("Unable to delete tmp file: " + one.getPath()); } } else { String taskId = getPrefixedTaskIdFromFilename(one.getPath().getName()); - Utilities.LOG14535.info("removeTempOrDuplicateFiles pondering " + one.getPath() + ", taskId " + taskId, new Exception()); + Utilities.LOG14535.info("removeTempOrDuplicateFiles pondering " + one.getPath() + ", taskId " + taskId/*, new Exception()*/); FileStatus otherFile = taskIdToFile.get(taskId); if (otherFile == null) { taskIdToFile.put(taskId, one); } else { - // TODO# file choice! // Compare the file sizes of all the attempt files for the same task, the largest win // any attempt files could contain partial results (due to task failures or // speculative runs), but the largest should be the correct one since the result http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 7d8c961..e43c600 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1878,7 +1878,7 @@ private void constructOneLBLocationMap(FileStatus fSta, Utilities.LOG14535.info("loadPartition called for DPP from " + partPath + " to " + tbl.getTableName()); Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace, true, listBucketingEnabled, - false, isAcid, hasFollowingStatsTask, false); // TODO# here + false, isAcid, hasFollowingStatsTask, false); // TODO# special case #N partitionsMap.put(fullPartSpec, newPartition); if (inPlaceEligible) { http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 6ed379a..499530e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6658,7 +6658,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { acidOp = getAcidType(table_desc.getOutputFileFormatClass()); checkAcidConstraints(qb, table_desc, dest_tab); } - ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp); + ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, isMmTable); ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); ltd.setLbCtx(lbCtx); @@ -6860,6 +6860,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { dest_path, currentTableId, destTableIsAcid, destTableIsTemporary, destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, canBeMerged, isMmTable); + if (isMmTable) { + fileSinkDesc.setExecutionPrefix(ctx.getExecutionPrefix()); + } Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( fileSinkDesc, fsRS, input), inputRR); http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java index 02896ff..26f1d70 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java @@ -160,7 +160,7 @@ public class TypeCheckCtx implements NodeProcessorCtx { if (LOG.isDebugEnabled()) { // Logger the callstack from which the error has been set. LOG.debug("Setting error: [" + error + "] from " - + ((errorSrcNode == null) ? "null" : errorSrcNode.toStringTree()), new Exception()); + + ((errorSrcNode == null) ? "null" : errorSrcNode.toStringTree())/*, new Exception()*/); } this.error = error; this.errorSrcNode = errorSrcNode; http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 0a4848b..f51999d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -97,6 +97,7 @@ public class FileSinkDesc extends AbstractOperatorDesc { private Path destPath; private boolean isHiveServerQuery; private boolean isMmTable; + private String executionPrefix; public FileSinkDesc() { } @@ -158,6 +159,7 @@ public class FileSinkDesc extends AbstractOperatorDesc { ret.setWriteType(writeType); ret.setTransactionId(txnId); ret.setStatsTmpDir(statsTmpDir); + ret.setExecutionPrefix(executionPrefix); return ret; } @@ -481,4 +483,11 @@ public class FileSinkDesc extends AbstractOperatorDesc { this.statsTmpDir = statsCollectionTempDir; } + public String getExecutionPrefix() { + return this.executionPrefix; + } + + public void setExecutionPrefix(String value) { + this.executionPrefix = value; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java index 5e4e1fe..5cad65c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java @@ -56,7 +56,7 @@ public class LoadFileDesc extends LoadDesc implements Serializable { final boolean isDfsDir, final String columns, final String columnTypes) { super(sourcePath); - Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir, new Exception()); + Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir/*, new Exception()*/); this.targetDir = targetDir; this.isDfsDir = isDfsDir; this.columns = columns; http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index 1ac831d..3b49197 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -52,10 +52,10 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc final org.apache.hadoop.hive.ql.plan.TableDesc table, final Map<String, String> partitionSpec, final boolean replace, - final AcidUtils.Operation writeType) { + final AcidUtils.Operation writeType, boolean isMmTable) { super(sourcePath); - Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + table.getTableName(), new Exception()); - init(table, partitionSpec, replace, writeType, false); + Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/); + init(table, partitionSpec, replace, writeType, isMmTable); } /** @@ -69,14 +69,16 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc final TableDesc table, final Map<String, String> partitionSpec, final boolean replace) { - this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID); + // TODO# we assume mm=false here + this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID, false); } public LoadTableDesc(final Path sourcePath, final org.apache.hadoop.hive.ql.plan.TableDesc table, final Map<String, String> partitionSpec, - final AcidUtils.Operation writeType) { - this(sourcePath, table, partitionSpec, true, writeType); + final AcidUtils.Operation writeType, boolean isMmTable) { + // TODO# we assume mm=false here + this(sourcePath, table, partitionSpec, true, writeType, isMmTable); } /** @@ -88,7 +90,8 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc public LoadTableDesc(final Path sourcePath, final org.apache.hadoop.hive.ql.plan.TableDesc table, final Map<String, String> partitionSpec) { - this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID); + // TODO# we assume mm=false here + this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, false); } public LoadTableDesc(final Path sourcePath, @@ -98,7 +101,7 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc boolean isReplace, boolean isMmTable) { super(sourcePath); - Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName(), new Exception()); + Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/); this.dpCtx = dpCtx; if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) { init(table, dpCtx.getPartSpec(), isReplace, writeType, isMmTable); http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/test/queries/clientpositive/mm_current.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q index 882096b..11259cb 100644 --- a/ql/src/test/queries/clientpositive/mm_current.q +++ b/ql/src/test/queries/clientpositive/mm_current.q @@ -2,10 +2,20 @@ set hive.mapred.mode=nonstrict; set hive.explain.user=false; set hive.exec.dynamic.partition.mode=nonstrict; set hive.fetch.task.conversion=none; +set tez.grouping.min-size=1; +set tez.grouping.max-size=2; +set hive.tez.auto.reducer.parallelism=false; -drop table simple_mm; - +create table intermediate(key int) partitioned by (p int) stored as orc; +insert into table intermediate partition(p='455') select key from src limit 3; +insert into table intermediate partition(p='456') select key from src limit 3; +insert into table intermediate partition(p='457') select key from src limit 3; -create table simple_mm(key int) partitioned by (key_mm int) tblproperties ('hivecommit'='true'); -insert into table simple_mm partition(key_mm='455') select key from src limit 3; +create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true'); + +explain insert into table simple_mm partition(key_mm='455') select key from intermediate; +insert into table simple_mm partition(key_mm='455') select key from intermediate; + +drop table simple_mm; +drop table intermediate; http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/test/results/clientpositive/llap/mm_current.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out b/ql/src/test/results/clientpositive/llap/mm_current.q.out index 129bb13..8f1af4c 100644 --- a/ql/src/test/results/clientpositive/llap/mm_current.q.out +++ b/ql/src/test/results/clientpositive/llap/mm_current.q.out @@ -1,21 +1,128 @@ -PREHOOK: query: drop table simple123 -PREHOOK: type: DROPTABLE -POSTHOOK: query: drop table simple123 -POSTHOOK: type: DROPTABLE -PREHOOK: query: create table simple123(key int) partitioned by (key123 int) tblproperties ('hivecommit'='true') +PREHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc PREHOOK: type: CREATETABLE PREHOOK: Output: database:default -PREHOOK: Output: default@simple123 -POSTHOOK: query: create table simple123(key int) partitioned by (key123 int) tblproperties ('hivecommit'='true') +PREHOOK: Output: default@intermediate +POSTHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default -POSTHOOK: Output: default@simple123 -PREHOOK: query: insert into table simple123 partition(key123='455') select key from src limit 3 +POSTHOOK: Output: default@intermediate +PREHOOK: query: insert into table intermediate partition(p='455') select key from src limit 3 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@intermediate@p=455 +POSTHOOK: query: insert into table intermediate partition(p='455') select key from src limit 3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@intermediate@p=455 +POSTHOOK: Lineage: intermediate PARTITION(p=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: insert into table intermediate partition(p='456') select key from src limit 3 PREHOOK: type: QUERY PREHOOK: Input: default@src -PREHOOK: Output: default@simple123@key123=455 -POSTHOOK: query: insert into table simple123 partition(key123='455') select key from src limit 3 +PREHOOK: Output: default@intermediate@p=456 +POSTHOOK: query: insert into table intermediate partition(p='456') select key from src limit 3 POSTHOOK: type: QUERY POSTHOOK: Input: default@src -POSTHOOK: Output: default@simple123@key123=455 -POSTHOOK: Lineage: simple123 PARTITION(key123=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Output: default@intermediate@p=456 +POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: insert into table intermediate partition(p='457') select key from src limit 3 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@intermediate@p=457 +POSTHOOK: query: insert into table intermediate partition(p='457') select key from src limit 3 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@intermediate@p=457 +POSTHOOK: Lineage: intermediate PARTITION(p=457).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@simple_mm +POSTHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@simple_mm +PREHOOK: query: explain insert into table simple_mm partition(key_mm='455') select key from intermediate +PREHOOK: type: QUERY +POSTHOOK: query: explain insert into table simple_mm partition(key_mm='455') select key from intermediate +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: intermediate + Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.simple_mm + Execution mode: llap + LLAP IO: all inputs + + Stage: Stage-2 + Dependency Collection + + Stage: Stage-0 + Move Operator + tables: + partition: + key_mm 455 + replace: false + table: + input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat + serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde + name: default.simple_mm + micromanaged table: true + + Stage: Stage-3 + Stats-Aggr Operator + +PREHOOK: query: insert into table simple_mm partition(key_mm='455') select key from intermediate +PREHOOK: type: QUERY +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Input: default@intermediate@p=457 +PREHOOK: Output: default@simple_mm@key_mm=455 +POSTHOOK: query: insert into table simple_mm partition(key_mm='455') select key from intermediate +POSTHOOK: type: QUERY +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@simple_mm@key_mm=455 +POSTHOOK: Lineage: simple_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +PREHOOK: query: drop table simple_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@simple_mm +PREHOOK: Output: default@simple_mm +POSTHOOK: query: drop table simple_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@simple_mm +POSTHOOK: Output: default@simple_mm +PREHOOK: query: drop table intermediate +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@intermediate +PREHOOK: Output: default@intermediate +POSTHOOK: query: drop table intermediate +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@intermediate +POSTHOOK: Output: default@intermediate
