Repository: hive
Updated Branches:
  refs/heads/hive-14535 b60bbc28a -> 0f7f4ed83
HIVE-14920: S3: Optimize SimpleFetchOptimizer::checkThreshold() (Rajesh Balamohan reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f2efa6a2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f2efa6a2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f2efa6a2

Branch: refs/heads/hive-14535
Commit: f2efa6a2be52f09e700c931a293c816a446bf619
Parents: 19999da
Author: Prasanth Jayachandran <[email protected]>
Authored: Fri Oct 21 00:38:36 2016 -0700
Committer: Prasanth Jayachandran <[email protected]>
Committed: Fri Oct 21 00:38:36 2016 -0700

----------------------------------------------------------------------
 .../hive/ql/optimizer/SimpleFetchOptimizer.java | 115 +++++++++++++------
 1 file changed, 82 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f2efa6a2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index eb0ba7b..0481110 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -26,7 +26,16 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.ContentSummary;
@@ -162,13 +171,7 @@ public class SimpleFetchOptimizer extends Transform {
         return true;
       }
     }
-    long remaining = threshold;
-    remaining -= data.getInputLength(pctx, remaining);
-    if (remaining < 0) {
-      LOG.info("Threshold " + remaining + " exceeded for pseudoMR mode");
-      return false;
-    }
-    return true;
+    return data.isDataLengthWitInThreshold(pctx, threshold);
   }
 
   // all we can handle is LimitOperator, FilterOperator SelectOperator and final FS
@@ -414,18 +417,16 @@ public class SimpleFetchOptimizer extends Transform {
       return replaceFSwithLS(fileSink, work.getSerializationNullFormat());
     }
 
-    private long getInputLength(ParseContext pctx, long remaining) throws Exception {
+    private boolean isDataLengthWitInThreshold(ParseContext pctx, final long threshold)
+        throws Exception {
       if (splitSample != null && splitSample.getTotalLength() != null) {
-        return splitSample.getTotalLength();
-      }
-      if (splitSample != null) {
-        return splitSample.getTargetSize(calculateLength(pctx, splitSample.estimateSourceSize(remaining)));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Threshold " + splitSample.getTotalLength() + " exceeded for pseudoMR mode");
+        }
+        return (threshold - splitSample.getTotalLength()) > 0;
       }
-      return calculateLength(pctx, remaining);
-    }
 
-    private long calculateLength(ParseContext pctx, long remaining) throws Exception {
-      JobConf jobConf = new JobConf(pctx.getConf());
+      final JobConf jobConf = new JobConf(pctx.getConf());
       Utilities.setColumnNameList(jobConf, scanOp, true);
       Utilities.setColumnTypeList(jobConf, scanOp, true);
       HiveStorageHandler handler = table.getStorageHandler();
@@ -434,41 +435,89 @@ public class SimpleFetchOptimizer extends Transform {
         TableDesc tableDesc = Utilities.getTableDesc(table);
         PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
         Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
-        return estimator.estimate(jobConf, scanOp, remaining).getTotalLength();
+        long len = estimator.estimate(jobConf, scanOp, threshold).getTotalLength();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+        }
+        return (threshold - len) > 0;
       }
       if (table.isNonNative()) {
-        return 0; // nothing can be done
+        return true; // nothing can be done
       }
       if (!table.isPartitioned()) {
-        return getFileLength(jobConf, table.getPath(), table.getInputFormatClass());
+        long len = getPathLength(jobConf, table.getPath(), table.getInputFormatClass(), threshold);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+        }
+        return (threshold - len) > 0;
       }
-      long total = 0;
-      for (Partition partition : partsList.getNotDeniedPartns()) {
-        Path path = partition.getDataLocation();
-        total += getFileLength(jobConf, path, partition.getInputFormatClass());
-        if (total > remaining) {
-          break;
+      final AtomicLong total = new AtomicLong(0);
+      //TODO: use common thread pool later?
+      int threadCount = HiveConf.getIntVar(pctx.getConf(),
+          HiveConf.ConfVars.HIVE_STATS_GATHER_NUM_THREADS);
+      final ExecutorService pool = (threadCount > 0) ?
+          Executors.newFixedThreadPool(threadCount,
+              new ThreadFactoryBuilder()
+                  .setDaemon(true)
+                  .setNameFormat("SimpleFetchOptimizer-FileLength-%d").build()) : null;
+      try {
+        List<Future> futures = Lists.newLinkedList();
+        for (final Partition partition : partsList.getNotDeniedPartns()) {
+          final Path path = partition.getDataLocation();
+          if (pool != null) {
+            futures.add(pool.submit(new Callable<Long>() {
+              @Override
+              public Long call() throws Exception {
+                long len = getPathLength(jobConf, path, partition.getInputFormatClass(), threshold);
+                LOG.trace(path + ", length=" + len);
+                return total.addAndGet(len);
+              }
+            }));
+          } else {
+            total.addAndGet(getPathLength(jobConf, path, partition.getInputFormatClass(), threshold));
+          }
+        }
+        if (pool != null) {
+          pool.shutdown();
+          for (Future<Long> future : futures) {
+            long totalLen = future.get();
+            if ((threshold - totalLen) <= 0) {
+              // early exit, as getting file lengths can be expensive in object stores.
+              return false;
+            }
+          }
+        }
+        return (threshold - total.get()) >= 0;
+      } finally {
+        LOG.info("Data set size=" + total.get() + ", threshold=" + threshold);
+        if (pool != null) {
+          pool.shutdownNow();
         }
       }
-      return total;
     }
 
-    // from Utilities.getInputSummary()
-    private long getFileLength(JobConf conf, Path path, Class<? extends InputFormat> clazz)
+    private long getPathLength(JobConf conf, Path path,
+        Class<? extends InputFormat> clazz, long threshold)
         throws IOException {
-      ContentSummary summary;
       if (ContentSummaryInputFormat.class.isAssignableFrom(clazz)) {
         InputFormat input = HiveInputFormat.getInputFormatFromCache(clazz, conf);
-        summary = ((ContentSummaryInputFormat)input).getContentSummary(path, conf);
+        return ((ContentSummaryInputFormat)input).getContentSummary(path, conf).getLength();
       } else {
         FileSystem fs = path.getFileSystem(conf);
         try {
-          summary = fs.getContentSummary(path);
+          long length = 0;
+          RemoteIterator<LocatedFileStatus> results = fs.listFiles(path, true);
+          // No need to iterate more, when threshold is reached
+          // (beneficial especially for object stores)
+          while (length <= threshold && results.hasNext()) {
+            length += results.next().getLen();
+          }
+          LOG.trace("length=" + length + ", threshold=" + threshold);
+          return length;
         } catch (FileNotFoundException e) {
           return 0;
         }
       }
-      return summary.getLength();
     }
   }
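
----------------------------------------------------------------------

Note on the first half of the change: getPathLength() no longer asks the
FileSystem for a full ContentSummary (which must enumerate every object under
the path) but walks FileSystem.listFiles(path, true) and stops as soon as the
accumulated length passes the threshold. A minimal standalone sketch of that
early-exit scan follows; it uses only the public Hadoop FileSystem API, and
the EarlyExitSizer class and sizeUpTo() helper are illustrative names, not
part of the patch.

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class EarlyExitSizer {
      /**
       * Sums file lengths under 'path' recursively, but stops iterating as
       * soon as the running total exceeds 'threshold'. Against S3 and other
       * object stores this avoids paging through the full listing once the
       * answer ("too big for single-fetch mode") is already known.
       * Usage: sizeUpTo(path.getFileSystem(conf), path, maxBytes)
       */
      static long sizeUpTo(FileSystem fs, Path path, long threshold) throws IOException {
        long length = 0;
        try {
          RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, true);
          while (length <= threshold && files.hasNext()) {
            length += files.next().getLen();
          }
        } catch (FileNotFoundException e) {
          return 0; // a missing path contributes nothing, as in the patch
        }
        return length;
      }
    }

On HDFS getContentSummary() is a single NameNode call, but on S3 it
degenerates into a recursive listing of every key, which is why bailing out
of the iterator early pays off there.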

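----------------------------------------------------------------------

The other half of the patch fans the per-partition size checks out to a
daemon thread pool sized by HiveConf.ConfVars.HIVE_STATS_GATHER_NUM_THREADS
and aborts as soon as the running total crosses the threshold. Below is a
simplified, self-contained sketch of that fan-out/early-exit pattern;
ParallelThresholdCheck, Sizer, and withinThreshold() are invented names for
illustration, standing in for the patch's Partition and getPathLength().

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicLong;

    public class ParallelThresholdCheck {

      interface Sizer { // stands in for getPathLength(...) in the patch
        long size(String partitionPath) throws Exception;
      }

      static boolean withinThreshold(List<String> partitions, final Sizer sizer,
          final long threshold, int threads)
          throws InterruptedException, ExecutionException {
        final AtomicLong total = new AtomicLong(0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
          List<Future<Long>> futures = new ArrayList<>();
          for (final String partition : partitions) {
            futures.add(pool.submit(new Callable<Long>() {
              @Override
              public Long call() throws Exception {
                // each task returns the running total, not just its own length
                return total.addAndGet(sizer.size(partition));
              }
            }));
          }
          for (Future<Long> future : futures) {
            if (future.get() > threshold) {
              return false; // early exit: no need to wait for the rest
            }
          }
          return total.get() <= threshold;
        } finally {
          pool.shutdownNow(); // cancels any tasks still listing files
        }
      }
    }

Having each task return the cumulative total rather than its own length is
what lets the consuming loop return false on the first future that crosses
the threshold instead of joining every task; shutdownNow() in the finally
block then cancels whatever is still listing.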