This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e683fe880fb [SPARK-39217][SQL] Makes DPP support the pruning side has Union e683fe880fb is described below commit e683fe880fb5599a0089f79170896971e8887e49 Author: Kun Wan <wan...@apache.org> AuthorDate: Fri Jan 13 21:44:24 2023 +0800 [SPARK-39217][SQL] Makes DPP support the pruning side has Union ### What changes were proposed in this pull request? Makes dynamic partition pruning (DPP) support queries whose pruning side contains a `Union`. For example: ```sql SELECT f.store_id, f.date_id, s.state_province FROM (SELECT 4 AS store_id, date_id, product_id FROM fact_sk WHERE date_id >= 1300 UNION ALL SELECT store_id, date_id, product_id FROM fact_stats WHERE date_id <= 1000) f JOIN dim_store s ON f.store_id = s.store_id WHERE s.country IN ('US', 'NL') ``` After this PR: ![image](https://user-images.githubusercontent.com/5399861/169023481-80875e00-249b-48dc-adcb-e218756adbe7.png) ### Why are the changes needed? To improve query performance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a unit test to `DynamicPartitionPruningSuite`. Closes #39460 from wankunde/SPARK-39217. 
Authored-by: Kun Wan <wan...@apache.org> Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../sql/catalyst/expressions/predicates.scala | 11 +++++++- .../spark/sql/DynamicPartitionPruningSuite.scala | 30 ++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 6a58f8d3416..b7d7c5e700e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference import org.apache.spark.sql.catalyst.expressions.Cast._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LeafNode, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LeafNode, LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.util.TypeUtils import org.apache.spark.sql.internal.SQLConf @@ -124,6 +124,15 @@ trait PredicateHelper extends AliasHelper with Logging { findExpressionAndTrackLineageDown(replaceAlias(exp, aliasMap), a.child) case l: LeafNode if exp.references.subsetOf(l.outputSet) => Some((exp, l)) + case u: Union => + val index = u.output.indexWhere(_.semanticEquals(exp)) + if (index > -1) { + u.children + .flatMap(child => findExpressionAndTrackLineageDown(child.output(index), child)) + .headOption + } else { + None + } case other => other.children.flatMap { child => if (exp.references.subsetOf(child.outputSet)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index ff78af7e636..f33432ddb6f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -1616,6 +1616,36 @@ abstract class DynamicPartitionPruningSuiteBase assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1) } } + + test("SPARK-39217: Makes DPP support the pruning side has Union") { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { + val df = sql( + """ + |SELECT f.store_id, + | f.date_id, + | s.state_province + |FROM (SELECT 4 AS store_id, + | date_id, + | product_id + | FROM fact_sk + | WHERE date_id >= 1300 + | UNION ALL + | SELECT store_id, + | date_id, + | product_id + | FROM fact_stats + | WHERE date_id <= 1000) f + |JOIN dim_store s + |ON f.store_id = s.store_id + |WHERE s.country IN ('US', 'NL') + |""".stripMargin) + + checkPartitionPruningPredicate(df, withSubquery = false, withBroadcast = true) + checkAnswer(df, Row(4, 1300, "California") :: Row(1, 1000, "North-Holland") :: Nil) + // CleanupDynamicPruningFilters should remove DPP in first child of union + assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size === 1) + } + } } abstract class DynamicPartitionPruningDataSourceSuiteBase --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org