[
https://issues.apache.org/jira/browse/SPARK-31350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Apache Spark reassigned SPARK-31350:
------------------------------------
Assignee: (was: Apache Spark)
> Coalesce bucketed tables for join if applicable
> -----------------------------------------------
>
> Key: SPARK-31350
> URL: https://issues.apache.org/jira/browse/SPARK-31350
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 3.1.0
> Reporter: Terry Kim
> Priority: Major
>
> The following example of joining two bucketed tables introduces a full
> shuffle:
> {code:java}
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
> val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
> val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
> df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
> df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
> val t1 = spark.table("t1")
> val t2 = spark.table("t2")
> val joined = t1.join(t2, t1("i") === t2("i"))
> joined.explain(true)
> == Physical Plan ==
> *(5) SortMergeJoin [i#44], [i#50], Inner
> :- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
> :     +- *(1) Project [i#44, j#45, k#46]
> :        +- *(1) Filter isnotnull(i#44)
> :           +- *(1) ColumnarToRow
> :              +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true, DataFilters: [isnotnull(i#44)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
> +- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(i#50, 200), true, [id=#115]
>       +- *(3) Project [i#50, j#51, k#52]
>          +- *(3) Filter isnotnull(i#50)
>             +- *(3) ColumnarToRow
>                +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
> {code}
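> Both sides are shuffled into 200 partitions even though both tables are
> bucketed on the join key "i". Since t1 has 8 buckets and t2 has 4, and 8 is
> a multiple of 4, the t1 side can be coalesced to 4 buckets to eliminate the
> shuffle.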
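
The reason coalescing is safe when one bucket count divides the other can be sketched with plain modular arithmetic. The snippet below is only an illustration, not Spark's implementation: it assumes a row's bucket id is a non-negative modulo of a hash of the bucket column, and it uses stand-in integer hashes rather than Spark's actual hash function. The names `pmod`, `coalescedBucket`, and `coalesceGroups` are hypothetical helpers introduced for this example.

```scala
object BucketCoalesceSketch {
  // Non-negative modulo; assumed here to be how a hash maps to a bucket id.
  def pmod(hash: Int, n: Int): Int = ((hash % n) + n) % n

  // Hypothetical helper: the coalesced bucket that an original bucket folds into.
  def coalescedBucket(bucketId: Int, numCoalesced: Int): Int =
    bucketId % numCoalesced

  // Groups the original bucket ids by the coalesced bucket that would read them.
  def coalesceGroups(numBuckets: Int, numCoalesced: Int): Map[Int, Seq[Int]] = {
    require(numBuckets % numCoalesced == 0, "bucket counts must divide evenly")
    (0 until numBuckets).groupBy(coalescedBucket(_, numCoalesced))
  }

  def main(args: Array[String]): Unit = {
    // Stand-in hash values (not Spark's hash), including negative ones.
    val hashes = Seq(-17, -8, -1, 0, 1, 5, 12, 123456789, Int.MinValue, Int.MaxValue)

    // A row placed in bucket pmod(h, 8) lands in the same coalesced bucket
    // that it would have been written to if the table had only 4 buckets.
    hashes.foreach { h =>
      assert(coalescedBucket(pmod(h, 8), 4) == pmod(h, 4))
    }

    // Show which of t1's 8 buckets would be read together for each of 4 buckets.
    coalesceGroups(8, 4).toSeq.sortBy(_._1).foreach { case (target, sources) =>
      println(s"coalesced bucket $target <- t1 buckets ${sources.mkString(", ")}")
    }
  }
}
```

Under these assumptions, each of t2's 4 buckets lines up with exactly two of t1's buckets (b and b + 4), so reading those two together gives both sides the same partitioning on "i" without an Exchange. If 8 were not a multiple of 4, the identity checked in the loop would not hold in general.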