[
https://issues.apache.org/jira/browse/SPARK-38034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zhou xiang updated SPARK-38034:
-------------------------------
Description:
The TransposeWindow rule tries to eliminate unnecessary shuffles by transposing adjacent Window operators:
{code:java}
/**
 * Transpose Adjacent Window Expressions.
 * - If the partition spec of the parent Window expression is compatible with
 *   the partition spec of the child window expression, transpose them.
 */
object TransposeWindow extends Rule[LogicalPlan] {
  private def compatiblePartitions(ps1: Seq[Expression], ps2: Seq[Expression]): Boolean = {
    ps1.length < ps2.length &&
      ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall {
        case (l, r) => l.semanticEquals(r)
      })
  }

  private def windowsCompatible(w1: Window, w2: Window): Boolean = {
    w1.references.intersect(w2.windowOutputSet).isEmpty &&
      w1.expressions.forall(_.deterministic) &&
      w2.expressions.forall(_.deterministic) &&
      compatiblePartitions(w1.partitionSpec, w2.partitionSpec)
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
    _.containsPattern(WINDOW), ruleId) {
    case w1 @ Window(_, _, _, w2 @ Window(_, _, _, grandChild))
        if windowsCompatible(w1, w2) =>
      Project(w1.output, w2.copy(child = w1.copy(child = grandChild)))

    case w1 @ Window(_, _, _, Project(pl, w2 @ Window(_, _, _, grandChild)))
        if windowsCompatible(w1, w2) && w1.references.subsetOf(grandChild.outputSet) =>
      Project(
        pl ++ w1.windowOutputSet,
        w2.copy(child = w1.copy(child = grandChild)))
  }
} {code}
However, compatiblePartitions compares ps1 only against the first ps1.length elements of ps2, so the rule fails to apply whenever ps1 matches a subset of ps2 that is not a prefix, as in the case below:
{code:java}
val df = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c", "id AS d")
df.selectExpr(
"sum(`d`) OVER(PARTITION BY `b`,`a`) as e",
"sum(`c`) OVER(PARTITION BY `a`) as f"
).explain
{code}
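To see concretely why the check fails here, the private helper can be re-declared in a spark-shell and traced by hand (a hypothetical illustration; compatiblePartitions itself is private to the rule):
{code:java}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.functions.col

// Copy of the private helper, re-declared here only for illustration.
def compatiblePartitions(ps1: Seq[Expression], ps2: Seq[Expression]): Boolean = {
  ps1.length < ps2.length &&
    ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall {
      case (l, r) => l.semanticEquals(r)
    })
}

val psParent = Seq(col("a").expr)                 // PARTITION BY `a`     (window f)
val psChild  = Seq(col("b").expr, col("a").expr)  // PARTITION BY `b`,`a` (window e)

// ps2.take(1) keeps only `b`, so `a` is only ever compared against `b`:
compatiblePartitions(psParent, psChild)  // false, although {a} is a subset of {b, a}
{code}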
Current plan:
{code:java}
== Physical Plan ==
*(5) Project [e#10L, f#11L]
+- Window [sum(c#4L) windowspecdefinition(a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#11L], [a#2L]
   +- *(4) Sort [a#2L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#2L, 200), true, [id=#41]
         +- *(3) Project [a#2L, c#4L, e#10L]
            +- Window [sum(d#5L) windowspecdefinition(b#3L, a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#10L], [b#3L, a#2L]
               +- *(2) Sort [b#3L ASC NULLS FIRST, a#2L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(b#3L, a#2L, 200), true, [id=#33]
                     +- *(1) Project [id#0L AS d#5L, id#0L AS b#3L, id#0L AS a#2L, id#0L AS c#4L]
                        +- *(1) Range (0, 10, step=1, splits=10) {code}
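For contrast, the current rule does fire when the two-column spec happens to list `a` first, because `a` then survives the take(ps1.length) prefix; the behaviour is order-dependent (a hypothetical variation of the repro above):
{code:java}
// Same query, but the two-column window partitions by (`a`,`b`) instead of
// (`b`,`a`). Now ps2.take(1) == Seq(a) matches ps1 == Seq(a), so
// TransposeWindow should fire and leave a single exchange.
df.selectExpr(
  "sum(`d`) OVER(PARTITION BY `a`,`b`) as e",
  "sum(`c`) OVER(PARTITION BY `a`) as f"
).explain
{code}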
Expected plan:
{code:java}
== Physical Plan ==
*(4) Project [e#924L, f#925L]
+- Window [sum(d#43L) windowspecdefinition(b#41L, a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#924L], [b#41L, a#40L]
   +- *(3) Sort [b#41L ASC NULLS FIRST, a#40L ASC NULLS FIRST], false, 0
      +- *(3) Project [d#43L, b#41L, a#40L, f#925L]
         +- Window [sum(c#42L) windowspecdefinition(a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#925L], [a#40L]
            +- *(2) Sort [a#40L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(a#40L, 200), true, [id=#282]
                  +- *(1) Project [id#38L AS d#43L, id#38L AS b#41L, id#38L AS a#40L, id#38L AS c#42L]
                     +- *(1) Range (0, 10, step=1, splits=10) {code}
In addition, the permutations call has O(n!) time complexity, which becomes very expensive with many partition columns; this could be optimized as well.
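One possible direction for both problems (a sketch only, under the assumption that a semantic subset of the child partition columns is sufficient for the transpose to be correct): replace the prefix-plus-permutations test with a plain subset check, which accepts any ordering of the child partition columns and runs in O(n*m) instead of O(n!):
{code:java}
// Sketch of a relaxed compatiblePartitions: ps1 is compatible with ps2 when
// every expression in ps1 has a semantically equal counterpart anywhere in
// ps2. This covers the (`b`,`a`) vs (`a`) case above, and the factorial
// permutation search is gone.
private def compatiblePartitions(ps1: Seq[Expression], ps2: Seq[Expression]): Boolean = {
  ps1.length < ps2.length && ps1.forall { expr1 =>
    ps2.exists(expr1.semanticEquals)
  }
}
{code}
With this change, the query above should produce the expected plan with a single exchange.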
> Optimize time complexity and extend applicable cases for TransposeWindow
> -------------------------------------------------------------------------
>
> Key: SPARK-38034
> URL: https://issues.apache.org/jira/browse/SPARK-38034
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.2.0
> Reporter: zhou xiang
> Priority: Minor
>