This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4c51851  [SPARK-38570][SQL] Incorrect DynamicPartitionPruning caused by Literal
4c51851 is described below
commit 4c51851c4227f22df9385a66280905108d529fba
Author: mcdull-zhang <[email protected]>
AuthorDate: Fri Mar 25 08:47:47 2022 +0800
[SPARK-38570][SQL] Incorrect DynamicPartitionPruning caused by Literal
### What changes were proposed in this pull request?
The return value of Literal.references is an empty AttributeSet, so a Literal can be mistaken for a partition column. For example, the SQL in the test case generates the following physical plan when adaptive query execution (AQE) is disabled (a short sketch of the mechanism follows the two plans):
```text
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
   :  :                    +- *(1) Project [store_id#5291, state_province#5292]
   :  :                       +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
   :  :                          +- *(1) ColumnarToRow
   :  :                             +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4), [...]
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
```
After this PR:
```text
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
      +- *(3) Project [store_id#5291, state_province#5292]
         +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_i [...]
```
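To make the root cause concrete, here is a minimal standalone sketch against the Catalyst API (illustration only, not code from this patch). A `Literal` carries no attribute references, and an empty `AttributeSet` is trivially a subset of every node's output, which is what allowed the lineage walk to "trace" the constant `4 AS store_id` down to a scan as if it were a partition column:
```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Literal}
import org.apache.spark.sql.types.IntegerType

// A real column carries a reference to itself; a constant carries none.
val storeId = AttributeReference("store_id", IntegerType)()
assert(storeId.references.nonEmpty)
assert(Literal(4).references.isEmpty)

// The empty set is a subset of ANY output set, so a subsetOf-based
// lineage check accepts a Literal at every node in the plan.
assert(Literal(4).references.subsetOf(AttributeSet(storeId :: Nil)))
```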
### Why are the changes needed?
Execution performance improvement: the incorrect DPP filter adds a pruning subquery over a literal value, which can never prune any partitions (compare the PartitionFilters in the two plans above).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit test
Closes #35878 from mcdull-zhang/literal_dynamic_partition.
Lead-authored-by: mcdull-zhang <[email protected]>
Co-authored-by: mcdull_zhang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
---
.../sql/catalyst/expressions/predicates.scala | 1 +
.../spark/sql/DynamicPartitionPruningSuite.scala | 28 ++++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index d16e09c..949ce97 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -128,6 +128,7 @@ trait PredicateHelper extends AliasHelper with Logging {
   def findExpressionAndTrackLineageDown(
       exp: Expression,
       plan: LogicalPlan): Option[(Expression, LogicalPlan)] = {
+    if (exp.references.isEmpty) return None
 
     plan match {
       case p: Project =>
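To illustrate the guard's effect, the method can be called directly by mixing in `PredicateHelper`; the `Probe` object below is a hypothetical harness (not part of this patch), while the expression DSL and `LocalRelation` are the usual Catalyst test utilities:
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}

// Hypothetical harness mixing in PredicateHelper to reach the patched method.
object Probe extends PredicateHelper {
  def trace(e: Expression, p: LogicalPlan): Option[(Expression, LogicalPlan)] =
    findExpressionAndTrackLineageDown(e, p)
}

val storeId = 'store_id.int       // an AttributeReference, via the Catalyst DSL
val scan = LocalRelation(storeId) // a leaf node producing store_id

assert(Probe.trace(storeId, scan).isDefined)  // real column: traced to the scan
assert(Probe.trace(Literal(4), scan).isEmpty) // literal: now rejected up front
```
Previously the literal's empty reference set passed the `subsetOf` check at the leaf, so the constant was "found" on the scan and DPP inserted the useless pruning subquery shown in the first plan above.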
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f74e047..cfdd2e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1528,6 +1528,34 @@ abstract class DynamicPartitionPruningSuiteBase
       }
     }
   }
+
+  test("SPARK-38570: Fix incorrect DynamicPartitionPruning caused by Literal") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
+      val df = sql(
+        """
+          |SELECT f.store_id,
+          |       f.date_id,
+          |       s.state_province
+          |FROM (SELECT 4 AS store_id,
+          |             date_id,
+          |             product_id
+          |      FROM fact_sk
+          |      WHERE date_id >= 1300
+          |      UNION ALL
+          |      SELECT 5 AS store_id,
+          |             date_id,
+          |             product_id
+          |      FROM fact_stats
+          |      WHERE date_id <= 1000) f
+          |JOIN dim_store s
+          |ON f.store_id = s.store_id
+          |WHERE s.country = 'US'
+          |""".stripMargin)
+
+      checkPartitionPruningPredicate(df, withSubquery = false, withBroadcast = false)
+      checkAnswer(df, Row(4, 1300, "California") :: Row(5, 1000, "Texas") :: Nil)
+    }
+  }
 }
 
 abstract class DynamicPartitionPruningDataSourceSuiteBase
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]