Author: jssarma
Date: Mon Mar 21 22:01:34 2011
New Revision: 1083984

URL: http://svn.apache.org/viewvc?rev=1083984&view=rev
Log:
HIVE-2051: getInputSummary() to call FileSystem.getContentSummary() in parallel 
(Siying Dong via jssarma)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1083984&r1=1083983&r2=1083984&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon 
Mar 21 22:01:34 2011
@@ -59,6 +59,12 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -100,8 +106,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.serde.Constants;
@@ -110,8 +116,8 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -402,8 +408,8 @@ public final class Utilities {
       e.setPersistenceDelegate(Operator.ProgressCounter.class, new 
EnumDelegate());
 
       e.writeObject(t);
-    }finally {
-      if(null != e){
+    } finally {
+      if (null != e) {
         e.close();
       }
     }
@@ -632,9 +638,9 @@ public final class Utilities {
   public static TableDesc getTableDesc(String cols, String colTypes) {
     return (new TableDesc(LazySimpleSerDe.class, SequenceFileInputFormat.class,
         HiveSequenceFileOutputFormat.class, Utilities.makeProperties(
-            org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + 
Utilities.ctrlaCode,
-            org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
-            org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, 
colTypes)));
+        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + 
Utilities.ctrlaCode,
+        org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
+        org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)));
   }
 
   public static PartitionDesc getPartitionDesc(Partition part) throws 
HiveException {
@@ -1142,7 +1148,8 @@ public final class Utilities {
   }
 
   public static void mvFileToFinalPath(String specPath, Configuration hconf,
-      boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) 
throws IOException, HiveException {
+      boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf) 
throws IOException,
+      HiveException {
 
     FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
@@ -1158,7 +1165,7 @@ public final class Utilities {
         Utilities.rename(fs, tmpPath, intermediatePath);
         // Step2: remove any tmp file or double-committed output files
         ArrayList<String> emptyBuckets =
-          Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
+            Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
         // create empty buckets if necessary
         if (emptyBuckets.size() > 0) {
           createEmptyBuckets(hconf, emptyBuckets, conf);
@@ -1176,13 +1183,18 @@ public final class Utilities {
   /**
    * Check the existence of buckets according to bucket specification. Create 
empty buckets if
    * needed.
-   * @param specPath The final path where the dynamic partitions should be in.
-   * @param conf FileSinkDesc.
-   * @param dpCtx dynamic partition context.
+   *
+   * @param specPath
+   *          The final path where the dynamic partitions should be in.
+   * @param conf
+   *          FileSinkDesc.
+   * @param dpCtx
+   *          dynamic partition context.
    * @throws HiveException
    * @throws IOException
    */
-  private static void createEmptyBuckets(Configuration hconf, 
ArrayList<String> paths, FileSinkDesc conf)
+  private static void createEmptyBuckets(Configuration hconf, 
ArrayList<String> paths,
+      FileSinkDesc conf)
       throws HiveException, IOException {
 
     JobConf jc;
@@ -1209,7 +1221,7 @@ public final class Utilities {
       throw new HiveException(e);
     }
 
-    for (String p: paths) {
+    for (String p : paths) {
       Path path = new Path(p);
       RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
           jc, hiveOutputFormat, outputClass, isCompressed, 
tableInfo.getProperties(), path);
@@ -1503,6 +1515,8 @@ public final class Utilities {
     }
   }
 
+  public static Object getInputSummaryLock = new Object();
+
   /**
    * Calculate the total size of input files.
    *
@@ -1520,9 +1534,13 @@ public final class Utilities {
 
     long[] summary = {0, 0, 0};
 
-    // For each input path, calculate the total size.
-    for (String path : work.getPathToAliases().keySet()) {
-      try {
+    List<String> pathNeedProcess = new ArrayList<String>();
+
+    // Since multiple threads could call this method concurrently, locking
+    // this method keeps the number of threads from getting out of control.
+    synchronized (getInputSummaryLock) {
+      // For each input path, calculate the total size.
+      for (String path : work.getPathToAliases().keySet()) {
         Path p = new Path(path);
 
         if (filter != null && !filter.accept(p)) {
@@ -1531,37 +1549,118 @@ public final class Utilities {
 
         ContentSummary cs = ctx.getCS(path);
         if (cs == null) {
-          JobConf jobConf = new JobConf(ctx.getConf());
-          PartitionDesc partDesc = work.getPathToPartitionInfo().get(
-              p.toString());
-          Class<? extends InputFormat> inputFormatCls = partDesc
-              .getInputFileFormatClass();
-          InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
-              inputFormatCls, jobConf);
-          if(inputFormatObj instanceof ContentSummaryInputFormat) {
-            cs = ((ContentSummaryInputFormat) 
inputFormatObj).getContentSummary(p, jobConf);
-          } else {
-            FileSystem fs = p.getFileSystem(ctx.getConf());
-            cs = fs.getContentSummary(p);
+          if (path == null) {
+            continue;
           }
-          ctx.addCS(path, cs);
-          LOG.info("Cache Content Summary for " + path + " length: " + 
cs.getLength() + " file count: "
-              + cs.getFileCount() + " directory count: " + 
cs.getDirectoryCount());
+          pathNeedProcess.add(path);
+        } else {
+          summary[0] += cs.getLength();
+          summary[1] += cs.getFileCount();
+          summary[2] += cs.getDirectoryCount();
         }
+      }
+
+      // Process the case when name node call is needed
+      final Map<String, ContentSummary> resultMap = new 
ConcurrentHashMap<String, ContentSummary>();
+      ArrayList<Future<?>> results = new ArrayList<Future<?>>();
+      ThreadPoolExecutor executor = null;
+      int maxThreads = 
ctx.getConf().getInt("mapred.dfsclient.parallelism.max", 0);
+      if (pathNeedProcess.size() > 1 && maxThreads > 1) {
+        int numExecutors = Math.min(pathNeedProcess.size(), maxThreads);
+        LOG.info("Using " + numExecutors + " threads for getContentSummary");
+        executor = new ThreadPoolExecutor(numExecutors, numExecutors, 60, 
TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>());
+      }
+
+      //
+      Configuration conf = ctx.getConf();
+      JobConf jobConf = new JobConf(conf);
+      for (String path : pathNeedProcess) {
+        final Path p = new Path(path);
+        final String pathStr = path;
+        // All threads share the same Configuration and JobConf based on the
+        // assumption that they are thread safe if only read operations are
+        // executed. It is not stated in Hadoop's javadoc, but the source code
+        // clearly showed that they made efforts for it and we believe it is
+        // thread safe. Will revisit this piece of code if we find the 
assumption
+        // is not correct.
+        final Configuration myConf = conf;
+        final JobConf myJobConf = jobConf;
+        final PartitionDesc partDesc = work.getPathToPartitionInfo().get(
+            p.toString());
+        Runnable r = new Runnable() {
+          public void run() {
+            try {
+              ContentSummary resultCs;
+
+              Class<? extends InputFormat> inputFormatCls = partDesc
+                  .getInputFileFormatClass();
+              InputFormat inputFormatObj = 
HiveInputFormat.getInputFormatFromCache(
+                  inputFormatCls, myJobConf);
+              if (inputFormatObj instanceof ContentSummaryInputFormat) {
+                resultCs = ((ContentSummaryInputFormat) 
inputFormatObj).getContentSummary(p,
+                    myJobConf);
+              } else {
+                FileSystem fs = p.getFileSystem(myConf);
+                resultCs = fs.getContentSummary(p);
+              }
+              resultMap.put(pathStr, resultCs);
+            } catch (IOException e) {
+              // We safely ignore this exception for summary data.
+              // We don't update the cache to protect it from polluting other
+              // usages. The worst case is that IOException will always be
+              // retried for another getInputSummary(), which is fine as
+              // IOException is not considered as a common case.
+              LOG.info("Cannot get size of " + pathStr + ". Safely ignored.");
+            }
+          }
+        };
+
+        if (executor == null) {
+          r.run();
+        } else {
+          Future<?> result = executor.submit(r);
+          results.add(result);
+        }
+      }
+
+      if (executor != null) {
+        for (Future<?> result : results) {
+          boolean executorDone = false;
+          do {
+            try {
+              result.get();
+              executorDone = true;
+            } catch (InterruptedException e) {
+              LOG.info("Interrupted when waiting threads: ", e);
+              Thread.currentThread().interrupt();
+            } catch (ExecutionException e) {
+              throw new IOException(e);
+            }
+          } while (!executorDone);
+        }
+        executor.shutdown();
+      }
+
+      for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
+        ContentSummary cs = entry.getValue();
 
         summary[0] += cs.getLength();
         summary[1] += cs.getFileCount();
         summary[2] += cs.getDirectoryCount();
 
-      } catch (IOException e) {
-        LOG.info("Cannot get size of " + path + ". Safely ignored.");
+        ctx.addCS(entry.getKey(), cs);
+        LOG.info("Cache Content Summary for " + entry.getKey() + " length: " + 
cs.getLength()
+            + " file count: "
+            + cs.getFileCount() + " directory count: " + 
cs.getDirectoryCount());
       }
+
+      return new ContentSummary(summary[0], summary[1], summary[2]);
     }
-    return new ContentSummary(summary[0], summary[1], summary[2]);
   }
 
   public static boolean isEmptyPath(JobConf job, String dirPath, Context ctx)
-    throws Exception {
+      throws Exception {
     ContentSummary cs = ctx.getCS(dirPath);
     if (cs != null) {
       LOG.info("Content Summary " + dirPath + "length: " + cs.getLength() + " 
num files: "
@@ -1712,17 +1811,17 @@ public final class Utilities {
   }
 
   public static String generateTarURI(String baseURI, String filename) {
-    String tmpFileURI = new String(baseURI + Path.SEPARATOR + 
filename+".tar.gz");
+    String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + 
".tar.gz");
     return tmpFileURI;
   }
 
   public static String generateTarURI(Path baseURI, String filename) {
-    String tmpFileURI = new String(baseURI + Path.SEPARATOR + 
filename+".tar.gz");
+    String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + 
".tar.gz");
     return tmpFileURI;
   }
 
   public static String generateTarFileName(String name) {
-    String tmpFileURI = new String(name+".tar.gz");
+    String tmpFileURI = new String(name + ".tar.gz");
     return tmpFileURI;
   }
 
@@ -1738,7 +1837,7 @@ public final class Utilities {
   }
 
   public static double showTime(long time) {
-    double result = (double) time / (double)1000;
+    double result = (double) time / (double) 1000;
     return result;
   }
 


Reply via email to