[
https://issues.apache.org/jira/browse/SPARK-27573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Josh Rosen updated SPARK-27573:
-------------------------------
Description:
When an aggregation requires a shuffle, Spark SQL performs separate partial and
final aggregations:
{code:java}
sql("select id % 100 as k, id as v from range(100000)")
.groupBy("k")
.sum("v")
.explain
== Physical Plan ==
*(2) HashAggregate(keys=[k#64L], functions=[sum(v#65L)])
+- Exchange(coordinator id: 2031684357) hashpartitioning(k#64L, 5340),
coordinator[target post-shuffle partition size: 67108864]
+- *(1) HashAggregate(keys=[k#64L], functions=[partial_sum(v#65L)])
+- *(1) Project [(id#66L % 100) AS k#64L, id#66L AS v#65L]
+- *(1) Range (0, 100000, step=1, splits=10)
{code}
However, consider what happens if the dataset being aggregated is already
pre-partitioned by the aggregate's grouping columns:
{code:java}
sql("select id % 100 as k, id as v from range(100000)")
.repartition(10, $"k")
.groupBy("k")
.sum("v")
.explain
== Physical Plan ==
*(2) HashAggregate(keys=[k#50L], functions=[sum(v#51L)], output=[k#50L,
sum(v)#58L])
+- *(2) HashAggregate(keys=[k#50L], functions=[partial_sum(v#51L)],
output=[k#50L, sum#63L])
+- Exchange(coordinator id: 39015877) hashpartitioning(k#50L, 10),
coordinator[target post-shuffle partition size: 67108864]
+- *(1) Project [(id#52L % 100) AS k#50L, id#52L AS v#51L]
+- *(1) Range (0, 100000, step=1, splits=10)
{code}
Here, we end up with back-to-back HashAggregate operators that run as part of the same stage.
For certain aggregates (e.g. _sum_, _count_), this duplication is unnecessary: we could have just performed a single total aggregation instead, since all of the data for each group is already co-located!
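For illustration, here is a minimal spark-shell sketch (not from the original report) that checks the co-location property which makes the partial aggregate redundant: after repartitioning by the grouping column, every key lands in exactly one partition. It uses spark_partition_id and countDistinct from org.apache.spark.sql.functions.
{code:java}
// Sketch: after repartitioning by k, each key occurs in exactly one partition,
// so a single per-partition aggregation is already a total aggregation.
import org.apache.spark.sql.functions.{spark_partition_id, countDistinct}

val df = sql("select id % 100 as k, id as v from range(100000)")
  .repartition(10, $"k")

df.select($"k", spark_partition_id().as("pid"))
  .groupBy("k")
  .agg(countDistinct("pid").as("numPartitions"))
  .filter($"numPartitions" > 1)
  .count()  // expected: 0, i.e. no key is split across partitions
{code}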
The duplicate aggregate is problematic in cases where the aggregate's inputs and outputs are of the same order of magnitude (e.g. counting the number of duplicate records in a dataset where duplicates are extremely rare): the partial aggregate barely reduces the data, so it only adds overhead.
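As a concrete (hypothetical) sketch of that workload shape, with illustrative names: nearly every key is unique, so the partial count emits roughly one row per input row and provides almost no reduction ahead of the shuffle.
{code:java}
// Hypothetical duplicate-detection job: record_ids are (almost) all distinct,
// so partial_count does essentially no combining before the Exchange.
val records = sql("select id as record_id from range(100000000)")

val duplicates = records
  .groupBy("record_id")
  .count()
  .filter($"count" > 1)  // the rare duplicates we actually care about

duplicates.explain  // the plan still contains a partial_count before the Exchange
{code}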
My motivation for this optimization is similar to the one behind SPARK-1412: I know that partial aggregation doesn't help for my workload, so I wanted to somehow coerce Spark into skipping the ineffective partial aggregation and jumping directly to total aggregation. I thought that pre-partitioning would accomplish this, but it didn't achieve my goal because the aggregation-collapsing (or partial-aggregate-skipping) optimization described here is missing.
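As an illustration of the behavior I was hoping for (an RDD-level workaround sketch, not the requested optimizer change): when the data is already partitioned by the key, a single pass over each partition produces the final per-key sums, with no separate partial and final aggregates.
{code:java}
// Illustrative workaround sketch: one total aggregation per partition.
// Because the data is partitioned by key, the per-partition results are final.
import org.apache.spark.HashPartitioner

val pairs = sc.range(0L, 100000L).map(id => (id % 100, id))
val partitioned = pairs.partitionBy(new HashPartitioner(10))

val sums = partitioned.mapPartitions { iter =>
  val acc = scala.collection.mutable.HashMap.empty[Long, Long]
  iter.foreach { case (k, v) => acc(k) = acc.getOrElse(k, 0L) + v }
  acc.iterator
}
sums.count()  // 100 keys, each produced exactly once
{code}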
> Skip partial aggregation when data is already partitioned (or collapse
> adjacent partial and final aggregates)
> -------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-27573
> URL: https://issues.apache.org/jira/browse/SPARK-27573
> Project: Spark
> Issue Type: Improvement
> Components: Optimizer, SQL
> Affects Versions: 2.4.0
> Reporter: Josh Rosen
> Priority: Major
>