[ 
https://issues.apache.org/jira/browse/SPARK-43900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhen Wang updated SPARK-43900:
------------------------------
    Summary: Support optimize skewed partitions even if introduce extra shuffle 
 (was: Support optimize skewed partitions if introduce extra shuffle)

> Support optimize skewed partitions even if introduce extra shuffle
> ------------------------------------------------------------------
>
>                 Key: SPARK-43900
>                 URL: https://issues.apache.org/jira/browse/SPARK-43900
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Zhen Wang
>            Priority: Major
>
> Similar to [SPARK-33832|https://issues.apache.org/jira/browse/SPARK-33832], 
> the OptimizeSkewInRebalancePartitions rule is not applied when skew 
> mitigation would introduce an extra shuffle. This issue proposes supporting 
> the optimization even in that case.
> Test case (data skew in RebalancePartitions), current plan:
> {code:java}
> *(2) HashAggregate(keys=[c1#226], functions=[count(1)], output=[c1#226, count(1)#231L])
> +- *(2) HashAggregate(keys=[c1#226], functions=[partial_count(1)], output=[c1#226, count#235L])
>    +- AQEShuffleRead coalesced
>       +- ShuffleQueryStage 0
>          +- Exchange hashpartitioning(c1#226, 5), REBALANCE_PARTITIONS_BY_COL, [plan_id=106]
>             +- *(1) Project [key#221 AS c1#226]
>                +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#221]
>                   +- Scan[obj#220]
> {code}
> Expected plan:
> {code:java}
> HashAggregate(keys=[c1#226], functions=[count(1)], output=[c1#226, count(1)#231L])
> +- AQEShuffleRead coalesced
>    +- ShuffleQueryStage 1
>       +- Exchange hashpartitioning(c1#226, 5), ENSURE_REQUIREMENTS, [plan_id=140]
>          +- *(2) HashAggregate(keys=[c1#226], functions=[partial_count(1)], output=[c1#226, count#235L])
>             +- AQEShuffleRead coalesced and skewed
>                +- ShuffleQueryStage 0
>                   +- Exchange hashpartitioning(c1#226, 5), REBALANCE_PARTITIONS_BY_COL, [plan_id=106]
>                      +- *(1) Project [key#221 AS c1#226]
>                         +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#221]
>                            +- Scan[obj#220]
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
