[
https://issues.apache.org/jira/browse/SPARK-48486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860381#comment-17860381
]
chenfengbin edited comment on SPARK-48486 at 6/27/24 8:24 AM:
--------------------------------------------------------------
SELECT *
FROM fact_sk a
LEFT JOIN fact_sk_1 b on b.store_id = a.store_id
LEFT JOIN fact_sk_2 c on c.store_id = a.store_id
WHERE a.store_id in ('4','5')
Analyzing the generated Optimized Logical Plan:
== Optimized Logical Plan ==
Join LeftOuter, (store_id#5490 = store_id#5482)
:- Join LeftOuter, (store_id#5486 = store_id#5482)
: :- Filter cast(store_id#5482 as string) IN (4,5)
: : +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
: +- Filter ((cast(store_id#5486 as string) IN (4,5) AND isnotnull(store_id#5486)) AND dynamicpruning#5506 [store_id#5486])
: : +- Filter cast(store_id#5482 as string) IN (4,5)
: :   +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
: +- Relation default.fact_sk_1[date_id#5483,product_id#5484,units_sold#5485,store_id#5486] parquet
+- Filter ((cast(store_id#5490 as string) IN (4,5) AND isnotnull(store_id#5490)) AND dynamicpruning#5507 [store_id#5490])
: +- Join LeftOuter, (store_id#5486 = store_id#5482)
: :- Filter cast(store_id#5482 as string) IN (4,5)
: : +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
: +- Filter dynamicpruning#5506 [store_id#5486]
: : +- Filter cast(store_id#5482 as string) IN (4,5)
: :   +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
: +- Filter (cast(store_id#5486 as string) IN (4,5) AND isnotnull(store_id#5486))
:   +- Relation default.fact_sk_1[date_id#5483,product_id#5484,units_sold#5485,store_id#5486] parquet
+- Relation default.fact_sk_2[date_id#5487,product_id#5488,units_sold#5489,store_id#5490] parquet
For the first join, the DPP transformation converts the pushed-down filter into a subquery, producing the following subtree:
:- Join LeftOuter, (store_id#5486 = store_id#5482)
: :- Filter cast(store_id#5482 as string) IN (4,5)
: : +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
: +- Filter ((cast(store_id#5486 as string) IN (4,5) AND isnotnull(store_id#5486)) AND dynamicpruning#5506 [store_id#5486])
: : +- Filter cast(store_id#5482 as string) IN (4,5)
: :   +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
: +- Relation default.fact_sk_1[date_id#5483,product_id#5484,units_sold#5485,store_id#5486] parquet
This subtree is then treated as a whole and pushed down, and on top of it another subquery is added.
+- Filter (cast(store_id#5490 as string) IN (4,5) AND isnotnull(store_id#5490))
+- Relation default.fact_sk_2[date_id#5487,product_id#5488,units_sold#5489,store_id#5490] parquet
The above will be converted to:
+- Filter ((cast(store_id#5490 as string) IN (4,5) AND isnotnull(store_id#5490)) AND dynamicpruning#5507 [store_id#5490])
: +- Join LeftOuter, (store_id#5486 = store_id#5482)
: :- Filter cast(store_id#5482 as string) IN (4,5)
: : +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
: +- Filter dynamicpruning#5506 [store_id#5486]
: : +- Filter cast(store_id#5482 as string) IN (4,5)
: :   +- Relation default.fact_sk[date_id#5479,product_id#5480,units_sold#5481,store_id#5482] parquet
: +- Filter (cast(store_id#5486 as string) IN (4,5) AND isnotnull(store_id#5486))
:   +- Relation default.fact_sk_1[date_id#5483,product_id#5484,units_sold#5485,store_id#5486] parquet
+- Relation default.fact_sk_2[date_id#5487,product_id#5488,units_sold#5489,store_id#5490] parquet
In this case, each nested subquery pushdown duplicates the plan beneath it, so the plan size accumulates exponentially: with 5 levels of nesting the plan grows by a factor of 2 to the power of 5, and with 10 levels by 2 to the power of 10. A strategy is needed to limit the nesting depth to a certain range.
If the traversal is changed to transformDown, nesting is given up entirely: when each DPP filter is processed, only the last (outermost) layer of subquery is added.
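The blow-up described above can be sketched with a toy node-count model. This is not Spark's actual DPP code; the function names and the size recurrence are illustrative assumptions that only mirror the two growth patterns: bottom-up rewriting copies the already-rewritten subtree into each pruning subquery (so plan size roughly doubles per join), while top-down rewriting embeds only the original, un-augmented filter (so growth stays linear).

```python
def plan_size_transform_up(n_joins: int, base: int = 1) -> int:
    """Node count when DPP subqueries nest (transformUp-style):
    each join's pruning subquery embeds a copy of the plan built so far,
    so the size follows size -> 2 * size + 1 per join."""
    size = base
    for _ in range(n_joins):
        size = size * 2 + 1  # join node plus a duplicated copy of the subtree
    return size

def plan_size_transform_down(n_joins: int, base: int = 1) -> int:
    """Node count when each DPP subquery embeds only the original
    filtered relation (transformDown-style): linear growth."""
    return base + n_joins * (base + 1)

for n in (1, 5, 10):
    print(n, plan_size_transform_up(n), plan_size_transform_down(n))
```

With this model, 5 joins yield 63 nodes bottom-up versus 11 top-down, and 10 joins yield 2047 versus 21, matching the 2^n scale of duplication observed in the plans above.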
> The Dynamic Partition Pruning (DPP) feature in Spark can cause the generated
> Directed Acyclic Graph (DAG) to expand.
> --------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-48486
> URL: https://issues.apache.org/jira/browse/SPARK-48486
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.3.0
> Reporter: chenfengbin
> Priority: Major
> Attachments: create_table.sql
>
>
> The Dynamic Partition Pruning (DPP) feature in Spark can cause the generated
> Directed Acyclic Graph (DAG) to expand.
> for example: The partition field of tables A and B is part_dt.
> select a.part_dt, b.part_dt
> from a left join b on b.part_dt = a.part_dt
> where a.part_dt in ('2024-05-30','2024-05-31')
> During the generation of Dynamic Partition Pruning (DPP), the tree is
> traversed recursively multiple times.
> 24/05/31 16:14:14 INFO DataSourceStrategy: Pruning directories with:
> part_dt#219 IN (2024-05-30,2024-05-31)
> 24/05/31 16:14:14 INFO DataSourceStrategy: Pruning directories with:
> part_dt#223 IN (2024-05-30,2024-05-31),isnotnull(part_dt#223),dynamicpruning#234 [part_dt#223]
> 24/05/31 16:14:14 INFO DataSourceStrategy: Pruning directories with:
> part_dt#219 IN (2024-05-30,2024-05-31)
> The last line is redundant: 24/05/31 16:14:14 INFO DataSourceStrategy: Pruning
> directories with: part_dt#219 IN (2024-05-30,2024-05-31)
> When more joins meet the condition, the number of pruning passes grows as 2 to
> the power of n, divided by 2.
> for example:
> select a.part_dt, b.part_dt, c.part_dt
> from a
> left join b on b.part_dt = a.part_dt
> left join c on c.part_dt = a.part_dt
> where a.part_dt in ('2024-05-30','2024-05-31')
> result:
> 24/05/31 16:29:19 INFO ExecuteStatement: Execute in full collect mode
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with:
> part_dt#249 IN (2024-05-30,2024-05-31)
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with:
> part_dt#253 IN (2024-05-30,2024-05-31),isnotnull(part_dt#253),dynamicpruning#261 [part_dt#253]
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with:
> part_dt#257 IN (2024-05-30,2024-05-31),isnotnull(part_dt#257),dynamicpruning#262 [part_dt#257]
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with:
> part_dt#249 IN (2024-05-30,2024-05-31)
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with:
> part_dt#249 IN (2024-05-30,2024-05-31)
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with:
> part_dt#253 IN (2024-05-30,2024-05-31),isnotnull(part_dt#253),dynamicpruning#261 [part_dt#253]
> 24/05/31 16:29:19 INFO DataSourceStrategy: Pruning directories with:
> part_dt#249 IN (2024-05-30,2024-05-31)
> The last four lines are all superfluous.
> When more than 10 conditions are met, DPP generation can cause a memory
> overflow, and the generated plan becomes roughly 400 times larger.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]