This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e683fe880fb [SPARK-39217][SQL] Makes DPP support the pruning side has 
Union
e683fe880fb is described below

commit e683fe880fb5599a0089f79170896971e8887e49
Author: Kun Wan <wan...@apache.org>
AuthorDate: Fri Jan 13 21:44:24 2023 +0800

    [SPARK-39217][SQL] Makes DPP support the pruning side has Union
    
    ### What changes were proposed in this pull request?
    Make dynamic partition pruning (DPP) work when the pruning side of the join contains a `Union`. For example:
    
    ```sql
    SELECT f.store_id,
           f.date_id,
           s.state_province
    FROM (SELECT 4 AS store_id,
                   date_id,
                   product_id
          FROM   fact_sk
          WHERE  date_id >= 1300
          UNION ALL
          SELECT   store_id,
                   date_id,
                   product_id
          FROM   fact_stats
          WHERE  date_id <= 1000) f
    JOIN dim_store s
    ON f.store_id = s.store_id
    WHERE s.country IN ('US', 'NL')
    ```
    
    After this PR: 
![image](https://user-images.githubusercontent.com/5399861/169023481-80875e00-249b-48dc-adcb-e218756adbe7.png)
    
    ### Why are the changes needed?
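    Improves query performance: DPP can now be applied when the pruning side of the join contains a `Union`, as in the query above.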
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added a unit test to `DynamicPartitionPruningSuite`.
    
    Closes #39460 from wankunde/SPARK-39217.
    
    Authored-by: Kun Wan <wan...@apache.org>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../sql/catalyst/expressions/predicates.scala      | 11 +++++++-
 .../spark/sql/DynamicPartitionPruningSuite.scala   | 30 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 6a58f8d3416..b7d7c5e700e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -27,7 +27,7 @@ import 
org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
 import org.apache.spark.sql.catalyst.expressions.Cast._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LeafNode, 
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LeafNode, 
LogicalPlan, Project, Union}
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.internal.SQLConf
@@ -124,6 +124,15 @@ trait PredicateHelper extends AliasHelper with Logging {
         findExpressionAndTrackLineageDown(replaceAlias(exp, aliasMap), a.child)
       case l: LeafNode if exp.references.subsetOf(l.outputSet) =>
         Some((exp, l))
+      case u: Union =>
+        val index = u.output.indexWhere(_.semanticEquals(exp))
+        if (index > -1) {
+          u.children
+            .flatMap(child => 
findExpressionAndTrackLineageDown(child.output(index), child))
+            .headOption
+        } else {
+          None
+        }
       case other =>
         other.children.flatMap {
           child => if (exp.references.subsetOf(child.outputSet)) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index ff78af7e636..f33432ddb6f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1616,6 +1616,36 @@ abstract class DynamicPartitionPruningSuiteBase
       
assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size 
=== 1)
     }
   }
+
+  test("SPARK-39217: Makes DPP support the pruning side has Union") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
+      val df = sql(
+        """
+          |SELECT f.store_id,
+          |       f.date_id,
+          |       s.state_province
+          |FROM (SELECT 4 AS store_id,
+          |               date_id,
+          |               product_id
+          |      FROM   fact_sk
+          |      WHERE  date_id >= 1300
+          |      UNION ALL
+          |      SELECT   store_id,
+          |               date_id,
+          |               product_id
+          |      FROM   fact_stats
+          |      WHERE  date_id <= 1000) f
+          |JOIN dim_store s
+          |ON f.store_id = s.store_id
+          |WHERE s.country IN ('US', 'NL')
+          |""".stripMargin)
+
+      checkPartitionPruningPredicate(df, withSubquery = false, withBroadcast = 
true)
+      checkAnswer(df, Row(4, 1300, "California") :: Row(1, 1000, 
"North-Holland") :: Nil)
+      // CleanupDynamicPruningFilters should remove DPP in first child of union
+      
assert(collectDynamicPruningExpressions(df.queryExecution.executedPlan).size 
=== 1)
+    }
+  }
 }
 
 abstract class DynamicPartitionPruningDataSourceSuiteBase


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to