https://issues.apache.org/jira/browse/SPARK-48486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860381#comment-17860381

chenfengbin edited comment on SPARK-48486 at 6/27/24 8:24 AM:
--------------------------------------------------------------

SELECT *
FROM fact_sk a
LEFT JOIN fact_sk_1 b on b.store_id = a.store_id
LEFT JOIN fact_sk_2 c on c.store_id = a.store_id
WHERE a.store_id in ('4','5')
Analyzing the generated Optimized Logical Plan shows the following:

== Optimized Logical Plan ==
Join LeftOuter, (store_id#5490 = store_id#5482)
:- Join LeftOuter, (store_id#5486 = store_id#5482)
:  :- Filter cast(store_id#5482 as string) IN (4,5)
:  :  +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
:  +- Filter ((cast(store_id#5486 as string) IN (4,5) AND isnotnull(store_id#5486)) AND dynamicpruning#5506 [store_id#5486])
:     :  +- Filter cast(store_id#5482 as string) IN (4,5)
:     :     +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
:     +- Relation default.fact_sk_1[date_id#5483,product_id#5484,units_sold#5485,store_id#5486] parquet
+- Filter ((cast(store_id#5490 as string) IN (4,5) AND isnotnull(store_id#5490)) AND dynamicpruning#5507 [store_id#5490])
   :  +- Join LeftOuter, (store_id#5486 = store_id#5482)
   :     :- Filter cast(store_id#5482 as string) IN (4,5)
   :     :  +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
   :     +- Filter dynamicpruning#5506 [store_id#5486]
   :        :  +- Filter cast(store_id#5482 as string) IN (4,5)
   :        :     +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
   :        +- Filter (cast(store_id#5486 as string) IN (4,5) AND isnotnull(store_id#5486))
   :           +- Relation default.fact_sk_1[date_id#5483,product_id#5484,units_sold#5485,store_id#5486] parquet
   +- Relation default.fact_sk_2[date_id#5487,product_id#5488,units_sold#5489,store_id#5490] parquet

 

For the first join, the DPP transformation converts the pushed-down filter into a subquery, producing the following subtree:

:- Join LeftOuter, (store_id#5486 = store_id#5482)
:  :- Filter cast(store_id#5482 as string) IN (4,5)
:  :  +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
:  +- Filter ((cast(store_id#5486 as string) IN (4,5) AND isnotnull(store_id#5486)) AND dynamicpruning#5506 [store_id#5486])
:     :  +- Filter cast(store_id#5482 as string) IN (4,5)
:     :     +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
:     +- Relation default.fact_sk_1[date_id#5483,product_id#5484,units_sold#5485,store_id#5486] parquet

This subtree is then treated as a single unit and pushed down, and on top of it another subquery is added for the next join.

+- Filter (cast(store_id#5490 as string) IN (4,5) AND isnotnull(store_id#5490))
   +- Relation default.fact_sk_2[date_id#5487,product_id#5488,units_sold#5489,store_id#5490] parquet

The above is converted to:

+- Filter ((cast(store_id#5490 as string) IN (4,5) AND isnotnull(store_id#5490)) AND dynamicpruning#5507 [store_id#5490])
   :  +- Join LeftOuter, (store_id#5486 = store_id#5482)
   :     :- Filter cast(store_id#5482 as string) IN (4,5)
   :     :  +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
   :     +- Filter dynamicpruning#5506 [store_id#5486]
   :        :  +- Filter cast(store_id#5482 as string) IN (4,5)
   :        :     +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
   :        +- Filter (cast(store_id#5486 as string) IN (4,5) AND isnotnull(store_id#5486))
   :           +- Relation default.fact_sk_1[date_id#5483,product_id#5484,units_sold#5485,store_id#5486] parquet
   +- Relation default.fact_sk_2[date_id#5487,product_id#5488,units_sold#5489,store_id#5490] parquet

In this case, each additional nested subquery pushdown compounds the duplication: with 5 nested levels the plan grows on the order of 2 to the power of 5, and with 10 levels on the order of 2 to the power of 10. A strategy is needed here to limit the nesting depth to a certain range.
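
The accumulation can be sketched with a toy arithmetic model (hypothetical, not Spark code): every DPP filter embeds a copy of the subtree built so far, so the node count roughly doubles per join.

```python
def nested_dpp_plan_size(num_joins):
    """Toy model: each join adds one join node, one scan, and a pruning
    subquery that duplicates everything built so far."""
    size = 1  # start from a single table scan
    for _ in range(num_joins):
        size = size + 1 + 1 + size  # closed form: 3 * 2**num_joins - 2
    return size

print(nested_dpp_plan_size(5))   # 94
print(nested_dpp_plan_size(10))  # 3070
```

This matches the 2-to-the-power-of-n growth described above: the plan size is dominated by the doubling term, so 5 nested levels already cost roughly 2^5 copies and 10 levels roughly 2^10.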

If the traversal is instead changed to transformDown, nesting is given up: when each DPP filter is processed, only the outermost layer of subquery is added.
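
The difference between the two traversal orders can be illustrated with a small tree model (a hypothetical sketch, not Catalyst code; `dpp_rule`, `Node`, and `chain` are made-up names). The rule stands in for the DPP rewrite: it attaches to a join's right side a pruning "subquery" that is a copy of the join's left subtree.

```python
import copy

class Node:
    def __init__(self, label, children=()):
        self.label = label
        self.children = list(children)
        self.subquery = None  # pruning subquery; not traversed as a child

    def count(self):
        n = 1 + sum(c.count() for c in self.children)
        if self.subquery is not None:
            n += self.subquery.count()
        return n

def dpp_rule(node):
    # Toy stand-in for the DPP rewrite: embed a copy of the left subtree
    # as the right side's pruning subquery.
    if node.label == "Join" and node.children[1].subquery is None:
        left, right = node.children
        right.subquery = copy.deepcopy(left)
    return node

def transform_up(node):
    # Bottom-up: children are rewritten first, so the copied left subtree
    # already contains the inner joins' pruning subqueries -> nesting.
    node.children = [transform_up(c) for c in node.children]
    return dpp_rule(node)

def transform_down(node):
    # Top-down: the copy is taken before the left subtree is rewritten,
    # so each pruning subquery contains no further nested subqueries.
    node = dpp_rule(node)
    node.children = [transform_down(c) for c in node.children]
    return node

def chain(num_joins):
    """A left-deep chain of joins over plain scans."""
    plan = Node("Scan")
    for _ in range(num_joins):
        plan = Node("Join", [plan, Node("Scan")])
    return plan

print(transform_up(chain(5)).count())    # 94  -> exponential, 3*2**n - 2
print(transform_down(chain(5)).count())  # 36  -> quadratic, (n+1)**2
```

Under this model, bottom-up rewriting reproduces the exponential blow-up, while top-down rewriting keeps each subquery flat, so total plan size only grows quadratically in the number of joins.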



> The Dynamic Partition Pruning (DPP) feature in Spark can cause the generated 
> Directed Acyclic Graph (DAG) to expand.
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-48486
>                 URL: https://issues.apache.org/jira/browse/SPARK-48486
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.0
>            Reporter: chenfengbin
>            Priority: Major
>         Attachments: create_table.sql
>
>
> The Dynamic Partition Pruning (DPP) feature in Spark can cause the generated 
> Directed Acyclic Graph (DAG) to expand.
> for example: suppose the partition field of tables a and b is part_dt.
> select a.part_dt, b.part_dt
> from a left join b on b.part_dt = a.part_dt 
> where a.part_dt in ('2024-05-30','2024-05-31')
> During the generation of Dynamic Partition Pruning (DPP), the tree is 
> traversed recursively more times than necessary. 
> 24/05/31 16:14:14 INFO DataSourceStrategy: Pruning directories with: 
> part_dt#219 IN (2024-05-30,2024-05-31)
> 24/05/31 16:14:14 INFO DataSourceStrategy: Pruning directories with: 
> part_dt#223 IN 
> (2024-05-30,2024-05-31),isnotnull(part_dt#223),dynamicpruning#234 
> [part_dt#223]
> 24/05/31 16:14:14 INFO DataSourceStrategy: Pruning directories with: 
> part_dt#219 IN (2024-05-30,2024-05-31)
> The last line is redundant: 24/05/31 16:14:14 INFO DataSourceStrategy: 
> Pruning directories with: part_dt#219 IN (2024-05-30,2024-05-31)
> When more partitions meet the condition, the number of traversals grows as 
> 2 to the power of n, divided by 2.
> for example:
> select a.part_dt, b.part_dt, c.part_dt
> from a 
> left join b on b.part_dt = a.part_dt 
> left join c on c.part_dt = a.part_dt
> where a.part_dt in ('2024-05-30','2024-05-31')
> result:
> 24/05/31 16:29:19 INFO ExecuteStatement: Execute in full collect mode
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with: 
> part_dt#249 IN (2024-05-30,2024-05-31)
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with: 
> part_dt#253 IN 
> (2024-05-30,2024-05-31),isnotnull(part_dt#253),dynamicpruning#261 
> [part_dt#253]
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with: 
> part_dt#257 IN 
> (2024-05-30,2024-05-31),isnotnull(part_dt#257),dynamicpruning#262 
> [part_dt#257|#257]
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with: 
> part_dt#249 IN (2024-05-30,2024-05-31)
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with: 
> part_dt#249 IN (2024-05-30,2024-05-31)
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with: 
> part_dt#253 IN 
> (2024-05-30,2024-05-31),isnotnull(part_dt#253),dynamicpruning#261 
> [part_dt#253]
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with: 
> part_dt#249 IN (2024-05-30,2024-05-31)
> The last four lines are all superfluous.
> When more than 10 joins meet the condition, DPP generation causes a memory 
> overflow, and the generated plan is about 400 times larger.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
