[ https://issues.apache.org/jira/browse/SPARK-24343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yucai updated SPARK-24343:
--------------------------

Description:
When shuffle.partition is greater than the bucket number, Spark shuffles the bucketed table according to shuffle.partition. Can we avoid this?

See the example below:
{code:java}
CREATE TABLE dev
USING PARQUET
AS SELECT ws_item_sk, i_item_sk
FROM web_sales_bucketed
JOIN item ON ws_item_sk = i_item_sk;
{code}
web_sales_bucketed is bucketed into 4 buckets, and shuffle.partition is set to 10.

Currently, both tables are shuffled into 10 partitions:
{code:java}
Execute CreateDataSourceTableAsSelectCommand
CreateDataSourceTableAsSelectCommand `dev`, ErrorIfExists, [ws_item_sk#2, i_item_sk#6]
+- *(5) SortMergeJoin [ws_item_sk#2], [i_item_sk#6], Inner
   :- *(2) Sort [ws_item_sk#2 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ws_item_sk#2, 10)
   :     +- *(1) Project [ws_item_sk#2]
   :        +- *(1) Filter isnotnull(ws_item_sk#2)
   :           +- *(1) FileScan parquet tpcds10.web_sales_bucketed[ws_item_sk#2] Batched: true, Format: Parquet, Location:...
   +- *(4) Sort [i_item_sk#6 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(i_item_sk#6, 10)
         +- *(3) Project [i_item_sk#6]
            +- *(3) Filter isnotnull(i_item_sk#6)
               +- *(3) FileScan parquet tpcds10.item[i_item_sk#6] Batched: true, Format: Parquet, Location:...
{code}
A better plan would avoid the shuffle of the bucketed table, shuffling only the non-bucketed side into 4 partitions:
{code:java}
Execute CreateDataSourceTableAsSelectCommand
CreateDataSourceTableAsSelectCommand `dev`, ErrorIfExists, [ws_item_sk#2, i_item_sk#6]
+- *(4) SortMergeJoin [ws_item_sk#2], [i_item_sk#6], Inner
   :- *(1) Sort [ws_item_sk#2 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [ws_item_sk#2]
   :     +- *(1) Filter isnotnull(ws_item_sk#2)
   :        +- *(1) FileScan parquet tpcds10.web_sales_bucketed[ws_item_sk#2] Batched: true, Format: Parquet, Location:...
   +- *(3) Sort [i_item_sk#6 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(i_item_sk#6, 4)
         +- *(2) Project [i_item_sk#6]
            +- *(2) Filter isnotnull(i_item_sk#6)
               +- *(2) FileScan parquet tpcds10.item[i_item_sk#6] Batched: true, Format: Parquet, Location:...
{code}
This problem could be worse with adaptive execution enabled, because it usually prefers a large shuffle.partition.

> Avoid shuffle for the bucketed table when shuffle.partition > bucket number
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-24343
>                 URL: https://issues.apache.org/jira/browse/SPARK-24343
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: yucai
>            Priority: Major
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
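For context, a setup along these lines should reproduce the plans in the description. The DDL for web_sales_bucketed is not in the report, so the CLUSTERED BY clause and the source table name here are assumptions; spark.sql.shuffle.partitions is the configuration the description calls "shuffle.partition":

{code:java}
-- Hypothetical DDL (not from the report): bucket the fact table into
-- 4 buckets on the join key at write time.
CREATE TABLE web_sales_bucketed
USING PARQUET
CLUSTERED BY (ws_item_sk) INTO 4 BUCKETS
AS SELECT * FROM web_sales;

-- 10 shuffle partitions > 4 buckets is what makes the planner insert an
-- Exchange over the bucketed table.
SET spark.sql.shuffle.partitions=10;
{code}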
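The intuition behind the proposed plan can be shown with a toy model. This is pure Python for illustration only (Spark's hashpartitioning uses Murmur3, not Python's modulo): when the non-bucketed side is hash-partitioned with the same partitioner and the same count as the table's bucket count, every matching key pair lands in the same partition index, so the bucketed side can be joined in place without an Exchange.

```python
# Toy model of a co-partitioned join that leaves the bucketed side unmoved.
# NOTE: key % n is a stand-in for Spark's hashpartitioning, not the real hash.

NUM_BUCKETS = 4

def partition_id(key: int, n: int) -> int:
    """Stand-in for hashpartitioning(key, n)."""
    return key % n

# Bucketed table: already laid out in 4 buckets on ws_item_sk at write time.
web_sales_bucketed = [1, 2, 3, 4, 5, 6, 7, 8]
buckets = {i: [] for i in range(NUM_BUCKETS)}
for k in web_sales_bucketed:
    buckets[partition_id(k, NUM_BUCKETS)].append(k)

# Non-bucketed side: shuffled into the SAME number of partitions (4, not 10).
item = [2, 4, 6, 8, 10]
shuffled = {i: [] for i in range(NUM_BUCKETS)}
for k in item:
    shuffled[partition_id(k, NUM_BUCKETS)].append(k)

# Both sides agree on the partitioner, so the join is partition-local:
# partition i of one side only needs partition i of the other.
joined = []
for i in range(NUM_BUCKETS):
    other = set(shuffled[i])
    joined.extend((k, k) for k in buckets[i] if k in other)

print(sorted(joined))  # [(2, 2), (4, 4), (6, 6), (8, 8)]
```

Shuffling the other side to 10 partitions instead breaks this alignment, which is why the current planner has to re-shuffle the bucketed table as well.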