[ 
https://issues.apache.org/jira/browse/SPARK-31350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-31350:
------------------------------------

    Assignee: Apache Spark

> Coalesce bucketed tables for join if applicable
> -----------------------------------------------
>
>                 Key: SPARK-31350
>                 URL: https://issues.apache.org/jira/browse/SPARK-31350
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Terry Kim
>            Assignee: Apache Spark
>            Priority: Major
>
> The following example of joining two bucketed tables introduces a full 
> shuffle:
> {code:java}
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
> val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
> val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
> df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
> df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
> val t1 = spark.table("t1")
> val t2 = spark.table("t2")
> val joined = t1.join(t2, t1("i") === t2("i"))
> joined.explain(true)
> == Physical Plan ==
> *(5) SortMergeJoin [i#44], [i#50], Inner
> :- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
> :     +- *(1) Project [i#44, j#45, k#46]
> :        +- *(1) Filter isnotnull(i#44)
> :           +- *(1) ColumnarToRow
> :              +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
> +- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(i#50, 200), true, [id=#115]
>       +- *(3) Project [i#50, j#51, k#52]
>          +- *(3) Filter isnotnull(i#50)
>             +- *(3) ColumnarToRow
>                +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
> {code}
> But since 8 is a multiple of 4, the 8-bucket side can be coalesced to 4 buckets to match the other side, eliminating both shuffles.
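> The soundness of coalescing rests on a modular-arithmetic fact: Spark assigns a row to bucket {{hash(key) % numBuckets}}, so when one bucket count divides the other, every row in bucket {{b}} of the 8-bucket table lands in bucket {{b % 4}} of a 4-bucket layout. The sketch below illustrates this with plain Scala (using {{Int.hashCode}} as a stand-in for Spark's actual bucketing hash, which is Murmur3):
> {code:scala}
> // Illustrative stand-in for Spark's bucket assignment: hash(key) % numBuckets,
> // normalized to a non-negative result.
> def bucketId(key: Int, numBuckets: Int): Int = {
>   val h = key.hashCode
>   ((h % numBuckets) + numBuckets) % numBuckets
> }
>
> // Because 8 % 4 == 0, bucket b of the 8-bucket table maps to bucket b % 4,
> // so buckets {b, b + 4} of t1 can be read together and joined against
> // bucket b of t2 with no exchange.
> val consistent = (0 until 20).forall(k => bucketId(k, 8) % 4 == bucketId(k, 4))
> println(consistent)  // true
> {code}
> (For the feature itself, the intent is that coalescing be guarded by a configuration flag rather than applied unconditionally, since reading multiple buckets per task can skew task sizes.)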



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
