Repository: spark Updated Branches: refs/heads/master 530fe6832 -> c6610a997
[SPARK-22122][SQL] Use analyzed logical plans to count input rows in TPCDSQueryBenchmark ## What changes were proposed in this pull request? Since the current code ignores WITH clauses to check input relations in TPCDS queries, this leads to inaccurate per-row processing time for benchmark results. For example, in `q2`, this fix could catch all the input relations: `web_sales`, `date_dim`, and `catalog_sales` (the current code catches `date_dim` only). The one-third of the TPCDS queries uses WITH clauses, so I think it is worth fixing this. ## How was this patch tested? Manually checked. Author: Takeshi Yamamuro <yamam...@apache.org> Closes #19344 from maropu/RespectWithInTPCDSBench. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6610a99 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6610a99 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6610a99 Branch: refs/heads/master Commit: c6610a997f69148a1f1bbf69360e8f39e24cb70a Parents: 530fe68 Author: Takeshi Yamamuro <yamam...@apache.org> Authored: Fri Sep 29 21:36:52 2017 -0700 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Fri Sep 29 21:36:52 2017 -0700 ---------------------------------------------------------------------- .../benchmark/TPCDSQueryBenchmark.scala | 32 +++++++------------- 1 file changed, 11 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c6610a99/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index 99c6df7..69247d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -20,11 +20,10 @@ package org.apache.spark.sql.execution.benchmark import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.SubqueryExpression -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.util.Benchmark /** @@ -66,24 +65,15 @@ object TPCDSQueryBenchmark extends Logging { classLoader = Thread.currentThread().getContextClassLoader) // This is an indirect hack to estimate the size of each query's input by traversing the - // logical plan and adding up the sizes of all tables that appear in the plan. Note that this - // currently doesn't take WITH subqueries into account which might lead to fairly inaccurate - // per-row processing time for those cases. + // logical plan and adding up the sizes of all tables that appear in the plan. val queryRelations = scala.collection.mutable.HashSet[String]() - spark.sql(queryString).queryExecution.logical.map { - case UnresolvedRelation(t: TableIdentifier) => - queryRelations.add(t.table) - case lp: LogicalPlan => - lp.expressions.foreach { _ foreach { - case subquery: SubqueryExpression => - subquery.plan.foreach { - case UnresolvedRelation(t: TableIdentifier) => - queryRelations.add(t.table) - case _ => - } - case _ => - } - } + spark.sql(queryString).queryExecution.analyzed.foreach { + case SubqueryAlias(alias, _: LogicalRelation) => + queryRelations.add(alias) + case LogicalRelation(_, _, Some(catalogTable), _) => + queryRelations.add(catalogTable.identifier.table) + case HiveTableRelation(tableMeta, _, _) => + queryRelations.add(tableMeta.identifier.table) case _ => } val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org