[
https://issues.apache.org/jira/browse/SPARK-23560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bruce Robbins updated SPARK-23560:
----------------------------------
Summary: Group by on struct field can add extra shuffle (was: A joinWith
followed by groupBy requires extra shuffle)
> Group by on struct field can add extra shuffle
> ----------------------------------------------
>
> Key: SPARK-23560
> URL: https://issues.apache.org/jira/browse/SPARK-23560
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Environment: debian 8.9, macos x high sierra
> Reporter: Bruce Robbins
> Priority: Major
>
> Depending on the size of the input, a joinWith followed by a groupBy requires
> more shuffles than a join followed by a groupBy.
> For example, here's a joinWith on two CSV files, followed by a groupBy:
> {noformat}
> import org.apache.spark.sql.types._
> val schema = StructType(StructField("id1", LongType) :: StructField("id2",
> LongType) :: Nil)
> val df1 = spark.read.schema(schema).csv("ds1.csv")
> val df2 = spark.read.schema(schema).csv("ds2.csv")
> val result1 = df1.joinWith(df2, df1.col("id1") ===
> df2.col("id2")).groupBy("_1.id1").count
> result1.explain
> == Physical Plan ==
> *(6) HashAggregate(keys=[_1#8.id1#19L], functions=[count(1)])
> +- Exchange hashpartitioning(_1#8.id1#19L, 200)
> +- *(5) HashAggregate(keys=[_1#8.id1 AS _1#8.id1#19L],
> functions=[partial_count(1)])
> +- *(5) Project [_1#8]
> +- *(5) SortMergeJoin [_1#8.id1], [_2#9.id2], Inner
> :- *(2) Sort [_1#8.id1 ASC NULLS FIRST], false, 0
> : +- Exchange hashpartitioning(_1#8.id1, 200)
> : +- *(1) Project [named_struct(id1, id1#0L, id2, id2#1L) AS
> _1#8]
> : +- *(1) FileScan csv [id1#0L,id2#1L] Batched: false,
> Format: CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters:
> [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
> +- *(4) Sort [_2#9.id2 ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(_2#9.id2, 200)
> +- *(3) Project [named_struct(id1, id1#4L, id2, id2#5L) AS
> _2#9]
> +- *(3) FileScan csv [id1#4L,id2#5L] Batched: false,
> Format: CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters:
> [], PushedFilters: [], ReadSchema: struct<id1:bigint,id2:bigint>
> {noformat}
> Using join, there is one less shuffle:
> {noformat}
> val result2 = df1.join(df2, df1.col("id1") ===
> df2.col("id2")).groupBy(df1("id1")).count
> result2.explain
> == Physical Plan ==
> *(5) HashAggregate(keys=[id1#0L], functions=[count(1)])
> +- *(5) HashAggregate(keys=[id1#0L], functions=[partial_count(1)])
> +- *(5) Project [id1#0L]
> +- *(5) SortMergeJoin [id1#0L], [id2#5L], Inner
> :- *(2) Sort [id1#0L ASC NULLS FIRST], false, 0
> : +- Exchange hashpartitioning(id1#0L, 200)
> : +- *(1) Project [id1#0L]
> : +- *(1) Filter isnotnull(id1#0L)
> : +- *(1) FileScan csv [id1#0L] Batched: false, Format:
> CSV, Location: InMemoryFileIndex[file:.../ds1.csv], PartitionFilters: [],
> PushedFilters: [IsNotNull(id1)], ReadSchema: struct<id1:bigint>
> +- *(4) Sort [id2#5L ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(id2#5L, 200)
> +- *(3) Project [id2#5L]
> +- *(3) Filter isnotnull(id2#5L)
> +- *(3) FileScan csv [id2#5L] Batched: false, Format:
> CSV, Location: InMemoryFileIndex[file:...ds2.csv], PartitionFilters: [],
> PushedFilters: [IsNotNull(id2)], ReadSchema: struct<id2:bigint>
> {noformat}
> T-he extra exchange is reflected in the run time of the query.- Actually, I
> recant this bit. In my particular tests, the extra exchange has negligible
> impact on run time. All the difference is in stage 2.
> My tests were on inputs with more than 2 million records.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]