Terry Kim created SPARK-31350:
---------------------------------

             Summary: Coalesce bucketed tables for join if applicable
                 Key: SPARK-31350
                 URL: https://issues.apache.org/jira/browse/SPARK-31350
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 3.1.0
            Reporter: Terry Kim


The following example of joining two bucketed tables introduces a full shuffle:
{code:java}
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
spark.conf.set("spark.sql.bucketing.coalesce", "true")
val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, t1("i") === t2("i"))
joined.explain(true)


== Physical Plan ==
*(5) SortMergeJoin [i#44], [i#50], Inner
:- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
:     +- *(1) Project [i#44, j#45, k#46]
:        +- *(1) Filter isnotnull(i#44)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, 
DataFilters: [isnotnull(i#44)], Format: Parquet, Location: 
InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], 
ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
+- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#50, 200), true, [id=#115]
      +- *(3) Project [i#50, j#51, k#52]
         +- *(3) Filter isnotnull(i#50)
            +- *(3) ColumnarToRow
               +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, 
DataFilters: [isnotnull(i#50)], Format: Parquet, Location: 
InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], 
ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
{code}
But one side can be coalesced to eliminate the shuffle.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to