Author: cws
Date: Mon Jul 25 21:46:51 2011
New Revision: 1150928
URL: http://svn.apache.org/viewvc?rev=1150928&view=rev
Log:
HIVE-2299. Optimize Hive query startup time for multiple partitions (Vaibhav Aggarwal via cws)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1150928&r1=1150927&r2=1150928&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
Mon Jul 25 21:46:51 2011
@@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -31,6 +32,8 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,12 +52,15 @@ import org.apache.hadoop.hive.shims.Hado
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.StringUtils;
+
/**
* CombineHiveInputFormat is a parameterized InputFormat which looks at the
path
@@ -274,6 +280,9 @@ public class CombineHiveInputFormat<K ex
// combine splits only from same tables and same partitions. Do not
combine splits from multiple
// tables or multiple partitions.
Path[] paths = combine.getInputPathsShim(job);
+
+ List<Path> inpDirs = new ArrayList<Path>();
+ List<Path> inpFiles = new ArrayList<Path>();
Map<CombinePathInputFormat, CombineFilter> poolMap =
new HashMap<CombinePathInputFormat, CombineFilter>();
Set<Path> poolSet = new HashSet<Path>();
@@ -334,16 +343,6 @@ public class CombineHiveInputFormat<K ex
Path filterPath = path;
- // In the case of tablesample, the input paths are pointing to files
rather than directories.
- // We need to get the parent directory as the filtering path so that all
files in the same
- // parent directory will be grouped into one pool but not files from
different parent
- // directories. This guarantees that a split will combine all files in
the same partition
- // but won't cross multiple partitions if the user has asked so.
- if (mrwork.isMapperCannotSpanPartns() &&
- !path.getFileSystem(job).getFileStatus(path).isDir()) { // path is
not directory
- filterPath = path.getParent();
- }
-
// Does a pool exist for this path already
CombineFilter f = null;
List<Operator<? extends Serializable>> opList = null;
@@ -353,14 +352,20 @@ public class CombineHiveInputFormat<K ex
opList = HiveFileFormatUtils.doGetWorksFromPath(
pathToAliases, aliasToWork, filterPath);
f = poolMap.get(new CombinePathInputFormat(opList,
inputFormatClassName));
- }
- else {
- if (poolSet.contains(filterPath)) {
- LOG.info("CombineHiveInputSplit: pool is already created for " +
path +
- "; using filter path " + filterPath);
- done = true;
+ } else {
+ // In the case of tablesample, the input paths are pointing to files
rather than directories.
+ // We need to get the parent directory as the filtering path so that
all files in the same
+ // parent directory will be grouped into one pool but not files from
different parent
+ // directories. This guarantees that a split will combine all files in
the same partition
+ // but won't cross multiple partitions if the user has asked so.
+ if (!path.getFileSystem(job).getFileStatus(path).isDir()) { // path is
not directory
+ filterPath = path.getParent();
+ inpFiles.add(path);
+ poolSet.add(filterPath);
+ } else {
+ inpDirs.add(path);
}
- poolSet.add(filterPath);
+ done = true;
}
if (!done) {
@@ -380,7 +385,23 @@ public class CombineHiveInputFormat<K ex
}
}
- InputSplitShim[] iss = combine.getSplits(job, 1);
+ // Processing directories
+ List<InputSplitShim> iss = new ArrayList<InputSplitShim>();
+ if (!mrwork.isMapperCannotSpanPartns()) {
+ iss = Arrays.asList(combine.getSplits(job, 1));
+ } else {
+ for (Path path : inpDirs) {
+ processPaths(job, combine, iss, path);
+ }
+
+ if (inpFiles.size() > 0) {
+ // Processing files
+ for (Path filterPath : poolSet) {
+ combine.createPool(job, new CombineFilter(filterPath));
+ }
+ processPaths(job, combine, iss, inpFiles.toArray(new Path[0]));
+ }
+ }
if (mrwork.getNameToSplitSample() != null &&
!mrwork.getNameToSplitSample().isEmpty()) {
iss = sampleSplits(iss);
@@ -395,6 +416,13 @@ public class CombineHiveInputFormat<K ex
return result.toArray(new CombineHiveInputSplit[result.size()]);
}
+ private void processPaths(JobConf job, CombineFileInputFormatShim combine,
+ List<InputSplitShim> iss, Path... path) throws IOException {
+ JobConf currJob = new JobConf(job);
+ FileInputFormat.setInputPaths(currJob, path);
+ iss.addAll(Arrays.asList(combine.getSplits(currJob, 1)));
+ }
+
/**
* This function is used to sample inputs for clauses like "TABLESAMPLE(1
PERCENT)"
*
@@ -406,7 +434,7 @@ public class CombineHiveInputFormat<K ex
* @param splits
* @return the sampled splits
*/
- private InputSplitShim[] sampleSplits(InputSplitShim[] splits) {
+ private List<InputSplitShim> sampleSplits(List<InputSplitShim> splits) {
HashMap<String, SplitSample> nameToSamples = mrwork.getNameToSplitSample();
List<InputSplitShim> retLists = new ArrayList<InputSplitShim>();
Map<String, ArrayList<InputSplitShim>> aliasToSplitList = new
HashMap<String, ArrayList<InputSplitShim>>();
@@ -473,8 +501,7 @@ public class CombineHiveInputFormat<K ex
}
- InputSplitShim[] retArray = new InputSplitShim[retLists.size()];
- return retLists.toArray(retArray);
+ return retLists;
}
/**