Author: cws
Date: Mon Jul 25 21:46:51 2011
New Revision: 1150928

URL: http://svn.apache.org/viewvc?rev=1150928&view=rev
Log:
HIVE-2299. Optimize Hive query startup time for multiple partitions (Vaibhav 
Aggarwal via cws)

Modified:
    
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1150928&r1=1150927&r2=1150928&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java 
(original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java 
Mon Jul 25 21:46:51 2011
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -31,6 +32,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,12 +52,15 @@ import org.apache.hadoop.hive.shims.Hado
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.StringUtils;
+
 
 /**
  * CombineHiveInputFormat is a parameterized InputFormat which looks at the 
path
@@ -274,6 +280,9 @@ public class CombineHiveInputFormat<K ex
     // combine splits only from same tables and same partitions. Do not 
combine splits from multiple
     // tables or multiple partitions.
     Path[] paths = combine.getInputPathsShim(job);
+
+    List<Path> inpDirs = new ArrayList<Path>();
+    List<Path> inpFiles = new ArrayList<Path>();
     Map<CombinePathInputFormat, CombineFilter> poolMap =
       new HashMap<CombinePathInputFormat, CombineFilter>();
     Set<Path> poolSet = new HashSet<Path>();
@@ -334,16 +343,6 @@ public class CombineHiveInputFormat<K ex
 
       Path filterPath = path;
 
-      // In the case of tablesample, the input paths are pointing to files 
rather than directories.
-      // We need to get the parent directory as the filtering path so that all 
files in the same
-      // parent directory will be grouped into one pool but not files from 
different parent
-      // directories. This guarantees that a split will combine all files in 
the same partition
-      // but won't cross multiple partitions if the user has asked so.
-      if (mrwork.isMapperCannotSpanPartns() &&
-          !path.getFileSystem(job).getFileStatus(path).isDir()) { // path is 
not directory
-        filterPath = path.getParent();
-      }
-
       // Does a pool exist for this path already
       CombineFilter f = null;
       List<Operator<? extends Serializable>> opList = null;
@@ -353,14 +352,20 @@ public class CombineHiveInputFormat<K ex
         opList = HiveFileFormatUtils.doGetWorksFromPath(
                    pathToAliases, aliasToWork, filterPath);
         f = poolMap.get(new CombinePathInputFormat(opList, 
inputFormatClassName));
-      }
-      else {
-        if (poolSet.contains(filterPath)) {
-          LOG.info("CombineHiveInputSplit: pool is already created for " + 
path +
-                   "; using filter path " + filterPath);
-          done = true;
+      } else {
+        // In the case of tablesample, the input paths are pointing to files 
rather than directories.
+        // We need to get the parent directory as the filtering path so that 
all files in the same
+        // parent directory will be grouped into one pool but not files from 
different parent
+        // directories. This guarantees that a split will combine all files in 
the same partition
+        // but won't cross multiple partitions if the user has asked so.
+        if (!path.getFileSystem(job).getFileStatus(path).isDir()) { // path is 
not directory
+          filterPath = path.getParent();
+          inpFiles.add(path);
+          poolSet.add(filterPath);
+        } else {
+          inpDirs.add(path);
         }
-        poolSet.add(filterPath);
+        done = true;
       }
 
       if (!done) {
@@ -380,7 +385,23 @@ public class CombineHiveInputFormat<K ex
       }
     }
 
-    InputSplitShim[] iss = combine.getSplits(job, 1);
+    // Processing directories
+    List<InputSplitShim> iss = new ArrayList<InputSplitShim>();
+    if (!mrwork.isMapperCannotSpanPartns()) {
+      iss = Arrays.asList(combine.getSplits(job, 1));
+    } else {
+      for (Path path : inpDirs) {
+        processPaths(job, combine, iss, path);
+      }
+
+      if (inpFiles.size() > 0) {
+        // Processing files
+        for (Path filterPath : poolSet) {
+          combine.createPool(job, new CombineFilter(filterPath));
+        }
+        processPaths(job, combine, iss, inpFiles.toArray(new Path[0]));
+      }
+    }
 
     if (mrwork.getNameToSplitSample() != null && 
!mrwork.getNameToSplitSample().isEmpty()) {
       iss = sampleSplits(iss);
@@ -395,6 +416,13 @@ public class CombineHiveInputFormat<K ex
     return result.toArray(new CombineHiveInputSplit[result.size()]);
   }
 
+    private void processPaths(JobConf job, CombineFileInputFormatShim combine,
+      List<InputSplitShim> iss, Path... path) throws IOException {
+    JobConf currJob = new JobConf(job);
+    FileInputFormat.setInputPaths(currJob, path);
+    iss.addAll(Arrays.asList(combine.getSplits(currJob, 1)));
+  }
+
   /**
    * This function is used to sample inputs for clauses like "TABLESAMPLE(1 
PERCENT)"
    *
@@ -406,7 +434,7 @@ public class CombineHiveInputFormat<K ex
    * @param splits
    * @return the sampled splits
    */
-  private InputSplitShim[] sampleSplits(InputSplitShim[] splits) {
+  private List<InputSplitShim> sampleSplits(List<InputSplitShim> splits) {
     HashMap<String, SplitSample> nameToSamples = mrwork.getNameToSplitSample();
     List<InputSplitShim> retLists = new ArrayList<InputSplitShim>();
     Map<String, ArrayList<InputSplitShim>> aliasToSplitList = new 
HashMap<String, ArrayList<InputSplitShim>>();
@@ -473,8 +501,7 @@ public class CombineHiveInputFormat<K ex
 
     }
 
-    InputSplitShim[] retArray = new InputSplitShim[retLists.size()];
-    return retLists.toArray(retArray);
+    return retLists;
   }
 
   /**


Reply via email to