Repository: hive

Updated Branches:
  refs/heads/hive-14535 e083d33ac -> 2e602596f
HIVE-15027 : make sure export takes MM information into account (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2e602596
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2e602596
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2e602596

Branch: refs/heads/hive-14535
Commit: 2e602596f7af6c302fd23628d4337673ca38be86
Parents: e083d33
Author: Sergey Shelukhin <[email protected]>
Authored: Thu Oct 27 19:08:33 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Thu Oct 27 19:08:33 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/metastore/ObjectStore.java      |  1 -
 .../apache/hadoop/hive/ql/exec/CopyTask.java    | 15 +++--
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 30 +++++++++
 .../hive/ql/io/CombineHiveInputFormat.java      | 29 ++++++---
 .../hive/ql/parse/ExportSemanticAnalyzer.java   | 66 +++++++++++++++++---
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  4 +-
 .../ql/plan/ConditionalResolverMergeFiles.java  |  1 +
 .../apache/hadoop/hive/ql/plan/CopyWork.java    | 53 ++++++++++++----
 .../org/apache/hadoop/hive/ql/plan/MapWork.java | 10 +++
 9 files changed, 171 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index a1b3a09..8ad7059 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -636,7 +636,6 @@ public class ObjectStore implements RawStore, Configurable {
       transactionStatus = TXN_STATUS.COMMITED;
       try {
-        LOG.error("TODO# grrrrr");
         currentTransaction.commit();
       } catch (Exception ex) {
         Throwable candidate = ex;


http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index a8a44bc..9f89ea5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -53,19 +53,24 @@ public class CopyTask extends Task<CopyWork> implements Serializable {

   @Override
   public int execute(DriverContext driverContext) {
+    Path[] from = work.getFromPaths(), to = work.getToPaths();
+    for (int i = 0; i < from.length; ++i) {
+      int result = copyOnePath(from[i], to[i]);
+      if (result != 0) return result;
+    }
+    return 0;
+  }
+
+  protected int copyOnePath(Path fromPath, Path toPath) {
     FileSystem dstFs = null;
-    Path toPath = null;
     try {
-      Path fromPath = work.getFromPath();
-      toPath = work.getToPath();
-
       console.printInfo("Copying data from " + fromPath.toString(), " to "
           + toPath.toString());

       FileSystem srcFs = fromPath.getFileSystem(conf);
       dstFs = toPath.getFileSystem(conf);

-      FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.isSourceMm());
+      FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.doSkipSourceMmDirs());
       if (srcs == null || srcs.length == 0) {
         if (work.isErrorOnSrcEmpty()) {
           console.printError("No files matching path: " + fromPath.toString());
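For reference, the contract of the refactored execute() above: element i of
getFromPaths() is copied to element i of getToPaths(), and the first non-zero
return code aborts the task. A minimal restatement (a sketch mirroring the
patched CopyTask, not additional behavior):

    // Sketch: pairwise semantics of the new multi-path CopyWork.
    protected int executeAllPairs(CopyWork work) {
      Path[] from = work.getFromPaths(), to = work.getToPaths();
      for (int i = 0; i < from.length; ++i) {
        int result = copyOnePath(from[i], to[i]); // non-zero aborts the whole task
        if (result != 0) return result;
      }
      return 0; // all pairs copied
    }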
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 6774d4d..8e506aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -4077,4 +4077,34 @@ public final class Utilities {
     }
   }

+  /**
+   * @return the complete list of valid MM directories under a table/partition path; null
+   *         if the entire directory is valid (has no uncommitted/temporary files).
+   */
+  public static List<Path> getValidMmDirectoriesFromTableOrPart(Path path, Configuration conf,
+      ValidWriteIds ids, int lbLevels) throws IOException {
+    Utilities.LOG14535.info("Looking for valid MM paths under " + path);
+    // NULL means this directory is entirely valid.
+    List<Path> result = null;
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus[] children = (lbLevels == 0) ? fs.listStatus(path)
+        : fs.globStatus(new Path(path, StringUtils.repeat("*" + Path.SEPARATOR, lbLevels) + "*"));
+    for (int i = 0; i < children.length; ++i) {
+      FileStatus file = children[i];
+      Path childPath = file.getPath();
+      Long writeId = ValidWriteIds.extractWriteId(childPath);
+      if (!file.isDirectory() || writeId == null || !ids.isValid(writeId)) {
+        Utilities.LOG14535.info("Skipping path " + childPath);
+        if (result == null) {
+          result = new ArrayList<>(children.length - 1);
+          for (int j = 0; j < i; ++j) {
+            result.add(children[j].getPath());
+          }
+        }
+      } else if (result != null) {
+        result.add(childPath);
+      }
+    }
+    return result;
+  }
 }
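A hedged usage sketch of the new helper (the table path is hypothetical; ids
would come from the metastore, as in the ExportSemanticAnalyzer hunk below). A
null result means the whole directory is committed and can be copied verbatim;
a non-null list enumerates only the committed MM subdirectories:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidWriteIds;
    import org.apache.hadoop.hive.ql.exec.Utilities;

    class MmDirListingSketch {
      static List<Path> committedMmDirsOrNull(Configuration conf, ValidWriteIds ids)
          throws IOException {
        Path tablePath = new Path("hdfs://nn/warehouse/mm_tbl"); // hypothetical location
        // lbLevels == 0: the table is not stored as list-bucketing subdirectories.
        return Utilities.getValidMmDirectoriesFromTableOrPart(tablePath, conf, ids, 0);
        // null     => every child is committed; copy the directory as-is.
        // non-null => copy only the returned (committed) subdirectories.
      }
    }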
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index cc1de11..15d6b9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -18,6 +18,8 @@

 package org.apache.hadoop.hive.ql.io;

+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -85,20 +87,22 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
     private final int start;
     private final int length;
     private final JobConf conf;
+    private final boolean isMerge;

-    public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) {
+    public CheckNonCombinablePathCallable(
+        Path[] paths, int start, int length, JobConf conf, boolean isMerge) {
       this.paths = paths;
       this.start = start;
       this.length = length;
       this.conf = conf;
+      this.isMerge = isMerge;
     }

     @Override
     public Set<Integer> call() throws Exception {
       Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
       for (int i = 0; i < length; i++) {
-        PartitionDesc part =
-            HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+        PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
                 pathToPartitionInfo, paths[i + start],
                 IOPrepareCache.get().allocatePartitionDescMap());
         // Use HiveInputFormat if any of the paths is not splittable
@@ -107,12 +111,16 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
             getInputFormatFromCache(inputFormatClass, conf);
         boolean isAvoidSplitCombine = inputFormat instanceof AvoidSplitCombination &&
             ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf);
-        boolean isMmTable = MetaStoreUtils.isInsertOnlyTable(part.getTableDesc().getProperties());
-        if (isAvoidSplitCombine || isMmTable) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The path [" + paths[i + start] +
+
+        // Combined splits are not supported for MM tables right now.
+        // However, the merge for MM always combines one directory and should ignore that it's MM.
+        boolean isMmTableNonMerge = !isMerge
+            && MetaStoreUtils.isInsertOnlyTable(part.getTableDesc().getProperties());
+        if (isAvoidSplitCombine || isMmTableNonMerge) {
+          //if (LOG.isDebugEnabled()) {
+            Utilities.LOG14535.info("The path [" + paths[i + start] +
                 "] is being parked for HiveInputFormat.getSplits");
-          }
+          //}
           nonCombinablePathIndices.add(i + start);
         }
       }
@@ -467,11 +475,12 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
     ExecutorService executor = Executors.newFixedThreadPool(numThreads);
     List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
     try {
+      boolean isMerge = mrwork.isMergeFromResolver();
      for (int i = 0; i < numThreads; i++) {
        int start = i * numPathPerThread;
        int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
-        futureList.add(executor.submit(
-            new CheckNonCombinablePathCallable(paths, start, length, job)));
+        futureList.add(executor.submit(new CheckNonCombinablePathCallable(
+            paths, start, length, job, isMerge)));
      }
      Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
      for (Future<Set<Integer>> future : futureList) {
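The net effect of the change above, stated as a single predicate (a paraphrase
of the patched call(), not new behavior): a path is parked for
HiveInputFormat.getSplits when its input format opts out of combining, or when
it belongs to an insert-only (MM) table and this MapWork is not a
resolver-driven merge.

    // Paraphrase of the patched check; all names come from the diff above.
    boolean isMmTableNonMerge = !isMerge
        && MetaStoreUtils.isInsertOnlyTable(part.getTableDesc().getProperties());
    boolean parkForHiveInputFormat = isAvoidSplitCombine || isMmTableNonMerge;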
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 475f2c9..12dea9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -18,6 +18,16 @@

 package org.apache.hadoop.hive.ql.parse;

+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import org.apache.hadoop.hive.common.ValidWriteIds;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
@@ -171,29 +181,69 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
           .getMsg("Exception while writing out the local file"), e);
     }

-    if (!(replicationSpec.isMetadataOnly() || (ts == null))) {
+    if (replicationSpec.isMetadataOnly() || (ts == null)) return;
+
+    try {
       Path parentPath = new Path(toURI);
+      boolean isMmTable = MetaStoreUtils.isInsertOnlyTable(ts.tableHandle.getParameters());
+      Utilities.LOG14535.info("Exporting table " + ts.tableName + " / "
+          + ts.tableHandle.getTableName() + ": " + isMmTable);
+
+      int lbLevels = isMmTable && ts.tableHandle.isStoredAsSubDirectories()
+          ? ts.tableHandle.getSkewedColNames().size() : 0;
+      ValidWriteIds ids = isMmTable ? db.getValidWriteIdsForTable(
+          ts.tableHandle.getDbName(), ts.tableHandle.getTableName()) : null;
       if (ts.tableHandle.isPartitioned()) {
         for (Partition partition : partitions) {
           Path fromPath = partition.getDataLocation();
           Path toPartPath = new Path(parentPath, partition.getName());
-          Task<? extends Serializable> rTask = TaskFactory.get(
-              new CopyWork(fromPath, toPartPath, false),
-              conf);
-          rootTasks.add(rTask);
+          CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toPartPath);
+          rootTasks.add(TaskFactory.get(cw, conf));
           inputs.add(new ReadEntity(partition));
         }
       } else {
         Path fromPath = ts.tableHandle.getDataLocation();
         Path toDataPath = new Path(parentPath, "data");
-        Task<? extends Serializable> rTask = TaskFactory.get(new CopyWork(
-            fromPath, toDataPath, false), conf);
-        rootTasks.add(rTask);
+        CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toDataPath);
+        rootTasks.add(TaskFactory.get(cw, conf));
         inputs.add(new ReadEntity(ts.tableHandle));
       }
       outputs.add(toWriteEntity(parentPath));
+    } catch (HiveException | IOException ex) {
+      throw new SemanticException(ex);
     }
+  }

+  private CopyWork createCopyWork(boolean isMmTable, int lbLevels, ValidWriteIds ids,
+      Path fromPath, Path toDataPath) throws IOException {
+    List<Path> validPaths = null;
+    if (isMmTable) {
+      fromPath = fromPath.getFileSystem(conf).makeQualified(fromPath);
+      validPaths = Utilities.getValidMmDirectoriesFromTableOrPart(fromPath, conf, ids, lbLevels);
+    }
+    if (validPaths == null) {
+      return new CopyWork(fromPath, toDataPath, false); // Not MM, or no need to skip anything.
+    } else {
+      return createCopyWorkForValidPaths(fromPath, toDataPath, validPaths);
+    }
   }

+  private CopyWork createCopyWorkForValidPaths(
+      Path fromPath, Path toPartPath, List<Path> validPaths) {
+    Path[] from = new Path[validPaths.size()], to = new Path[validPaths.size()];
+    int i = 0;
+    String fromPathStr = fromPath.toString();
+    if (!fromPathStr.endsWith(Path.SEPARATOR)) {
+      fromPathStr += "/";
+    }
+    for (Path validPath : validPaths) {
+      from[i] = validPath;
+      // TODO: assumes the results are already qualified.
+      to[i] = new Path(toPartPath, validPath.toString().substring(fromPathStr.length()));
+      Utilities.LOG14535.info("Will copy " + from[i] + " to " + to[i]
+          + " based on dest " + toPartPath + ", from " + fromPathStr + ", subpath " + validPath);
+      ++i;
+    }
+    return new CopyWork(from, to);
+  }
 }
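A worked example of the suffix arithmetic in createCopyWorkForValidPaths (all
paths hypothetical; it relies on validPaths being qualified the same way as
fromPath, per the TODO in the code):

    import org.apache.hadoop.fs.Path;

    // Hypothetical values; mm_5 stands for a committed MM directory name.
    Path fromPath  = new Path("hdfs://nn/warehouse/tbl");      // qualified source root
    Path validPath = new Path("hdfs://nn/warehouse/tbl/mm_5"); // one entry of validPaths
    Path toPart    = new Path("hdfs://nn/export/1/data");      // export destination
    String prefix  = fromPath.toString() + Path.SEPARATOR;     // "hdfs://nn/warehouse/tbl/"
    Path dest = new Path(toPart, validPath.toString().substring(prefix.length()));
    // dest is hdfs://nn/export/1/data/mm_5; the MM dir name survives under the target.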
"a new table" : table.getTableName())); CopyWork cv = new CopyWork(dataPath, destPath, false); - cv.setIsSourceMm(isSourceMm); + cv.setSkipSourceMmDirs(isSourceMm); LoadTableDesc loadTableWork = new LoadTableDesc(destPath, Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, mmWriteId); MoveWork mv = new MoveWork(getInputs(), getOutputs(), loadTableWork, null, false); @@ -411,7 +411,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm " + mmWriteId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec())); CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false); - cw.setIsSourceMm(isSourceMm); + cw.setSkipSourceMmDirs(isSourceMm); DDLWork dw = new DDLWork(getInputs(), getOutputs(), addPartitionDesc); LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table), partSpec.getPartSpec(), true, mmWriteId); http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index 4635f18..7fcb1ff 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -330,6 +330,7 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, mWork.setMinSplitSize(targetSize); mWork.setMinSplitSizePerNode(targetSize); mWork.setMinSplitSizePerRack(targetSize); + mWork.setIsMergeFromResolver(true); } private static class AverageSize { http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java index 2e484ba..c08911f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java @@ -30,10 +30,10 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; @Explain(displayName = "Copy", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public class CopyWork implements Serializable { private static final long serialVersionUID = 1L; - private Path fromPath; - private Path toPath; + private Path[] fromPath; + private Path[] toPath; private boolean errorOnSrcEmpty; - private boolean isMm = false; + private boolean isSkipMmDirs = false; public CopyWork() { } @@ -43,18 +43,45 @@ public class CopyWork implements Serializable { } public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) { + this(new Path[] { fromPath }, new Path[] { toPath }); + this.setErrorOnSrcEmpty(errorOnSrcEmpty); + } + + public CopyWork(final Path[] fromPath, final Path[] toPath) { + if (fromPath.length != toPath.length) { + throw new RuntimeException( + "Cannot copy " + fromPath.length + " paths into " + toPath.length + " paths"); + } this.fromPath = fromPath; this.toPath = toPath; - this.setErrorOnSrcEmpty(errorOnSrcEmpty); } - + + // Keep backward compat in explain for single-file copy tasks. 
http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index 2e484ba..c08911f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -30,10 +30,10 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
 @Explain(displayName = "Copy", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class CopyWork implements Serializable {
   private static final long serialVersionUID = 1L;
-  private Path fromPath;
-  private Path toPath;
+  private Path[] fromPath;
+  private Path[] toPath;
   private boolean errorOnSrcEmpty;
-  private boolean isMm = false;
+  private boolean isSkipMmDirs = false;

   public CopyWork() {
   }
@@ -43,18 +43,45 @@ public class CopyWork implements Serializable {
   }

   public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) {
+    this(new Path[] { fromPath }, new Path[] { toPath });
+    this.setErrorOnSrcEmpty(errorOnSrcEmpty);
+  }
+
+  public CopyWork(final Path[] fromPath, final Path[] toPath) {
+    if (fromPath.length != toPath.length) {
+      throw new RuntimeException(
+          "Cannot copy " + fromPath.length + " paths into " + toPath.length + " paths");
+    }
     this.fromPath = fromPath;
     this.toPath = toPath;
-    this.setErrorOnSrcEmpty(errorOnSrcEmpty);
   }

+  // Keep backward compat in explain for single-file copy tasks.
   @Explain(displayName = "source", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-  public Path getFromPath() {
-    return fromPath;
+  public Path getFromPathExplain() {
+    return (fromPath == null || fromPath.length > 1) ? null : fromPath[0];
   }

   @Explain(displayName = "destination", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-  public Path getToPath() {
+  public Path getToPathExplain() {
+    return (toPath == null || toPath.length > 1) ? null : toPath[0];
+  }
+
+  @Explain(displayName = "sources", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public Path[] getFromPathsExplain() {
+    return (fromPath != null && fromPath.length > 1) ? fromPath : null;
+  }
+
+  @Explain(displayName = "destinations", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public Path[] getToPathsExplain() {
+    return (toPath != null && toPath.length > 1) ? toPath : null;
+  }
+
+  public Path[] getFromPaths() {
+    return fromPath;
+  }
+
+  public Path[] getToPaths() {
     return toPath;
   }

@@ -66,12 +93,14 @@ public class CopyWork implements Serializable {
     return errorOnSrcEmpty;
   }

-  public void setIsSourceMm(boolean isMm) {
-    this.isMm = isMm;
+  /** Whether the copy should ignore MM directories in the source, and copy their content to
+   *  destination directly, rather than copying the directories themselves. */
+  public void setSkipSourceMmDirs(boolean isMm) {
+    this.isSkipMmDirs = isMm;
   }

-  public boolean isSourceMm() {
-    return isMm ;
+  public boolean doSkipSourceMmDirs() {
+    return isSkipMmDirs ;
   }
 }


http://git-wip-us.apache.org/repos/asf/hive/blob/2e602596/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 1be4d84..5a81a62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -148,6 +148,8 @@ public class MapWork extends BaseWork {
   /** Whether LLAP IO will be used for inputs. */
   private String llapIoDesc;

+  private boolean isMergeFromResolver;
+
   public MapWork() {}

   public MapWork(String name) {
@@ -718,4 +720,12 @@ public class MapWork extends BaseWork {
   public VectorizedRowBatch getVectorizedRowBatch() {
     return vectorizedRowBatch;
   }
+
+  public void setIsMergeFromResolver(boolean b) {
+    this.isMergeFromResolver = b;
+  }
+
+  public boolean isMergeFromResolver() {
+    return this.isMergeFromResolver;
+  }
 }
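To summarize the CopyWork changes above: explain output stays backward
compatible for single-path copies and switches to plural labels otherwise. A
hedged construction sketch (all paths hypothetical):

    // Single pair: explain renders "source"/"destination" via get*PathExplain().
    CopyWork single = new CopyWork(new Path("/src/data"), new Path("/dst/data"), false);

    // Multiple pairs: explain renders "sources"/"destinations"; mismatched array
    // lengths fail fast with a RuntimeException in the new constructor.
    CopyWork multi = new CopyWork(
        new Path[] { new Path("/src/mm_1"), new Path("/src/mm_2") },
        new Path[] { new Path("/dst/mm_1"), new Path("/dst/mm_2") });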
