[
https://issues.apache.org/jira/browse/SPARK-38282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tanel Kiis updated SPARK-38282:
-------------------------------
Description:
Spark duplicates every non-trivial expression used in Window.partitionBy, which
results in duplicate exchanges and WindowExec nodes.
An example unit test:
{code}
test("SPARK-38282: Avoid duplicating complex partitioning expressions") {
val group = functions.col("id") % 2
val min = functions.min("id").over(Window.partitionBy(group))
val max = functions.max("id").over(Window.partitionBy(group))
val df1 = spark.range(1, 4)
.withColumn("ratio", max / min)
val df2 = spark.range(1, 4)
.withColumn("min", min)
.withColumn("max", max)
.select(col("id"), (col("max") / col("min")).as("ratio"))
Seq(df1, df2).foreach { df =>
checkAnswer(
df,
Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))
val windows = collect(df.queryExecution.executedPlan) {
case w: WindowExec => w
}
assert(windows.size == 1)
}
}
{code}
The query plan for this (_w0#5L and _w1#6L are duplicates):
{code}
Window [min(id#2L) windowspecdefinition(_w1#6L, specifiedwindowframe(RowFrame,
unboundedpreceding$(), unboundedfollowing$())) AS _we1#8L], [_w1#6L]
+- *(4) Sort [_w1#6L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 1
+- Exchange hashpartitioning(_w1#6L, 5), ENSURE_REQUIREMENTS,
[id=#256]
+- *(3) Project [id#2L, _w1#6L, _we0#7L]
+- Window [max(id#2L) windowspecdefinition(_w0#5L,
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
AS _we0#7L], [_w0#5L]
+- *(2) Sort [_w0#5L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(_w0#5L, 5),
ENSURE_REQUIREMENTS, [id=#203]
+- *(1) Project [id#2L, (id#2L % 2) AS _w0#5L,
(id#2L % 2) AS _w1#6L]
+- *(1) Range (1, 4, step=1, splits=2)
{code}
was:
{code}
test("SPARK-XXXXX: Avoid duplicating complex partitioning expressions") {
val group = functions.col("id") % 2
val min = functions.min("id").over(Window.partitionBy(group))
val max = functions.max("id").over(Window.partitionBy(group))
val df1 = spark.range(1, 4)
.withColumn("ratio", max / min)
val df2 = spark.range(1, 4)
.withColumn("min", min)
.withColumn("max", max)
.select(col("id"), (col("max") / col("min")).as("ratio"))
Seq(df1, df2).foreach { df =>
checkAnswer(
df,
Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))
val windows = collect(df.queryExecution.executedPlan) {
case w: WindowExec => w
}
assert(windows.size == 1)
}
}
{code}
> Avoid duplicating complex partitioning expressions
> --------------------------------------------------
>
> Key: SPARK-38282
> URL: https://issues.apache.org/jira/browse/SPARK-38282
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Tanel Kiis
> Priority: Major
>
> Spark duplicates every non-trivial expression used in Window.partitionBy, which
> results in duplicate exchanges and WindowExec nodes.
> An example unit test:
> {code}
> test("SPARK-38282: Avoid duplicating complex partitioning expressions") {
> val group = functions.col("id") % 2
> val min = functions.min("id").over(Window.partitionBy(group))
> val max = functions.max("id").over(Window.partitionBy(group))
> val df1 = spark.range(1, 4)
> .withColumn("ratio", max / min)
> val df2 = spark.range(1, 4)
> .withColumn("min", min)
> .withColumn("max", max)
> .select(col("id"), (col("max") / col("min")).as("ratio"))
> Seq(df1, df2).foreach { df =>
> checkAnswer(
> df,
> Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))
> val windows = collect(df.queryExecution.executedPlan) {
> case w: WindowExec => w
> }
> assert(windows.size == 1)
> }
> }
> {code}
> The query plan for this (_w0#5L and _w1#6L are duplicates):
> {code}
> Window [min(id#2L) windowspecdefinition(_w1#6L,
> specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
> AS _we1#8L], [_w1#6L]
> +- *(4) Sort [_w1#6L ASC NULLS FIRST], false, 0
> +- AQEShuffleRead coalesced
> +- ShuffleQueryStage 1
> +- Exchange hashpartitioning(_w1#6L, 5), ENSURE_REQUIREMENTS,
> [id=#256]
> +- *(3) Project [id#2L, _w1#6L, _we0#7L]
> +- Window [max(id#2L) windowspecdefinition(_w0#5L,
> specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()))
> AS _we0#7L], [_w0#5L]
> +- *(2) Sort [_w0#5L ASC NULLS FIRST], false, 0
> +- AQEShuffleRead coalesced
> +- ShuffleQueryStage 0
> +- Exchange hashpartitioning(_w0#5L, 5),
> ENSURE_REQUIREMENTS, [id=#203]
> +- *(1) Project [id#2L, (id#2L % 2) AS
> _w0#5L, (id#2L % 2) AS _w1#6L]
> +- *(1) Range (1, 4, step=1, splits=2)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]