[ 
https://issues.apache.org/jira/browse/SPARK-43182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17766690#comment-17766690
 ] 

Qian Sun edited comment on SPARK-43182 at 9/19/23 8:14 AM:
-----------------------------------------------------------

Hi [~Resol1992]

I ran your sql, tried different configuration combinations and believe 
regression caused by *spark.sql.adaptive.forceOptimizeSkewedJoin* , which 
introduces 
extra shuffles. AQE can give up skewJoin Optimization if extra shuffle 
introduced when *spark.sql.adaptive.forceOptimizeSkewedJoin* is false. cc 
[~cloud_fan] 

 

ref: 

[https://github.com/apache/spark/blob/87a5442f7ed96b11051d8a9333476d080054e5a0/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala#L225-L229]


was (Author: dcoliversun):
Hi [~Resol1992]

I ran your sql, tried different configuration combinations and believe 
regression caused by *spark.sql.adaptive.forceOptimizeSkewedJoin* , which 
introduces 
extra shuffles. AQE can give up skewJoin Optimization if extra shuffle 
introduced when *spark.sql.adaptive.forceOptimizeSkewedJoin* is false. cc 
[~cloud_fan]  * 
https://github.com/apache/spark/blob/87a5442f7ed96b11051d8a9333476d080054e5a0/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala#L225-L229

> Mutilple tables join with limit when AE is enabled and one table is skewed
> --------------------------------------------------------------------------
>
>                 Key: SPARK-43182
>                 URL: https://issues.apache.org/jira/browse/SPARK-43182
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Liu Shuo
>            Priority: Critical
>         Attachments: part-m-00000.zip, part-m-00001.zip, part-m-00002.zip, 
> part-m-00003.zip, part-m-00004.zip, part-m-00005.zip, part-m-00006.zip, 
> part-m-00007.zip, part-m-00008.zip, part-m-00009.zip, part-m-00010.zip, 
> part-m-00011.zip, part-m-00012.zip, part-m-00013.zip, part-m-00014.zip, 
> part-m-00015.zip, part-m-00016.zip, part-m-00017.zip, part-m-00018.zip, 
> part-m-00019.zip
>
>
> When we test AE in Spark3.4.0 with the following case, we find If we disable 
> AE or enable Ae but disable skewJoin, the sql will finish in 20s, but if we 
> enable AE and enable skewJoin,it will take very long time.
> The test case:
> {code:java}
> ###uncompress the part-m-***.zip attachment, and put these files under 
> '/tmp/spark-warehouse/data/' dir.
> create table source_aqe(c1 int,c18 string) using csv options(path 
> 'file:///tmp/spark-warehouse/data/');
> create table hive_snappy_aqe_table1(c1 int)stored as PARQUET partitioned 
> by(c18 string); 
> insert into table hive_snappy_aqe_table1 partition(c18=1)select c1 from 
> source_aqe;
> insert into table hive_snappy_aqe_table1 partition(c18=2)select c1 from 
> source_aqe limit 120000;
> insert into table hive_snappy_aqe_table1 partition(c18=3)select c1 from 
> source_aqe limit 150000;create table hive_snappy_aqe_table2(c1 int)stored as 
> PARQUET partitioned by(c18 string); 
> insert into table hive_snappy_aqe_table2 partition(c18=1)select c1 from 
> source_aqe limit 160000;
> insert into table hive_snappy_aqe_table2 partition(c18=2)select c1 from 
> source_aqe limit 120000;create table hive_snappy_aqe_table3(c1 int)stored as 
> PARQUET partitioned by(c18 string); 
> insert into table hive_snappy_aqe_table3 partition(c18=1)select c1 from 
> source_aqe limit 160000;
> insert into table hive_snappy_aqe_table3 partition(c18=2)select c1 from 
> source_aqe limit 120000;
> set spark.sql.adaptive.enabled=false;
> set spark.sql.adaptive.forceOptimizeSkewedJoin = false;
> set spark.sql.adaptive.skewJoin.skewedPartitionFactor=1;
> set spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=10KB;
> set spark.sql.adaptive.advisoryPartitionSizeInBytes=100KB;
> set spark.sql.autoBroadcastJoinThreshold = 51200;
>  
> ###it will finish in 20s 
> select * from hive_snappy_aqe_table1 join hive_snappy_aqe_table2 on 
> hive_snappy_aqe_table1.c18=hive_snappy_aqe_table2.c18 join 
> hive_snappy_aqe_table3 on 
> hive_snappy_aqe_table1.c18=hive_snappy_aqe_table3.c18 limit 10;
> set spark.sql.adaptive.enabled=true;
> set spark.sql.adaptive.forceOptimizeSkewedJoin = true;
> set spark.sql.adaptive.skewJoin.skewedPartitionFactor=1;
> set spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=10KB;
> set spark.sql.adaptive.advisoryPartitionSizeInBytes=100KB;
> set spark.sql.autoBroadcastJoinThreshold = 51200;
> ###it will take very long time 
> select * from hive_snappy_aqe_table1 join hive_snappy_aqe_table2 on 
> hive_snappy_aqe_table1.c18=hive_snappy_aqe_table2.c18 join 
> hive_snappy_aqe_table3 on 
> hive_snappy_aqe_table1.c18=hive_snappy_aqe_table3.c18 limit 10;
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to