Repository: hive
Updated Branches:
  refs/heads/master a977c3680 -> d7a43c7a0
HIVE-15065: SimpleFetchOptimizer should decide based on metastore stats when available (Prasanth Jayachandran reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d7a43c7a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d7a43c7a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d7a43c7a

Branch: refs/heads/master
Commit: d7a43c7a0f85d0529430df6e4190ad58072d4bbe
Parents: a977c36
Author: Prasanth Jayachandran <[email protected]>
Authored: Thu Oct 27 14:22:48 2016 -0700
Committer: Prasanth Jayachandran <[email protected]>
Committed: Thu Oct 27 14:22:48 2016 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   1 +
 .../hive/ql/optimizer/SimpleFetchOptimizer.java | 169 +++++++++++-------
 .../clientpositive/stats_based_fetch_decision.q |  15 ++
 .../llap/stats_based_fetch_decision.q.out       | 176 +++++++++++++++++++
 4 files changed, 300 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 4e91452..f30152b 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -558,6 +558,7 @@ minillaplocal.query.files=acid_globallimit.q,\
   semijoin.q,\
   smb_cache.q,\
   special_character_in_tabnames_1.q,\
+  stats_based_fetch_decision.q,\
   subquery_notin.q,\
   table_access_keys_stats.q,\
   tez_bmj_schema_evolution.q,\
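A q-file only runs under a given driver when it is listed in testconfiguration.properties, so the one-line hunk above is what routes stats_based_fetch_decision.q through the MiniLlapLocal driver; that is also why the golden file in this commit lives under results/clientpositive/llap/.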
http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index 0481110..a68ceb4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -36,9 +36,10 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -55,7 +56,6 @@ import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.UDTFOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
@@ -171,7 +171,7 @@ public class SimpleFetchOptimizer extends Transform {
         return true;
       }
     }
-    return data.isDataLengthWitInThreshold(pctx, threshold);
+    return data.isDataLengthWithInThreshold(pctx, threshold);
   }
 
   // all we can handle is LimitOperator, FilterOperator SelectOperator and final FS
@@ -321,6 +321,12 @@
     return true;
   }
 
+  enum Status {
+    PASS,
+    FAIL,
+    UNAVAILABLE
+  }
+
   private class FetchData {
 
     // source table scan
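The Status enum added above feeds the refactored threshold check in the hunks that follow: metastore basic stats are consulted first, and only an UNAVAILABLE result falls through to the old filesystem scan. A minimal, self-contained sketch of that tri-state pattern, assuming hypothetical metastoreTotalSize() and fileSystemTotalSize() helpers in place of the real Hive lookups:

import java.util.OptionalLong;

// Sketch only: metastoreTotalSize() and fileSystemTotalSize() are
// hypothetical stand-ins, not Hive APIs.
final class FetchDecisionSketch {
  enum Status { PASS, FAIL, UNAVAILABLE }

  // Total size recorded in the metastore, empty when stats were never computed.
  static OptionalLong metastoreTotalSize() { return OptionalLong.empty(); }

  // Expensive fallback: sum file lengths by listing the filesystem.
  static long fileSystemTotalSize() { return 0L; }

  static Status checkWithMetastoreStats(long threshold) {
    OptionalLong size = metastoreTotalSize();
    if (!size.isPresent() || size.getAsLong() <= 0) {
      return Status.UNAVAILABLE; // missing or bogus stats: cannot decide here
    }
    return size.getAsLong() <= threshold ? Status.PASS : Status.FAIL;
  }

  static boolean isDataLengthWithinThreshold(long threshold) {
    switch (checkWithMetastoreStats(threshold)) {
      case PASS: return true;    // decided without touching the filesystem
      case FAIL: return false;
      default:   return fileSystemTotalSize() <= threshold;
    }
  }
}

The point of the three-valued result is that "no stats" stays distinguishable from "stats say no": a table that was never analyzed does not lose fetch conversion, it merely pays for a filesystem scan.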
@@ -417,7 +423,7 @@
       return replaceFSwithLS(fileSink, work.getSerializationNullFormat());
     }
 
-    private boolean isDataLengthWitInThreshold(ParseContext pctx, final long threshold)
+    private boolean isDataLengthWithInThreshold(ParseContext pctx, final long threshold)
         throws Exception {
       if (splitSample != null && splitSample.getTotalLength() != null) {
         if (LOG.isDebugEnabled()) {
@@ -426,74 +432,115 @@
       return (threshold - splitSample.getTotalLength()) > 0;
     }
 
-      final JobConf jobConf = new JobConf(pctx.getConf());
-      Utilities.setColumnNameList(jobConf, scanOp, true);
-      Utilities.setColumnTypeList(jobConf, scanOp, true);
-      HiveStorageHandler handler = table.getStorageHandler();
-      if (handler instanceof InputEstimator) {
-        InputEstimator estimator = (InputEstimator) handler;
-        TableDesc tableDesc = Utilities.getTableDesc(table);
-        PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
-        Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
-        long len = estimator.estimate(jobConf, scanOp, threshold).getTotalLength();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+      Status status = checkThresholdWithMetastoreStats(table, partsList, threshold);
+      if (status.equals(Status.PASS)) {
+        return true;
+      } else if (status.equals(Status.FAIL)) {
+        return false;
+      } else {
+        LOG.info("Cannot fetch stats from metastore for table: {}. Falling back to filesystem scan..",
+            table.getCompleteName());
+        // metastore stats is unavailable, fallback to old way
+        final JobConf jobConf = new JobConf(pctx.getConf());
+        Utilities.setColumnNameList(jobConf, scanOp, true);
+        Utilities.setColumnTypeList(jobConf, scanOp, true);
+        HiveStorageHandler handler = table.getStorageHandler();
+        if (handler instanceof InputEstimator) {
+          InputEstimator estimator = (InputEstimator) handler;
+          TableDesc tableDesc = Utilities.getTableDesc(table);
+          PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
+          Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
+          long len = estimator.estimate(jobConf, scanOp, threshold).getTotalLength();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+          }
+          return (threshold - len) > 0;
         }
-        return (threshold - len) > 0;
-      }
-      if (table.isNonNative()) {
-        return true; // nothing can be done
-      }
-      if (!table.isPartitioned()) {
-        long len = getPathLength(jobConf, table.getPath(), table.getInputFormatClass(), threshold);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+        if (table.isNonNative()) {
+          return true; // nothing can be done
         }
-        return (threshold - len) > 0;
-      }
-      final AtomicLong total = new AtomicLong(0);
-      //TODO: use common thread pool later?
-      int threadCount = HiveConf.getIntVar(pctx.getConf(),
+        if (!table.isPartitioned()) {
+          long len = getPathLength(jobConf, table.getPath(), table.getInputFormatClass(), threshold);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+          }
+          return (threshold - len) > 0;
+        }
+        final AtomicLong total = new AtomicLong(0);
+        //TODO: use common thread pool later?
+        int threadCount = HiveConf.getIntVar(pctx.getConf(),
           HiveConf.ConfVars.HIVE_STATS_GATHER_NUM_THREADS);
-      final ExecutorService pool = (threadCount > 0) ?
+        final ExecutorService pool = (threadCount > 0) ?
           Executors.newFixedThreadPool(threadCount,
-              new ThreadFactoryBuilder()
-                  .setDaemon(true)
-                  .setNameFormat("SimpleFetchOptimizer-FileLength-%d").build()) : null;
-      try {
-        List<Future> futures = Lists.newLinkedList();
-        for (final Partition partition : partsList.getNotDeniedPartns()) {
-          final Path path = partition.getDataLocation();
+            new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("SimpleFetchOptimizer-FileLength-%d").build()) : null;
+        try {
+          List<Future> futures = Lists.newLinkedList();
+          for (final Partition partition : partsList.getNotDeniedPartns()) {
+            final Path path = partition.getDataLocation();
+            if (pool != null) {
+              futures.add(pool.submit(new Callable<Long>() {
+                @Override
+                public Long call() throws Exception {
+                  long len = getPathLength(jobConf, path, partition.getInputFormatClass(), threshold);
+                  LOG.trace(path + ", length=" + len);
+                  return total.addAndGet(len);
+                }
+              }));
+            } else {
+              total.addAndGet(getPathLength(jobConf, path, partition.getInputFormatClass(), threshold));
+            }
+          }
           if (pool != null) {
-            futures.add(pool.submit(new Callable<Long>() {
-              @Override
-              public Long call() throws Exception {
-                long len = getPathLength(jobConf, path, partition.getInputFormatClass(), threshold);
-                LOG.trace(path + ", length=" + len);
-                return total.addAndGet(len);
+            pool.shutdown();
+            for (Future<Long> future : futures) {
+              long totalLen = future.get();
+              if ((threshold - totalLen) <= 0) {
+                // early exit, as getting file lengths can be expensive in object stores.
+                return false;
               }
-            }));
-          } else {
-            total.addAndGet(getPathLength(jobConf, path, partition.getInputFormatClass(), threshold));
-          }
-        }
-        if (pool != null) {
-          pool.shutdown();
-          for (Future<Long> future : futures) {
-            long totalLen = future.get();
-            if ((threshold - totalLen) <= 0) {
-              // early exit, as getting file lengths can be expensive in object stores.
-              return false;
-            }
           }
         }
+          return (threshold - total.get()) >= 0;
+        } finally {
+          LOG.info("Data set size=" + total.get() + ", threshold=" + threshold);
+          if (pool != null) {
+            pool.shutdownNow();
+          }
         }
-        return (threshold - total.get()) >= 0;
-      } finally {
-        LOG.info("Data set size=" + total.get() + ", threshold=" + threshold);
-        if (pool != null) {
-          pool.shutdownNow();
+      }
+    }
+
+    // This method gets the basic stats from metastore for table/partitions. This will make use of the statistics from
+    // AnnotateWithStatistics optimizer when available. If execution engine is tez or spark, AnnotateWithStatistics
+    // optimization is applied only during physical compilation because of DPP changing the stats. In such case, we
+    // will get the basic stats from metastore. When statistics is absent in metastore we will use the fallback of
+    // scanning the filesystem to get file lengths.
+    private Status checkThresholdWithMetastoreStats(final Table table, final PrunedPartitionList partsList,
+        final long threshold) {
+      if (table != null && !table.isPartitioned()) {
+        long dataSize = StatsUtils.getTotalSize(table);
+        if (dataSize <= 0) {
+          LOG.warn("Cannot determine basic stats for table: {} from metastore. Falling back.", table.getCompleteName());
+          return Status.UNAVAILABLE;
        }
+
+        return (threshold - dataSize) >= 0 ? Status.PASS : Status.FAIL;
+      } else if (table != null && table.isPartitioned() && partsList != null) {
+        List<Long> dataSizes = StatsUtils.getBasicStatForPartitions(table, partsList.getNotDeniedPartns(),
+            StatsSetupConst.TOTAL_SIZE);
+        long totalDataSize = StatsUtils.getSumIgnoreNegatives(dataSizes);
+        if (totalDataSize <= 0) {
+          LOG.warn("Cannot determine basic stats for partitioned table: {} from metastore. Falling back.",
+              table.getCompleteName());
+          return Status.UNAVAILABLE;
+        }
+
+        return (threshold - totalDataSize) >= 0 ? Status.PASS : Status.FAIL;
      }
+
+      return Status.UNAVAILABLE;
     }
 
     private long getPathLength(JobConf conf, Path path,
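The fallback path keeps the old filesystem behavior but now exits early: partition path lengths are summed on a small daemon pool, and the consuming loop returns as soon as the running total crosses the threshold, since listing files can be slow in object stores. A condensed sketch of that early-exit pattern, where pathLength() is a hypothetical stand-in for getPathLength():

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: pathLength() is a hypothetical stand-in for the real
// getPathLength(jobConf, path, inputFormatClass, threshold).
final class EarlyExitScanSketch {

  static long pathLength(String path) { return 0L; }

  static boolean withinThreshold(List<String> paths, long threshold, int threads)
      throws InterruptedException, ExecutionException {
    final AtomicLong total = new AtomicLong(0);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<Long>> futures = new ArrayList<>();
      for (final String path : paths) {
        // Each task returns the running total, not just its own length.
        Callable<Long> task = () -> total.addAndGet(pathLength(path));
        futures.add(pool.submit(task));
      }
      pool.shutdown();
      for (Future<Long> future : futures) {
        if (threshold - future.get() <= 0) {
          return false; // early exit: already over budget, skip the rest
        }
      }
      return threshold - total.get() >= 0;
    } finally {
      pool.shutdownNow(); // cancel any listings still in flight
    }
  }
}

Having each task report the running total is what lets the loop give up without waiting on the remaining futures.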
http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q b/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q
new file mode 100644
index 0000000..c66cafc
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q
@@ -0,0 +1,15 @@
+SET hive.fetch.task.conversion=more;
+SET hive.explain.user=false;
+
+-- will not print tez counters as tasks will not be launched
+select * from src where key is null;
+select * from srcpart where key is null;
+explain select * from src where key is null;
+explain select * from srcpart where key is null;
+
+SET hive.fetch.task.conversion.threshold=1000;
+-- will print tez counters as tasks will be launched
+select * from src where key is null;
+select * from srcpart where key is null;
+explain select * from src where key is null;
+explain select * from srcpart where key is null;
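The test drives both sides of the new decision: with the default hive.fetch.task.conversion.threshold the metastore stats for src and srcpart fit under the limit, so the first two explains in the golden file below collapse to a single Fetch Operator stage; once the threshold is dropped to 1000 bytes the stats check fails and the same queries compile to a Tez vertex instead (and thus print Tez counters when run).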
http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out b/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out
new file mode 100644
index 0000000..f61483b
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out
@@ -0,0 +1,176 @@
+PREHOOK: query: -- will not print tez counters as tasks will not be launched
+select * from src where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: -- will not print tez counters as tasks will not be launched
+select * from src where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+PREHOOK: query: select * from srcpart where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select * from srcpart where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain select * from src where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from src where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: src
+          Filter Operator
+            predicate: key is null (type: boolean)
+            Select Operator
+              expressions: null (type: string), value (type: string)
+              outputColumnNames: _col0, _col1
+              ListSink
+
+PREHOOK: query: explain select * from srcpart where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from srcpart where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: srcpart
+          Filter Operator
+            predicate: key is null (type: boolean)
+            Select Operator
+              expressions: null (type: string), value (type: string), ds (type: string), hr (type: string)
+              outputColumnNames: _col0, _col1, _col2, _col3
+              ListSink
+
+PREHOOK: query: -- will print tez counters as tasks will be launched
+select * from src where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: -- will print tez counters as tasks will be launched
+select * from src where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+PREHOOK: query: select * from srcpart where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select * from srcpart where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain select * from src where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from src where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 178 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: null (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 1 Data size: 175 Basic stats: COMPLETE Column stats: COMPLETE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 1 Data size: 175 Basic stats: COMPLETE Column stats: COMPLETE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: no inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: explain select * from srcpart where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from srcpart where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart
+                  Statistics: Num rows: 2000 Data size: 1092000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 546 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: null (type: string), value (type: string), ds (type: string), hr (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 1 Data size: 543 Basic stats: COMPLETE Column stats: COMPLETE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 1 Data size: 543 Basic stats: COMPLETE Column stats: COMPLETE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: no inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
