This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4fc718fb [SPARK-38570][SQL][3.0] Incorrect DynamicPartitionPruning caused by Literal
4fc718fb is described below

commit 4fc718fbc7e70784c250ea9315ccfc56cfaa5893
Author: mcdull-zhang <work4d...@163.com>
AuthorDate: Sat Mar 26 12:48:08 2022 +0800

    [SPARK-38570][SQL][3.0] Incorrect DynamicPartitionPruning caused by Literal
    
    This is a backport of #35878 to branch-3.0.
    
    The return value of Literal.references is an empty AttributeSet, so a Literal is mistakenly treated as a partition column and dynamic partition pruning filters get inserted for it.
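
    A minimal REPL-style sketch of the root cause (not part of this patch; the attribute and the final subset check are illustrative assumptions about how the partition-column match goes wrong):

    ```scala
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Literal}
    import org.apache.spark.sql.types.IntegerType

    // Pretend store_id is the partition column of the scanned fact table.
    val storeId = AttributeReference("store_id", IntegerType)()
    val partitionColumns = AttributeSet(Seq(storeId))

    // For a select item like `4 AS store_id`, lineage tracking resolves the
    // join key down to a plain literal.
    val resolvedKey = Literal(4)

    // A Literal has no children, so its references are an empty AttributeSet,
    // and an empty set is a subset of every set. A partition-column check along
    // the lines of `references.subsetOf(partitionColumns)` therefore passes even
    // though no partition column is involved, and a useless DPP filter is added.
    assert(resolvedKey.references.isEmpty)
    assert(resolvedKey.references.subsetOf(partitionColumns))
    ```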
    
    For example, with adaptive query execution disabled, the SQL in the test case generates a physical plan like this:
    ```tex
    *(4) Project [store_id#5281, date_id#5283, state_province#5292]
    +- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
       :- Union
       :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
       :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
       :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
       :  :     +- *(1) ColumnarToRow
       :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
       :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
       :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
       :  :                    +- *(1) Project [store_id#5291, state_province#5292]
       :  :                       +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
       :  :                          +- *(1) ColumnarToRow
       :  :                             +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4), [...]
       :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
       :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
       :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
       :        +- *(2) ColumnarToRow
       :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
       :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
       +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
    ```
    After this PR, the spurious dynamic pruning filters and reused subqueries on the fact table scans are gone:
    ```tex
    *(4) Project [store_id#5281, date_id#5283, state_province#5292]
    +- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
       :- Union
       :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
       :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
       :  :     +- *(1) ColumnarToRow
       :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
       :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
       :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
       :        +- *(2) ColumnarToRow
       :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
       +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
          +- *(3) Project [store_id#5291, state_province#5292]
             +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
                +- *(3) ColumnarToRow
                   +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_i [...]
    ```
    
    Why are the changes needed?
    Execution performance improvement.

    Does this PR introduce any user-facing change?
    No.

    How was this patch tested?
    Added a unit test.
    
    Closes #35967 from mcdull-zhang/spark_38570_3.2.
    
    Authored-by: mcdull-zhang <work4d...@163.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
    (cherry picked from commit 8621914e2052eeab25e6ac4e7d5f48b5570c71f7)
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../sql/catalyst/expressions/predicates.scala      |  1 +
 .../spark/sql/DynamicPartitionPruningSuite.scala   | 28 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 2cff5eb..c9df8e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -113,6 +113,7 @@ trait PredicateHelper {
   def findExpressionAndTrackLineageDown(
       exp: Expression,
       plan: LogicalPlan): Option[(Expression, LogicalPlan)] = {
+    if (exp.references.isEmpty) return None
 
     plan match {
       case Project(projectList, child) =>
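
The one-line guard above is the entire fix: an expression whose reference set is empty (for example a join key that resolved to a Literal such as `4 AS store_id`) cannot be traced back to a real column, so lineage tracking now returns None instead of letting the empty set match trivially. A hypothetical, trimmed-down stand-in for the method (the real one also recurses through Project, Aggregate and other plan nodes) to show the intent of the guard:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}

// Simplified sketch, not Spark's actual implementation: only the new guard is
// modeled; the placeholder below stands in for the original plan traversal.
def trackLineageDown(exp: Expression, plan: LogicalPlan): Option[(Expression, LogicalPlan)] = {
  if (exp.references.isEmpty) return None // the line added by this patch
  Some((exp, plan))                       // placeholder for the original traversal
}

// A literal join key now yields None, so DynamicPartitionPruning stops treating it
// as a partition column and no pruning subquery/filter is inserted for that branch.
assert(trackLineageDown(Literal(4), OneRowRelation()).isEmpty)
```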
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index d67deca..18d3765 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1332,6 +1332,34 @@ abstract class DynamicPartitionPruningSuiteBase
       }
     }
   }
+
+  test("SPARK-38570: Fix incorrect DynamicPartitionPruning caused by Literal") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
+      val df = sql(
+        """
+          |SELECT f.store_id,
+          |       f.date_id,
+          |       s.state_province
+          |FROM (SELECT 4 AS store_id,
+          |               date_id,
+          |               product_id
+          |      FROM   fact_sk
+          |      WHERE  date_id >= 1300
+          |      UNION ALL
+          |      SELECT 5 AS store_id,
+          |               date_id,
+          |               product_id
+          |      FROM   fact_stats
+          |      WHERE  date_id <= 1000) f
+          |JOIN dim_store s
+          |ON f.store_id = s.store_id
+          |WHERE s.country = 'US'
+          |""".stripMargin)
+
+      checkPartitionPruningPredicate(df, withSubquery = false, withBroadcast = false)
+      checkAnswer(df, Row(4, 1300, "California") :: Row(5, 1000, "Texas") :: Nil)
+    }
+  }
 }
 
 class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
