[
https://issues.apache.org/jira/browse/SPARK-31350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Terry Kim updated SPARK-31350:
------------------------------
Description:
The following example of joining two bucketed tables introduces a full shuffle:
{code:java}
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, t1("i") === t2("i"))
joined.explain(true)
== Physical Plan ==
*(5) SortMergeJoin [i#44], [i#50], Inner
:- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
: +- *(1) Project [i#44, j#45, k#46]
: +- *(1) Filter isnotnull(i#44)
: +- *(1) ColumnarToRow
: +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true,
DataFilters: [isnotnull(i#44)], Format: Parquet, Location:
InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)],
ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
+- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(i#50, 200), true, [id=#115]
+- *(3) Project [i#50, j#51, k#52]
+- *(3) Filter isnotnull(i#50)
+- *(3) ColumnarToRow
+- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true,
DataFilters: [isnotnull(i#50)], Format: Parquet, Location:
InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)],
ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
{code}
But one side can be coalesced to eliminate the shuffle.
was:
The following example of joining two bucketed tables introduces a full shuffle:
{code:java}
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
spark.conf.set("spark.sql.bucketing.coalesce", "true")
val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k")
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, t1("i") === t2("i"))
joined.explain(true)
== Physical Plan ==
*(5) SortMergeJoin [i#44], [i#50], Inner
:- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
: +- *(1) Project [i#44, j#45, k#46]
: +- *(1) Filter isnotnull(i#44)
: +- *(1) ColumnarToRow
: +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true,
DataFilters: [isnotnull(i#44)], Format: Parquet, Location:
InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)],
ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
+- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(i#50, 200), true, [id=#115]
+- *(3) Project [i#50, j#51, k#52]
+- *(3) Filter isnotnull(i#50)
+- *(3) ColumnarToRow
+- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true,
DataFilters: [isnotnull(i#50)], Format: Parquet, Location:
InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)],
ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
{code}
But one side can be coalesced to eliminate the shuffle.
> Coalesce bucketed tables for join if applicable
> -----------------------------------------------
>
> Key: SPARK-31350
> URL: https://issues.apache.org/jira/browse/SPARK-31350
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 3.1.0
> Reporter: Terry Kim
> Priority: Major
>
> The following example of joining two bucketed tables introduces a full
> shuffle:
> {code:java}
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
> val df1 = (0 until 20).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j",
> "k")
> val df2 = (0 until 20).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j",
> "k")
> df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
> df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2")
> val t1 = spark.table("t1")
> val t2 = spark.table("t2")
> val joined = t1.join(t2, t1("i") === t2("i"))
> joined.explain(true)
> == Physical Plan ==
> *(5) SortMergeJoin [i#44], [i#50], Inner
> :- *(2) Sort [i#44 ASC NULLS FIRST], false, 0
> : +- Exchange hashpartitioning(i#44, 200), true, [id=#105]
> : +- *(1) Project [i#44, j#45, k#46]
> : +- *(1) Filter isnotnull(i#44)
> : +- *(1) ColumnarToRow
> : +- FileScan parquet default.t1[i#44,j#45,k#46] Batched: true,
> DataFilters: [isnotnull(i#44)], Format: Parquet, Location:
> InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)],
> ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 8 out of 8
> +- *(4) Sort [i#50 ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(i#50, 200), true, [id=#115]
> +- *(3) Project [i#50, j#51, k#52]
> +- *(3) Filter isnotnull(i#50)
> +- *(3) ColumnarToRow
> +- FileScan parquet default.t2[i#50,j#51,k#52] Batched: true,
> DataFilters: [isnotnull(i#50)], Format: Parquet, Location:
> InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(i)],
> ReadSchema: struct<i:int,j:int,k:string>, SelectedBucketsCount: 4 out of 4
> {code}
> But one side can be coalesced to eliminate the shuffle.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]