HIVE-14635: establish a separate path for FSOP to write into final path (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d0f5b893
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d0f5b893
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d0f5b893

Branch: refs/heads/hive-14535
Commit: d0f5b893a8f3448882f18cca7cd2fec02c708874
Parents: c97450c
Author: Sergey Shelukhin <[email protected]>
Authored: Mon Aug 29 11:34:42 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Mon Aug 29 11:34:42 2016 -0700

----------------------------------------------------------------------
 .../metastore/api/hive_metastoreConstants.java  |  30 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   | 127 +++++---
 .../hadoop/hive/ql/exec/JoinOperator.java       |   1 +
 .../apache/hadoop/hive/ql/exec/MoveTask.java    | 126 ++++----
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  17 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  98 +++++-
 .../hive/ql/optimizer/GenMapRedUtils.java       |   4 +
 .../optimizer/unionproc/UnionProcFactory.java   |   3 +
 .../hadoop/hive/ql/parse/GenTezProcContext.java |   2 +-
 .../hadoop/hive/ql/parse/GenTezUtils.java       |   4 +
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 311 +++++++++++--------
 .../ql/plan/ConditionalResolverMergeFiles.java  |   5 +
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |  11 +-
 .../hadoop/hive/ql/plan/LoadFileDesc.java       |   2 +
 .../hadoop/hive/ql/plan/LoadTableDesc.java      |  24 +-
 .../apache/hadoop/hive/ql/plan/MoveWork.java    |   1 +
 .../hive/ql/exec/TestFileSinkOperator.java      |   2 +-
 ql/src/test/queries/clientpositive/mm_all.q     |  63 ++++
 ql/src/test/queries/clientpositive/mm_current.q |  11 +
 .../clientpositive/llap/mm_current.q.out        |  21 ++
 20 files changed, 567 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
index 8de8896..6a5f550 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
@@ -6,34 +6,7 @@
  */
 package org.apache.hadoop.hive.metastore.api;

-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.AbstractNonblockingServer.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import javax.annotation.Generated;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+
 public class
hive_metastoreConstants { public static final String DDL_TIME = "transient_lastDdlTime"; @@ -84,4 +57,5 @@ public class hive_metastoreConstants { public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; + public static final String TABLE_IS_MM = "hivecommit"; } http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index b0c3d3f..755120f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -143,8 +143,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } public class FSPaths implements Cloneable { - Path tmpPath; - Path taskOutputTempPath; + private Path tmpPath; + private Path taskOutputTempPath; Path[] outPaths; Path[] finalPaths; RecordWriter[] outWriters; @@ -152,10 +152,21 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements Stat stat; int acidLastBucket = -1; int acidFileOffset = -1; + private boolean isMmTable; + + public FSPaths(Path specPath, boolean isMmTable) { + this.isMmTable = isMmTable; + if (!isMmTable) { + tmpPath = Utilities.toTempPath(specPath); + taskOutputTempPath = Utilities.toTaskTempPath(specPath); + } else { + tmpPath = specPath; + taskOutputTempPath = null; // Should not be used. + } + Utilities.LOG14535.info("new FSPaths for " + numFiles + " files, dynParts = " + bDynParts + + ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath + + " (spec path " + specPath + ")", new Exception()); - public FSPaths(Path specPath) { - tmpPath = Utilities.toTempPath(specPath); - taskOutputTempPath = Utilities.toTaskTempPath(specPath); outPaths = new Path[numFiles]; finalPaths = new Path[numFiles]; outWriters = new RecordWriter[numFiles]; @@ -207,10 +218,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } private void commit(FileSystem fs) throws HiveException { + if (isMmTable) return; // TODO#: need to propagate to MoveTask instead for (int idx = 0; idx < outPaths.length; ++idx) { try { if ((bDynParts || isSkewedStoredAsSubDirectories) && !fs.exists(finalPaths[idx].getParent())) { + Utilities.LOG14535.info("commit making path for dyn/skew: " + finalPaths[idx].getParent()); fs.mkdirs(finalPaths[idx].getParent()); } boolean needToRename = true; @@ -229,6 +242,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements needToRename = false; } } + Utilities.LOG14535.info("commit potentially moving " + outPaths[idx] + " to " + finalPaths[idx]); if (needToRename && outPaths[idx] != null && !fs.rename(outPaths[idx], finalPaths[idx])) { throw new HiveException("Unable to rename output from: " + outPaths[idx] + " to: " + finalPaths[idx]); @@ -260,6 +274,54 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements public Stat getStat() { return stat; } + + public void configureDynPartPath(String dirName, String childSpecPathDynLinkedPartitions) { + dirName = (childSpecPathDynLinkedPartitions == null) ? 
dirName : + dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions; + tmpPath = new Path(tmpPath, dirName); + if (taskOutputTempPath != null) { + taskOutputTempPath = new Path(taskOutputTempPath, dirName); + } + } + + public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeTable, + boolean isSkewedStoredAsSubDirectories) { + if (isNativeTable) { + String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat); + if (!isMmTable) { + if (!bDynParts && !isSkewedStoredAsSubDirectories) { + finalPaths[filesIdx] = getFinalPath(taskId, parent, extension); + } else { + finalPaths[filesIdx] = getFinalPath(taskId, tmpPath, extension); + } + outPaths[filesIdx] = getTaskOutPath(taskId); + } else { + if (!bDynParts && !isSkewedStoredAsSubDirectories) { + finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension); + } else { + // TODO# wrong! + finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension); + } + outPaths[filesIdx] = finalPaths[filesIdx]; + } + if (isInfoEnabled) { + LOG.info("Final Path: FS " + finalPaths[filesIdx]); + if (isInfoEnabled && !isMmTable) { + LOG.info("Writing to temp file: FS " + outPaths[filesIdx]); + } + } + } else { + finalPaths[filesIdx] = outPaths[filesIdx] = specPath; + } + } + + public Path getTmpPath() { + return tmpPath; + } + + public Path getTaskOutputTempPath() { + return taskOutputTempPath; + } } // class FSPaths private static final long serialVersionUID = 1L; @@ -297,6 +359,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements protected boolean filesCreated = false; private void initializeSpecPath() { + // TODO# special case #N // For a query of the type: // insert overwrite table T1 // select * from (subq1 union all subq2)u; @@ -397,7 +460,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } if (!bDynParts) { - fsp = new FSPaths(specPath); + fsp = new FSPaths(specPath, conf.isMmTable()); // Create all the files - this is required because empty files need to be created for // empty buckets @@ -411,6 +474,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements .getVar(hconf, HIVE_TEMPORARY_TABLE_STORAGE)); if (isTemporary && fsp != null && tmpStorage != StoragePolicyValue.DEFAULT) { + assert !conf.isMmTable(); // Not supported for temp tables. final Path outputPath = fsp.taskOutputTempPath; StoragePolicyShim shim = ShimLoader.getHadoopShims() .getStoragePolicyShim(fs); @@ -557,7 +621,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements assert filesIdx == numFiles; // in recent hadoop versions, use deleteOnExit to clean tmp files. 
- if (isNativeTable && fs != null && fsp != null) { + if (isNativeTable && fs != null && fsp != null && !conf.isMmTable()) { autoDelete = fs.deleteOnExit(fsp.outPaths[0]); } } catch (Exception e) { @@ -571,34 +635,16 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException { try { - if (isNativeTable) { - fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null); - if (isInfoEnabled) { - LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]); - } - fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId); - if (isInfoEnabled) { - LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]); - } - } else { - fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath; - } - // The reason to keep these instead of using - // OutputFormat.getRecordWriter() is that - // getRecordWriter does not give us enough control over the file name that - // we create. - String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat); - if (!bDynParts && !this.isSkewedStoredAsSubDirectories) { - fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, parent, extension); - } else { - fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension); - } + fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable, isSkewedStoredAsSubDirectories); + Utilities.LOG14535.info("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx] + + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path " + + fsp.getTmpPath() + ", task " + taskId + ")", new Exception()); if (isInfoEnabled) { LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]); } - if (isNativeTable) { + if (isNativeTable && !conf.isMmTable()) { // in recent hadoop versions, use deleteOnExit to clean tmp files. 
autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]); } @@ -828,6 +874,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements protected FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException { FSPaths fsp2 = valToPaths.get(lbDirName); if (fsp2 == null) { + Utilities.LOG14535.info("lookupListBucketingPaths for " + lbDirName); fsp2 = createNewPaths(lbDirName); } return fsp2; @@ -841,18 +888,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements * @throws HiveException */ private FSPaths createNewPaths(String dirName) throws HiveException { - FSPaths fsp2 = new FSPaths(specPath); - if (childSpecPathDynLinkedPartitions != null) { - fsp2.tmpPath = new Path(fsp2.tmpPath, - dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions); - fsp2.taskOutputTempPath = - new Path(fsp2.taskOutputTempPath, - dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions); - } else { - fsp2.tmpPath = new Path(fsp2.tmpPath, dirName); - fsp2.taskOutputTempPath = - new Path(fsp2.taskOutputTempPath, dirName); - } + FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable()); // TODO# this will break + fsp2.configureDynPartPath(dirName, childSpecPathDynLinkedPartitions); + Utilities.LOG14535.info("creating new paths for " + dirName + ", childSpec " + childSpecPathDynLinkedPartitions + + ": tmpPath " + fsp2.getTmpPath() + ", task path " + fsp2.getTaskOutputTempPath()); if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { createBucketFiles(fsp2); valToPaths.put(dirName, fsp2); @@ -1082,7 +1121,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements // Hadoop always call close() even if an Exception was thrown in map() or // reduce(). for (FSPaths fsp : valToPaths.values()) { - fsp.abortWriters(fs, abort, !autoDelete && isNativeTable); + fsp.abortWriters(fs, abort, !autoDelete && isNativeTable && !conf.isMmTable()); } } fsp = prevFsp = null; @@ -1193,7 +1232,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements for (Map.Entry<String, FSPaths> entry : valToPaths.entrySet()) { String fspKey = entry.getKey(); // DP/LB FSPaths fspValue = entry.getValue(); - + // TODO# useful code as reference, as it takes apart the crazy paths // for bucketed tables, hive.optimize.sort.dynamic.partition optimization // adds the taskId to the fspKey. if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) { http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index 08cc4b4..8f319ba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -233,6 +233,7 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial // point, updates from speculative tasks still writing to tmpPath // will not appear in finalPath. 
log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath); + Utilities.LOG14535.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath + "(spec " + specPath + ")"); Utilities.rename(fs, tmpPath, intermediatePath); // Step2: remove any tmp file or double-committed output files Utilities.removeTempOrDuplicateFiles(fs, intermediatePath); http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 546919b..14a84cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -252,6 +252,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { if (lfd != null) { Path targetPath = lfd.getTargetDir(); Path sourcePath = lfd.getSourcePath(); + Utilities.LOG14535.info("MoveTask moving LFD " + sourcePath + " to " + targetPath); moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); } @@ -268,6 +269,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { if (!fs.exists(destPath.getParent())) { fs.mkdirs(destPath.getParent()); } + Utilities.LOG14535.info("MoveTask moving LMFD " + srcPath + " to " + destPath); moveFile(srcPath, destPath, isDfsDir); i++; } @@ -288,71 +290,17 @@ public class MoveTask extends Task<MoveWork> implements Serializable { mesg.append(')'); } String mesg_detail = " from " + tbd.getSourcePath(); + Utilities.LOG14535.info("" + mesg.toString() + " " + mesg_detail); console.printInfo(mesg.toString(), mesg_detail); Table table = db.getTable(tbd.getTable().getTableName()); - if (work.getCheckFileFormat()) { - // Get all files from the src directory - FileStatus[] dirs; - ArrayList<FileStatus> files; - FileSystem srcFs; // source filesystem - try { - srcFs = tbd.getSourcePath().getFileSystem(conf); - dirs = srcFs.globStatus(tbd.getSourcePath()); - files = new ArrayList<FileStatus>(); - for (int i = 0; (dirs != null && i < dirs.length); i++) { - files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER))); - // We only check one file, so exit the loop when we have at least - // one. - if (files.size() > 0) { - break; - } - } - } catch (IOException e) { - throw new HiveException( - "addFiles: filesystem error in check phase", e); - } - - // handle file format check for table level - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) { - boolean flag = true; - // work.checkFileFormat is set to true only for Load Task, so assumption here is - // dynamic partition context is null - if (tbd.getDPCtx() == null) { - if (tbd.getPartitionSpec() == null || tbd.getPartitionSpec().isEmpty()) { - // Check if the file format of the file matches that of the table. - flag = HiveFileFormatUtils.checkInputFormat( - srcFs, conf, tbd.getTable().getInputFileFormatClass(), files); - } else { - // Check if the file format of the file matches that of the partition - Partition oldPart = db.getPartition(table, tbd.getPartitionSpec(), false); - if (oldPart == null) { - // this means we have just created a table and are specifying partition in the - // load statement (without pre-creating the partition), in which case lets use - // table input format class. 
inheritTableSpecs defaults to true so when a new - // partition is created later it will automatically inherit input format - // from table object - flag = HiveFileFormatUtils.checkInputFormat( - srcFs, conf, tbd.getTable().getInputFileFormatClass(), files); - } else { - flag = HiveFileFormatUtils.checkInputFormat( - srcFs, conf, oldPart.getInputFormatClass(), files); - } - } - if (!flag) { - throw new HiveException( - "Wrong file format. Please check the file's format."); - } - } else { - LOG.warn("Skipping file format check as dpCtx is not null"); - } - } - } + checkFileFormats(db, tbd, table); // Create a data container DataContainer dc = null; if (tbd.getPartitionSpec().size() == 0) { dc = new DataContainer(table.getTTable()); + Utilities.LOG14535.info("loadTable called from " + tbd.getSourcePath() + " into " + tbd.getTable()); db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getReplace(), work.isSrcLocal(), isSkewedStoredAsDirs(tbd), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, @@ -495,10 +443,11 @@ public class MoveTask extends Task<MoveWork> implements Serializable { List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec()); db.validatePartitionNameCharacters(partVals); + Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + " into " + tbd.getTable()); db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getPartitionSpec(), tbd.getReplace(), tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(), - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask()); + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask(), tbd.isMmTable()); Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); if (bucketCols != null || sortCols != null) { @@ -547,6 +496,67 @@ public class MoveTask extends Task<MoveWork> implements Serializable { } } + private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table) + throws HiveException { + if (work.getCheckFileFormat()) { + // Get all files from the src directory + FileStatus[] dirs; + ArrayList<FileStatus> files; + FileSystem srcFs; // source filesystem + try { + srcFs = tbd.getSourcePath().getFileSystem(conf); + dirs = srcFs.globStatus(tbd.getSourcePath()); + files = new ArrayList<FileStatus>(); + for (int i = 0; (dirs != null && i < dirs.length); i++) { + files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER))); + // We only check one file, so exit the loop when we have at least + // one. + if (files.size() > 0) { + break; + } + } + } catch (IOException e) { + throw new HiveException( + "addFiles: filesystem error in check phase", e); + } + + // handle file format check for table level + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) { + boolean flag = true; + // work.checkFileFormat is set to true only for Load Task, so assumption here is + // dynamic partition context is null + if (tbd.getDPCtx() == null) { + if (tbd.getPartitionSpec() == null || tbd.getPartitionSpec().isEmpty()) { + // Check if the file format of the file matches that of the table. 
+ flag = HiveFileFormatUtils.checkInputFormat( + srcFs, conf, tbd.getTable().getInputFileFormatClass(), files); + } else { + // Check if the file format of the file matches that of the partition + Partition oldPart = db.getPartition(table, tbd.getPartitionSpec(), false); + if (oldPart == null) { + // this means we have just created a table and are specifying partition in the + // load statement (without pre-creating the partition), in which case lets use + // table input format class. inheritTableSpecs defaults to true so when a new + // partition is created later it will automatically inherit input format + // from table object + flag = HiveFileFormatUtils.checkInputFormat( + srcFs, conf, tbd.getTable().getInputFileFormatClass(), files); + } else { + flag = HiveFileFormatUtils.checkInputFormat( + srcFs, conf, oldPart.getInputFormatClass(), files); + } + } + if (!flag) { + throw new HiveException( + "Wrong file format. Please check the file's format."); + } + } else { + LOG.warn("Skipping file format check as dpCtx is not null"); + } + } + } + } + private boolean isSkewedStoredAsDirs(LoadTableDesc tbd) { return (tbd.getLbCtx() == null) ? false : tbd.getLbCtx() .isSkewedStoredAsDir(); http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index a542dc4..5bc04e1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -208,6 +208,9 @@ import com.google.common.base.Preconditions; @SuppressWarnings("nls") public final class Utilities { + // TODO: remove when merging + public static final Logger LOG14535 = LoggerFactory.getLogger("Log14535"); + /** * The object in the reducer are composed of these top level fields. */ @@ -1405,6 +1408,7 @@ public final class Utilities { Path tmpPath = Utilities.toTempPath(specPath); Path taskTmpPath = Utilities.toTaskTempPath(specPath); if (success) { + // TODO# specPath instead of tmpPath FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse( tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs); if(statuses != null && statuses.length > 0) { @@ -1414,17 +1418,21 @@ public final class Utilities { if (emptyBuckets.size() > 0) { createEmptyBuckets(hconf, emptyBuckets, conf, reporter); } - // move to the file destination - log.info("Moving tmp dir: " + tmpPath + " to: " + specPath); + Utilities.LOG14535.info("Moving tmp dir: " + tmpPath + " to: " + specPath); Utilities.renameOrMoveFiles(fs, tmpPath, specPath); } + List<Path> paths = new ArrayList<>(); + // TODO#: HERE listFilesToCommit(specPath, fs, paths); } else { + Utilities.LOG14535.info("deleting tmpPath " + tmpPath); fs.delete(tmpPath, true); } + Utilities.LOG14535.info("deleting taskTmpPath " + taskTmpPath); fs.delete(taskTmpPath, true); } + /** * Check the existence of buckets according to bucket specification. Create empty buckets if * needed. 
@@ -1465,6 +1473,7 @@ public final class Utilities { } for (Path path : paths) { + Utilities.LOG14535.info("creating empty bucket for " + path); RecordWriter writer = HiveFileFormatUtils.getRecordWriter( jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path, reporter); @@ -1576,15 +1585,19 @@ public final class Utilities { for (FileStatus one : items) { if (isTempPath(one)) { + Utilities.LOG14535.info("removeTempOrDuplicateFiles deleting " + one.getPath(), new Exception()); if (!fs.delete(one.getPath(), true)) { throw new IOException("Unable to delete tmp file: " + one.getPath()); } } else { String taskId = getPrefixedTaskIdFromFilename(one.getPath().getName()); + Utilities.LOG14535.info("removeTempOrDuplicateFiles pondering " + one.getPath() + ", taskId " + taskId, new Exception()); + FileStatus otherFile = taskIdToFile.get(taskId); if (otherFile == null) { taskIdToFile.put(taskId, one); } else { + // TODO# file choice! // Compare the file sizes of all the attempt files for the same task, the largest win // any attempt files could contain partial results (due to task failures or // speculative runs), but the largest should be the correct one since the result http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 5f53aef..7d8c961 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1472,10 +1472,12 @@ public class Hive { public void loadPartition(Path loadPath, String tableName, Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, - boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException { + boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, boolean isMmTable) + throws HiveException { Table tbl = getTable(tableName); + // TODO# dbl check if table is still mm for consistency loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs, - isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask); + isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, isMmTable); } /** @@ -1499,10 +1501,10 @@ public class Hive { * If the source directory is LOCAL * @param isAcid true if this is an ACID operation */ - public Partition loadPartition(Path loadPath, Table tbl, - Map<String, String> partSpec, boolean replace, - boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, - boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException { + public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec, + boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, + boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, boolean isMmTable) + throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); try { @@ -1540,17 +1542,25 @@ public class Hive { } else { newPartPath = oldPartPath; } - List<Path> newFiles = null; - if (replace || (oldPart == null && !isAcid)) { - replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), - isSrcLocal); - } else { - if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) { - newFiles = Collections.synchronizedList(new ArrayList<Path>()); 
+ List<Path> newFiles = null, mmFiles = null; + if (isMmTable) { + mmFiles = handleMicromanagedPartition( + loadPath, tbl, replace, oldPart, newPartPath, isAcid); + if (areEventsForDmlNeeded(tbl, oldPart)) { + newFiles = mmFiles; } + } else { + if (replace || (oldPart == null && !isAcid)) { + replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), + isSrcLocal); + } else { + if (areEventsForDmlNeeded(tbl, oldPart)) { + newFiles = Collections.synchronizedList(new ArrayList<Path>()); + } - FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles); + FileSystem fs = tbl.getDataLocation().getFileSystem(conf); + Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles); + } } Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath); alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString()); @@ -1621,6 +1631,58 @@ public class Hive { } } + + private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) { + return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null; + } + + + private List<Path> handleMicromanagedPartition(Path loadPath, Table tbl, boolean replace, + Partition oldPart, Path newPartPath, boolean isAcid) throws HiveException { + Utilities.LOG14535.info("not moving " + loadPath + " to " + newPartPath); + if (replace) { + // TODO#: would need a list of new files to support. Then, old ones only would need + // to be removed from MS (and FS). Also, per-partition IOW is problematic for + // the prefix case. + throw new HiveException("Replace and MM are not supported"); + } + if (isAcid) { + // TODO# need to make sure ACID writes to final directories. Otherwise, might need to move. + throw new HiveException("ACID and MM are not supported"); + } + List<Path> newFiles = new ArrayList<Path>(); + FileStatus[] srcs; + FileSystem srcFs; + try { + srcFs = loadPath.getFileSystem(conf); + srcs = srcFs.globStatus(loadPath); + } catch (IOException e) { + LOG.error("Error listing files", e); + throw new HiveException(e); + } + if (srcs == null) { + LOG.info("No sources specified: " + loadPath); + return newFiles; + } + + // TODO: just like the move path, we only do one level of recursion. 
+ for (FileStatus src : srcs) { + if (src.isDirectory()) { + try { + for (FileStatus srcFile : + srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)) { + newFiles.add(srcFile.getPath()); + } + } catch (IOException e) { + throw new HiveException(e); + } + } else { + newFiles.add(src.getPath()); + } + } + return newFiles; + } + private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl, Partition newTPart) throws MetaException, TException { EnvironmentContext environmentContext = null; @@ -1813,9 +1875,10 @@ private void constructOneLBLocationMap(FileStatus fSta, LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec); // load the partition + Utilities.LOG14535.info("loadPartition called for DPP from " + partPath + " to " + tbl.getTableName()); Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace, true, listBucketingEnabled, - false, isAcid, hasFollowingStatsTask); + false, isAcid, hasFollowingStatsTask, false); // TODO# here partitionsMap.put(fullPartSpec, newPartition); if (inPlaceEligible) { @@ -2803,6 +2866,9 @@ private void constructOneLBLocationMap(FileStatus fSta, destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, destFs, name, filetype); } + if (inheritPerms) { + HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false); + } if (null != newFiles) { newFiles.add(destPath); } http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index cea99e1..4e44d49 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1268,6 +1268,7 @@ public final class GenMapRedUtils { // Create a FileSink operator TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone(); + // TODO# special case #N - merge FS is created here FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts, conf.getBoolVar(ConfVars.COMPRESSRESULT)); FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild( @@ -1806,6 +1807,7 @@ public final class GenMapRedUtils { // Create the required temporary file in the HDFS location if the destination // path of the FileSinkOperator table is a blobstore path. 
+ // TODO# HERE Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath()); // Change all the linked file sink descriptors @@ -1813,9 +1815,11 @@ public final class GenMapRedUtils { for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) { fsConf.setParentDir(tmpDir); fsConf.setDirName(new Path(tmpDir, fsConf.getDirName().getName())); + Utilities.LOG14535.info("createMoveTask setting tmpDir for LinkedFileSink chDir " + fsConf.getDirName() + "; new parent " + tmpDir + ", dest was " + fileSinkDesc.getDestPath()); } } else { fileSinkDesc.setDirName(tmpDir); + Utilities.LOG14535.info("createMoveTask setting tmpDir for LinkedFileSink chDir " + tmpDir + "; dest was " + fileSinkDesc.getDestPath()); } } http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java index 2a7f3d4..7f7d192 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.UnionOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -217,11 +218,13 @@ public final class UnionProcFactory { // each parent List<FileSinkDesc> fileDescLists = new ArrayList<FileSinkDesc>(); + // TODO# special case #N - unions for (Operator<? extends OperatorDesc> parent : parents) { FileSinkDesc fileSinkDesc = (FileSinkDesc) fileSinkOp.getConf().clone(); fileSinkDesc.setDirName(new Path(parentDirName, parent.getIdentifier())); fileSinkDesc.setLinkedFileSink(true); fileSinkDesc.setParentDir(parentDirName); + Utilities.LOG14535.info("Created LinkedFileSink for union " + fileSinkDesc.getDirName() + "; parent " + parentDirName); parent.setChildOperators(null); Operator<? extends OperatorDesc> tmpFileSinkOp = OperatorFactory.getAndMakeChild(fileSinkDesc, parent.getSchema(), parent); http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java index 0c160ac..e1fc103 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java @@ -65,7 +65,7 @@ public class GenTezProcContext implements NodeProcessorCtx{ public final ParseContext parseContext; public final HiveConf conf; - public final List<Task<MoveWork>> moveTask; + public final List<Task<MoveWork>> moveTask; // TODO# // rootTasks is the entry point for all generated tasks public final List<Task<? 
extends Serializable>> rootTasks; http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 6715dbf..f4b23e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -305,7 +305,9 @@ public class GenTezUtils { linked.add(desc); desc.setIndexInTezUnion(linked.size()); + // TODO# special case #N - unions (tez) desc.setDirName(new Path(path, "" + desc.getIndexInTezUnion())); + Utilities.LOG14535.info("removing union - new desc with " + desc.getDirName() + "; parent " + path); desc.setLinkedFileSink(true); desc.setParentDir(path); desc.setLinkedFileSinkDesc(linked); @@ -373,6 +375,8 @@ public class GenTezUtils { // If underlying data is RCFile or OrcFile, RCFileBlockMerge task or // OrcFileStripeMerge task would be created. LOG.info("using CombineHiveInputformat for the merge job"); + Utilities.LOG14535.info("merging files from " + fileSink.getConf().getDirName() + " to " + finalName); + // TODO# special case #N - merge GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName, context.dependencyTask, context.moveTask, hconf, context.currentTask); http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 66589fe..c54a171 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryProperties; @@ -6542,6 +6543,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { LoadTableDesc ltd = null; ListBucketingCtx lbCtx = null; Map<String, String> partSpec = null; + boolean isMmTable = false; switch (dest_type.intValue()) { case QBMetaData.DEST_TABLE: { @@ -6551,70 +6553,27 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { destTableIsTemporary = dest_tab.isTemporary(); // Is the user trying to insert into a external tables - if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) && - (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) { - throw new SemanticException( - ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName())); - } + checkExternalTable(dest_tab); partSpec = qbm.getPartSpecForAlias(dest); dest_path = dest_tab.getPath(); - // If the query here is an INSERT_INTO and the target is an immutable table, - // verify that our destination is empty before proceeding - if (dest_tab.isImmutable() && - qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),dest_tab.getTableName())){ - try { - FileSystem fs = dest_path.getFileSystem(conf); - if (! 
MetaStoreUtils.isDirEmpty(fs,dest_path)){ - LOG.warn("Attempted write into an immutable table : " - + dest_tab.getTableName() + " : " + dest_path); - throw new SemanticException( - ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName())); - } - } catch (IOException ioe) { - LOG.warn("Error while trying to determine if immutable table has any data : " - + dest_tab.getTableName() + " : " + dest_path); - throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage())); - } - } + checkImmutableTable(qb, dest_tab, dest_path, false); - // check for partition - List<FieldSchema> parts = dest_tab.getPartitionKeys(); - if (parts != null && parts.size() > 0) { // table is partitioned - if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition - throw new SemanticException(generateErrorMessage( - qb.getParseInfo().getDestForClause(dest), - ErrorMsg.NEED_PARTITION_ERROR.getMsg())); - } - dpCtx = qbm.getDPCtx(dest); - if (dpCtx == null) { - dest_tab.validatePartColumnNames(partSpec, false); - dpCtx = new DynamicPartitionCtx(dest_tab, partSpec, - conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME), - conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE)); - qbm.setDPCtx(dest, dpCtx); - } - - if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP - throw new SemanticException(generateErrorMessage( - qb.getParseInfo().getDestForClause(dest), - ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())); - } - if (dpCtx.getSPPath() != null) { - dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath()); - } - if ((dest_tab.getNumBuckets() > 0)) { - dpCtx.setNumBuckets(dest_tab.getNumBuckets()); - } + // Check for dynamic partitions. + dpCtx = checkDynPart(qb, qbm, dest_tab, partSpec, dest); + if (dpCtx != null && dpCtx.getSPPath() != null) { + dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath()); } boolean isNonNativeTable = dest_tab.isNonNative(); - if (isNonNativeTable) { + isMmTable = isMmTable(dest_tab); + if (isNonNativeTable || isMmTable) { queryTmpdir = dest_path; } else { queryTmpdir = ctx.getTempDirForPath(dest_path); } + Utilities.LOG14535.info("createFS for table specifying " + queryTmpdir + " from " + dest_path); if (dpCtx != null) { // set the root of the temporary path where dynamic partition columns will populate dpCtx.setRootPath(queryTmpdir); @@ -6641,9 +6600,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { acidOp = getAcidType(table_desc.getOutputFileFormatClass()); checkAcidConstraints(qb, table_desc, dest_tab); } - ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp); - ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), - dest_tab.getTableName())); + boolean isReplace = !qb.getParseInfo().isInsertIntoTable( + dest_tab.getDbName(), dest_tab.getTableName()); + ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, isMmTable); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); } else { @@ -6652,42 +6611,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { setStatsForNonNativeTable(dest_tab); } - WriteEntity output = null; - - // Here only register the whole table for post-exec hook if no DP present - // in the case of DP, we will register WriteEntity in MoveTask when the - // list of dynamically created partitions are known. 
- if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) { - output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable)); - if (!outputs.add(output)) { - throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES - .getMsg(dest_tab.getTableName())); - } - } - if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) { - // No static partition specified - if (dpCtx.getNumSPCols() == 0) { - output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false); - outputs.add(output); - } - // part of the partition specified - // Create a DummyPartition in this case. Since, the metastore does not store partial - // partitions currently, we need to store dummy partitions - else { - try { - String ppath = dpCtx.getSPPath(); - ppath = ppath.substring(0, ppath.length() - 1); - DummyPartition p = - new DummyPartition(dest_tab, dest_tab.getDbName() - + "@" + dest_tab.getTableName() + "@" + ppath, - partSpec); - output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false); - outputs.add(output); - } catch (HiveException e) { - throw new SemanticException(e.getMessage(), e); - } - } - } + WriteEntity output = generateTableWriteEntity( + dest_tab, partSpec, ltd, dpCtx, isNonNativeTable); ctx.getLoadTableOutputMap().put(ltd, output); break; @@ -6697,40 +6622,22 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { dest_part = qbm.getDestPartitionForAlias(dest); dest_tab = dest_part.getTable(); destTableIsAcid = AcidUtils.isAcidTable(dest_tab); - if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) && - dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE)) { - throw new SemanticException( - ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName())); - } + + checkExternalTable(dest_tab); Path tabPath = dest_tab.getPath(); Path partPath = dest_part.getDataLocation(); - // If the query here is an INSERT_INTO and the target is an immutable table, - // verify that our destination is empty before proceeding - if (dest_tab.isImmutable() && - qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),dest_tab.getTableName())){ - try { - FileSystem fs = partPath.getFileSystem(conf); - if (! MetaStoreUtils.isDirEmpty(fs,partPath)){ - LOG.warn("Attempted write into an immutable table partition : " - + dest_tab.getTableName() + " : " + partPath); - throw new SemanticException( - ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName())); - } - } catch (IOException ioe) { - LOG.warn("Error while trying to determine if immutable table partition has any data : " - + dest_tab.getTableName() + " : " + partPath); - throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage())); - } - } + checkImmutableTable(qb, dest_tab, partPath, true); // if the table is in a different dfs than the partition, // replace the partition's dfs with the table's dfs. dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri() .getAuthority(), partPath.toUri().getPath()); - queryTmpdir = ctx.getTempDirForPath(dest_path); + isMmTable = isMmTable(dest_tab); + queryTmpdir = isMmTable ? 
dest_path : ctx.getTempDirForPath(dest_path); + Utilities.LOG14535.info("createFS for partition specifying " + queryTmpdir + " from " + dest_path); table_desc = Utilities.getTableDesc(dest_tab); // Add sorting/bucketing if needed @@ -6946,6 +6853,54 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx); } + FileSinkDesc fileSinkDesc = createFileSinkDesc(table_desc, dest_part, + dest_path, currentTableId, destTableIsAcid, destTableIsTemporary, + destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, + canBeMerged, isMmTable); + + Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( + fileSinkDesc, fsRS, input), inputRR); + + handleLineage(ltd, output); + + if (LOG.isDebugEnabled()) { + LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: " + + dest_path + " row schema: " + inputRR.toString()); + } + + FileSinkOperator fso = (FileSinkOperator) output; + fso.getConf().setTable(dest_tab); + fsopToTable.put(fso, dest_tab); + // the following code is used to collect column stats when + // hive.stats.autogather=true + // and it is an insert overwrite or insert into table + if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) + && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER) + && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) { + if (dest_type.intValue() == QBMetaData.DEST_TABLE) { + genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo() + .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); + } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) { + genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb + .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); + + } + } + return output; + } + + private static boolean isMmTable(Table table) { + // TODO: perhaps it should be a 3rd value for 'transactional'? 
+ String value = table.getProperty(hive_metastoreConstants.TABLE_IS_MM); + return value != null && value.equalsIgnoreCase("true"); + } + + private FileSinkDesc createFileSinkDesc(TableDesc table_desc, + Partition dest_part, Path dest_path, int currentTableId, + boolean destTableIsAcid, boolean destTableIsTemporary, + boolean destTableIsMaterialization, Path queryTmpdir, + SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, + RowSchema fsRS, boolean canBeMerged, boolean isMmTable) throws SemanticException { FileSinkDesc fileSinkDesc = new FileSinkDesc( queryTmpdir, table_desc, @@ -6957,7 +6912,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx, - dest_path); + dest_path, + isMmTable); fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery()); // If this is an insert, update, or delete on an ACID table then mark that so the @@ -7001,10 +6957,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } else if (dpCtx != null) { fileSinkDesc.setStaticSpec(dpCtx.getSPPath()); } + return fileSinkDesc; + } - Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( - fileSinkDesc, fsRS, input), inputRR); - + private void handleLineage(LoadTableDesc ltd, Operator output) + throws SemanticException { if (ltd != null && SessionState.get() != null) { SessionState.get().getLineageState() .mapDirToFop(ltd.getSourcePath(), (FileSinkOperator) output); @@ -7022,33 +6979,111 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { SessionState.get().getLineageState() .mapDirToFop(tlocation, (FileSinkOperator) output); } + } - if (LOG.isDebugEnabled()) { - LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: " - + dest_path + " row schema: " + inputRR.toString()); + private WriteEntity generateTableWriteEntity(Table dest_tab, + Map<String, String> partSpec, LoadTableDesc ltd, + DynamicPartitionCtx dpCtx, boolean isNonNativeTable) + throws SemanticException { + WriteEntity output = null; + + // Here only register the whole table for post-exec hook if no DP present + // in the case of DP, we will register WriteEntity in MoveTask when the + // list of dynamically created partitions are known. + if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) { + output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable)); + if (!outputs.add(output)) { + throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES + .getMsg(dest_tab.getTableName())); + } } + if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) { + // No static partition specified + if (dpCtx.getNumSPCols() == 0) { + output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false); + outputs.add(output); + } + // part of the partition specified + // Create a DummyPartition in this case. 
Since, the metastore does not store partial + // partitions currently, we need to store dummy partitions + else { + try { + String ppath = dpCtx.getSPPath(); + ppath = ppath.substring(0, ppath.length() - 1); + DummyPartition p = + new DummyPartition(dest_tab, dest_tab.getDbName() + + "@" + dest_tab.getTableName() + "@" + ppath, + partSpec); + output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false); + outputs.add(output); + } catch (HiveException e) { + throw new SemanticException(e.getMessage(), e); + } + } + } + return output; + } - FileSinkOperator fso = (FileSinkOperator) output; - fso.getConf().setTable(dest_tab); - fsopToTable.put(fso, dest_tab); - // the following code is used to collect column stats when - // hive.stats.autogather=true - // and it is an insert overwrite or insert into table - if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) - && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER) - && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) { - if (dest_type.intValue() == QBMetaData.DEST_TABLE) { - genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo() - .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); - } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) { - genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb - .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); + private void checkExternalTable(Table dest_tab) throws SemanticException { + if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) && + (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) { + throw new SemanticException( + ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName())); + } + } + private void checkImmutableTable(QB qb, Table dest_tab, Path dest_path, boolean isPart) + throws SemanticException { + // If the query here is an INSERT_INTO and the target is an immutable table, + // verify that our destination is empty before proceeding + if (!dest_tab.isImmutable() || !qb.getParseInfo().isInsertIntoTable( + dest_tab.getDbName(), dest_tab.getTableName())) { + return; + } + try { + FileSystem fs = dest_path.getFileSystem(conf); + if (! MetaStoreUtils.isDirEmpty(fs,dest_path)){ + LOG.warn("Attempted write into an immutable table : " + + dest_tab.getTableName() + " : " + dest_path); + throw new SemanticException( + ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName())); } + } catch (IOException ioe) { + LOG.warn("Error while trying to determine if immutable table " + + (isPart ? 
"partition " : "") + "has any data : " + dest_tab.getTableName() + + " : " + dest_path); + throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage())); } - return output; } + private DynamicPartitionCtx checkDynPart(QB qb, QBMetaData qbm, Table dest_tab, + Map<String, String> partSpec, String dest) throws SemanticException { + List<FieldSchema> parts = dest_tab.getPartitionKeys(); + if (parts == null || parts.isEmpty()) return null; // table is not partitioned + if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition + throw new SemanticException(generateErrorMessage(qb.getParseInfo().getDestForClause(dest), + ErrorMsg.NEED_PARTITION_ERROR.getMsg())); + } + DynamicPartitionCtx dpCtx = qbm.getDPCtx(dest); + if (dpCtx == null) { + dest_tab.validatePartColumnNames(partSpec, false); + dpCtx = new DynamicPartitionCtx(dest_tab, partSpec, + conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME), + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE)); + qbm.setDPCtx(dest, dpCtx); + } + + if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP + throw new SemanticException(generateErrorMessage(qb.getParseInfo().getDestForClause(dest), + ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())); + } + if ((dest_tab.getNumBuckets() > 0)) { + dpCtx.setNumBuckets(dest_tab.getNumBuckets()); + } + return dpCtx; + } + + private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc, Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException { String tableName = table_desc.getTableName(); http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index 68b0ad9..ffc9c3e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.Utilities; /** * Conditional task resolution interface. This is invoked at run time to get the @@ -243,6 +244,7 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, Path path = ptpi.keySet().iterator().next(); PartitionDesc partDesc = ptpi.get(path); TableDesc tblDesc = partDesc.getTableDesc(); + Utilities.LOG14535.info("merge resolver removing " + path); work.removePathToPartitionInfo(path); // the root path is not useful anymore // cleanup pathToAliases @@ -264,9 +266,12 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, totalSz += len; PartitionDesc pDesc = (dpCtx != null) ? 
generateDPFullPartSpec(dpCtx, status, tblDesc, i) : partDesc; + Utilities.LOG14535.info("merge resolver will merge " + status[i].getPath()); work.resolveDynamicPartitionStoredAsSubDirsMerge(conf, status[i].getPath(), tblDesc, aliases, pDesc); } else { + Utilities.LOG14535.info("merge resolver will move " + status[i].getPath()); + toMove.add(status[i].getPath()); } } http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index ce0e0a8..6ae7fa8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -97,6 +97,7 @@ public class FileSinkDesc extends AbstractOperatorDesc { private transient Table table; private Path destPath; private boolean isHiveServerQuery; + private boolean isMmTable; public FileSinkDesc() { } @@ -107,7 +108,8 @@ public class FileSinkDesc extends AbstractOperatorDesc { public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean compressed, final int destTableId, final boolean multiFileSpray, final boolean canBeMerged, final int numFiles, final int totalFiles, - final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath) { + final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath, + boolean isMmTable) { this.dirName = dirName; this.tableInfo = tableInfo; @@ -121,6 +123,7 @@ public class FileSinkDesc extends AbstractOperatorDesc { this.dpCtx = dpCtx; this.dpSortState = DPSortState.NONE; this.destPath = destPath; + this.isMmTable = isMmTable; } public FileSinkDesc(final Path dirName, final TableDesc tableInfo, @@ -142,7 +145,7 @@ public class FileSinkDesc extends AbstractOperatorDesc { public Object clone() throws CloneNotSupportedException { FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed, destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles, - partitionCols, dpCtx, destPath); + partitionCols, dpCtx, destPath, isMmTable); ret.setCompressCodec(compressCodec); ret.setCompressType(compressType); ret.setGatherStats(gatherStats); @@ -249,6 +252,10 @@ public class FileSinkDesc extends AbstractOperatorDesc { this.temporary = temporary; } + public boolean isMmTable() { + return isMmTable; + } + public boolean isMaterialization() { return materialization; } http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java index df153a2..5e4e1fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java @@ -22,6 +22,7 @@ import java.io.Serializable; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.PTFUtils; +import org.apache.hadoop.hive.ql.exec.Utilities; /** * LoadFileDesc. 
@@ -55,6 +56,7 @@ public class LoadFileDesc extends LoadDesc implements Serializable { final boolean isDfsDir, final String columns, final String columnTypes) { super(sourcePath); + Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir, new Exception()); this.targetDir = targetDir; this.isDfsDir = isDfsDir; this.columns = columns; http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index 771a919..1ac831d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -23,6 +23,7 @@ import java.util.LinkedHashMap; import java.util.Map; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -41,18 +42,20 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc // Need to remember whether this is an acid compliant operation, and if so whether it is an // insert, update, or delete. private AcidUtils.Operation writeType; + private boolean isMmTable; // TODO: the below seems like they should just be combined into partitionDesc private org.apache.hadoop.hive.ql.plan.TableDesc table; private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map - public LoadTableDesc(final Path sourcePath, + private LoadTableDesc(final Path sourcePath, final org.apache.hadoop.hive.ql.plan.TableDesc table, final Map<String, String> partitionSpec, final boolean replace, final AcidUtils.Operation writeType) { super(sourcePath); - init(table, partitionSpec, replace, writeType); + Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + table.getTableName(), new Exception()); + init(table, partitionSpec, replace, writeType, false); } /** @@ -91,13 +94,16 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc public LoadTableDesc(final Path sourcePath, final org.apache.hadoop.hive.ql.plan.TableDesc table, final DynamicPartitionCtx dpCtx, - final AcidUtils.Operation writeType) { + final AcidUtils.Operation writeType, + boolean isReplace, + boolean isMmTable) { super(sourcePath); + Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName(), new Exception()); this.dpCtx = dpCtx; if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) { - init(table, dpCtx.getPartSpec(), true, writeType); + init(table, dpCtx.getPartSpec(), isReplace, writeType, isMmTable); } else { - init(table, new LinkedHashMap<String, String>(), true, writeType); + init(table, new LinkedHashMap<String, String>(), isReplace, writeType, isMmTable); } } @@ -105,11 +111,12 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc final org.apache.hadoop.hive.ql.plan.TableDesc table, final Map<String, String> partitionSpec, final boolean replace, - AcidUtils.Operation writeType) { + AcidUtils.Operation writeType, boolean isMmTable) { this.table = table; this.partitionSpec = partitionSpec; this.replace = replace; this.writeType = writeType; + this.isMmTable = isMmTable; } @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -135,6 +142,11 @@ public 
class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc return replace; } + @Explain(displayName = "micromanaged table") + public boolean isMmTable() { + return isMmTable; + } + public void setReplace(boolean replace) { this.replace = replace; } http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java index 9f498c7..227b0d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; */ @Explain(displayName = "Move Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public class MoveWork implements Serializable { + // TODO# all the places where MoveWork is created need to be handled. private static final long serialVersionUID = 1L; private LoadTableDesc loadTableWork; private LoadFileDesc loadFileWork; http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index a8d7c9c..1c27873 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -285,7 +285,7 @@ public class TestFileSinkOperator { partColMap.put(PARTCOL_NAME, null); DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100); //todo: does this need the finalDestination? 
-      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
+      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null, false);
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/test/queries/clientpositive/mm_all.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_all.q b/ql/src/test/queries/clientpositive/mm_all.q
new file mode 100644
index 0000000..aaf8d48
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/mm_all.q
@@ -0,0 +1,63 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.fetch.task.conversion=none;
+
+drop table simple_mm;
+drop table partunion_mm;
+drop table merge_mm;
+drop table ctas_mm;
+drop table T1;
+drop table T2;
+drop table skew_mm;
+
+
+create table simple_mm(key int) partitioned by (key_mm int) tblproperties ('hivecommit'='true');
+insert into table simple_mm partition(key_mm='455') select key from src limit 3;
+
+create table ctas_mm tblproperties ('hivecommit'='true') as select * from src limit 3;
+
+create table partunion_mm(id_mm int) partitioned by (key_mm int) tblproperties ('hivecommit'='true');
+
+
+insert into table partunion_mm partition(key_mm)
+select temps.* from (
+select key as key_mm, key from ctas_mm
+union all
+select key as key_mm, key from simple_mm ) temps;
+
+set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
+set hive.merge.tezfiles=true;
+
+CREATE TABLE merge_mm (key INT, value STRING)
+  PARTITIONED BY (ds STRING, part STRING) STORED AS ORC tblproperties ('hivecommit'='true');
+
+EXPLAIN
+INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
+  SELECT key, value, PMOD(HASH(key), 2) as part
+  FROM src;
+
+INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
+  SELECT key, value, PMOD(HASH(key), 2) as part
+  FROM src;
+
+
+set hive.optimize.skewjoin.compiletime = true;
+-- the test case is wrong?
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1;
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2;
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+create table skew_mm(k1 string, k2 string, k3 string, k4 string) SKEWED BY (key) ON ((2)) tblproperties ('hivecommit'='true');
+INSERT OVERWRITE TABLE skew_mm
+SELECT a.key as k1, a.val as k2, b.key as k3, b.val as k4 FROM T1 a JOIN T2 b ON a.key = b.key;
+
+-- TODO load, acid, etc

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/test/queries/clientpositive/mm_current.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q
new file mode 100644
index 0000000..882096b
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/mm_current.q
@@ -0,0 +1,11 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.fetch.task.conversion=none;
+
+drop table simple_mm;
+
+
+create table simple_mm(key int) partitioned by (key_mm int) tblproperties ('hivecommit'='true');
+insert into table simple_mm partition(key_mm='455') select key from src limit 3;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/test/results/clientpositive/llap/mm_current.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out b/ql/src/test/results/clientpositive/llap/mm_current.q.out
new file mode 100644
index 0000000..129bb13
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/mm_current.q.out
@@ -0,0 +1,21 @@
+PREHOOK: query: drop table simple123
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table simple123
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table simple123(key int) partitioned by (key123 int) tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@simple123
+POSTHOOK: query: create table simple123(key int) partitioned by (key123 int) tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@simple123
+PREHOOK: query: insert into table simple123 partition(key123='455') select key from src limit 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@simple123@key123=455
+POSTHOOK: query: insert into table simple123 partition(key123='455') select key from src limit 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@simple123@key123=455
+POSTHOOK: Lineage: simple123 PARTITION(key123=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
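
For readers following the descriptor changes above, a minimal sketch of how the widened FileSinkDesc and LoadTableDesc constructors carry the new isMmTable flag is below. It is not a call site from this patch: only the signatures visible in the hunks above are relied on, and the class name (MmDescSketch), the write directory, the table name, the default-partition string, and the partition spec are hypothetical placeholders.

// Minimal sketch, assuming the constructor signatures shown in the hunks above;
// all concrete values (paths, table name, partition spec) are placeholders.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;

public class MmDescSketch {
  public static void main(String[] args) throws Exception {
    boolean isMmTable = true;                        // the flag this patch threads through the plan
    Path writeDir = new Path("/tmp/mm_sketch/out");  // placeholder write/staging directory

    // Placeholder table descriptor; a real plan would also carry serde/IO classes.
    TableDesc tableDesc = new TableDesc();
    Properties props = new Properties();
    props.setProperty(hive_metastoreConstants.META_TABLE_NAME, "default.simple_mm");
    tableDesc.setProperties(props);

    // Dynamic-partition context over one DP column, mirroring key_mm in mm_current.q.
    Map<String, String> partSpec = new LinkedHashMap<String, String>();
    partSpec.put("key_mm", null);
    DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partSpec,
        "__HIVE_DEFAULT_PARTITION__", 100);

    // FileSinkDesc now takes isMmTable as its final argument.
    FileSinkDesc fsd = new FileSinkDesc(writeDir, tableDesc, false, 1, false, false, 1, 1,
        new ArrayList<ExprNodeDesc>(), dpCtx, writeDir, isMmTable);

    // The dynamic-partition LoadTableDesc constructor now takes isReplace and isMmTable.
    LoadTableDesc ltd = new LoadTableDesc(writeDir, tableDesc, dpCtx,
        AcidUtils.Operation.NOT_ACID, /* isReplace */ true, isMmTable);

    System.out.println("FSOP mm flag: " + fsd.isMmTable() + ", load mm flag: " + ltd.isMmTable());
  }
}

Having the same flag visible on both the file-sink side and the load/move side appears to be what lets MoveTask and the merge resolver treat micromanaged-table output differently from the regular move path, consistent with the commit's goal of letting the FileSinkOperator write into the final location.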
