Repository: hive
Updated Branches:
  refs/heads/hive-14535 36ad3a405 -> b143f5ce3
HIVE-14990 : run all tests for MM tables and fix the issues that are found - issue with FetchOperator (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b143f5ce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b143f5ce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b143f5ce

Branch: refs/heads/hive-14535
Commit: b143f5ce3cf9546ff49eb658a18db43ac319583d
Parents: 36ad3a4
Author: Sergey Shelukhin <[email protected]>
Authored: Wed Oct 26 18:44:30 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Wed Oct 26 18:44:30 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FetchOperator.java      | 31 ++++++++++++++++
 .../hadoop/hive/ql/io/HiveInputFormat.java      | 39 ++++++++++++--------
 2 files changed, 55 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b143f5ce/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 7375cd4..f89372c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -36,7 +36,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader;
@@ -76,6 +78,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 
 /**
  * FetchTask implementation.
@@ -125,6 +128,7 @@ public class FetchOperator implements Serializable {
   private transient StructObjectInspector outputOI;
   private transient Object[] row;
 
+  private transient Map<String, ValidWriteIds> writeIdMap;
 
   public FetchOperator(FetchWork work, JobConf job) throws HiveException {
     this(work, job, null, null);
@@ -369,6 +373,9 @@ public class FetchOperator implements Serializable {
       Class<? extends InputFormat> formatter = currDesc.getInputFileFormatClass();
       Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
       InputFormat inputFormat = getInputFormatFromCache(formatter, job);
+      String inputs = processCurrPathForMmWriteIds(inputFormat);
+      if (inputs == null) return null;
+      job.set("mapred.input.dir", inputs);
 
       InputSplit[] splits = inputFormat.getSplits(job, 1);
       FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
@@ -385,6 +392,30 @@ public class FetchOperator implements Serializable {
     return null;
   }
 
+  private String processCurrPathForMmWriteIds(InputFormat inputFormat) throws IOException {
+    if (inputFormat instanceof HiveInputFormat) {
+      return StringUtils.escapeString(currPath.toString()); // No need to process here.
+    }
+    if (writeIdMap == null) {
+      writeIdMap = new HashMap<String, ValidWriteIds>();
+    }
+    // No need to check for MM table - if it is, the IDs should be in the job config.
+    ValidWriteIds ids = HiveInputFormat.extractWriteIds(writeIdMap, job, currDesc.getTableName());
+    if (ids != null) {
+      Utilities.LOG14535.info("Observing " + currDesc.getTableName() + ": " + ids);
+    }
+
+    Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, ids);
+    if (dirs == null || dirs.length == 0) {
+      return null; // No valid inputs. This condition is logged inside the call.
+    }
+    StringBuffer str = new StringBuffer(StringUtils.escapeString(dirs[0].toString()));
+    for (int i = 1; i < dirs.length; i++) {
+      str.append(",").append(StringUtils.escapeString(dirs[i].toString()));
+    }
+    return str.toString();
+  }
+
   private FetchInputFormatSplit[] splitSampling(SplitSample splitSample,
       FetchInputFormatSplit[] splits) {
     long totalSize = 0;


http://git-wip-us.apache.org/repos/asf/hive/blob/b143f5ce/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index a539799..428093c 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -364,19 +364,11 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       pushFilters(conf, tableScan);
     }
 
-    if (writeIds == null) {
-      FileInputFormat.setInputPaths(conf, dirs.toArray(new Path[dirs.size()]));
-    } else {
-      List<Path> finalPaths = new ArrayList<>(dirs.size());
-      for (Path dir : dirs) {
-        processForWriteIds(dir, conf, writeIds, finalPaths);
-      }
-      if (finalPaths.isEmpty()) {
-        LOG.warn("No valid inputs found in " + dirs);
-        return;
-      }
-      FileInputFormat.setInputPaths(conf, finalPaths.toArray(new Path[finalPaths.size()]));
+    Path[] finalDirs = processPathsForMmRead(dirs, conf, writeIds);
+    if (finalDirs == null) {
+      return; // No valid inputs.
     }
+    FileInputFormat.setInputPaths(conf, finalDirs);
 
     conf.setInputFormat(inputFormat.getClass());
 
     int headerCount = 0;
@@ -396,7 +388,24 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
   }
 
-  private void processForWriteIds(Path dir, JobConf conf,
+  public static Path[] processPathsForMmRead(List<Path> dirs, JobConf conf,
+      ValidWriteIds writeIds) throws IOException {
+    if (writeIds == null) {
+      return dirs.toArray(new Path[dirs.size()]);
+    } else {
+      List<Path> finalPaths = new ArrayList<>(dirs.size());
+      for (Path dir : dirs) {
+        processForWriteIds(dir, conf, writeIds, finalPaths);
+      }
+      if (finalPaths.isEmpty()) {
+        LOG.warn("No valid inputs found in " + dirs);
+        return null;
+      }
+      return finalPaths.toArray(new Path[finalPaths.size()]);
+    }
+  }
+
+  private static void processForWriteIds(Path dir, JobConf conf,
       ValidWriteIds writeIds, List<Path> finalPaths) throws IOException {
     FileSystem fs = dir.getFileSystem(conf);
     Utilities.LOG14535.warn("Checking " + dir + " (root) for inputs");
@@ -413,7 +422,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
   }
 
-  private void handleNonMmDirChild(FileStatus file, ValidWriteIds writeIds,
+  private static void handleNonMmDirChild(FileStatus file, ValidWriteIds writeIds,
       LinkedList<Path> subdirs, List<Path> finalPaths) {
     Path path = file.getPath();
     Utilities.LOG14535.warn("Checking " + path + " for inputs");
@@ -561,7 +570,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     return result.toArray(new HiveInputSplit[result.size()]);
   }
 
-  private static ValidWriteIds extractWriteIds(Map<String, ValidWriteIds> writeIdMap,
+  public static ValidWriteIds extractWriteIds(Map<String, ValidWriteIds> writeIdMap,
       JobConf newjob, String tableName) {
     if (StringUtils.isBlank(tableName)) return null;
     ValidWriteIds writeIds = writeIdMap.get(tableName);
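
----------------------------------------------------------------------
Usage note (not part of the patch): a minimal sketch of how a caller outside
HiveInputFormat, such as the FetchOperator change above, can combine the
now-public extractWriteIds/processPathsForMmRead helpers to build the escaped,
comma-separated "mapred.input.dir" value. The class and variable names below
(MmReadSketch, tableName, inputPath) are hypothetical placeholders, not code
from this commit.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIds;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;

import com.google.common.collect.Lists;

public class MmReadSketch {
  // Cache of table name -> valid write IDs, mirroring FetchOperator.writeIdMap.
  private final Map<String, ValidWriteIds> writeIdMap = new HashMap<>();

  /** Returns an escaped, comma-separated input dir string, or null if there are no valid inputs. */
  public String resolveInputDirs(JobConf job, String tableName, Path inputPath) throws IOException {
    // Look up the valid write IDs for the table from the job config (null means non-MM / no filtering).
    ValidWriteIds ids = HiveInputFormat.extractWriteIds(writeIdMap, job, tableName);
    // Filter the input directory down to directories matching valid write IDs, or pass it through as-is.
    Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(inputPath), job, ids);
    if (dirs == null || dirs.length == 0) {
      return null; // No valid inputs; the helper logs a warning when it filters everything out.
    }
    // Build the comma-separated, escaped value expected by "mapred.input.dir".
    StringBuilder str = new StringBuilder(StringUtils.escapeString(dirs[0].toString()));
    for (int i = 1; i < dirs.length; i++) {
      str.append(",").append(StringUtils.escapeString(dirs[i].toString()));
    }
    return str.toString();
  }
}
----------------------------------------------------------------------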
