[
https://issues.apache.org/jira/browse/SPARK-42525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zhuml updated SPARK-42525:
--------------------------
Description:
Extend the CollapseWindow rule to collapse two adjacent Window nodes when one
window is in a subquery and the attribute qualifiers differ.
{code:java}
select a, b, c, row_number() over (partition by a order by b) as d from
( select a, b, rank() over (partition by a order by b) as c from t1) t2

== Optimized Logical Plan ==

before:
Window [row_number() windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#26], [a#11], [b#12 ASC NULLS FIRST]
+- Window [rank(b#12) windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS c#25], [a#11], [b#12 ASC NULLS FIRST]
   +- InMemoryRelation [a#11, b#12], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [_1#6 AS a#11, _2#7 AS b#12]
            +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#6, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#7]
               +- *(1) MapElements org.apache.spark.sql.DataFrameSuite$$Lambda$1517/1628848368@3a479fda, obj#5: scala.Tuple2
                  +- *(1) DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false, true), obj#4: java.lang.Long
                     +- *(1) Range (0, 10, step=1, splits=2)

after:
Window [rank(b#12) windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS c#25, row_number() windowspecdefinition(a#11, b#12 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS d#26], [a#11], [b#12 ASC NULLS FIRST]
+- InMemoryRelation [a#11, b#12], StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *(1) Project [_1#6 AS a#11, _2#7 AS b#12]
         +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#6, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#7]
            +- *(1) MapElements org.apache.spark.sql.DataFrameSuite$$Lambda$1518/1928028672@4d7a64ca, obj#5: scala.Tuple2
               +- *(1) DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false, true), obj#4: java.lang.Long
                  +- *(1) Range (0, 10, step=1, splits=2){code}
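To illustrate the collapsing idea, here is a minimal, self-contained Scala sketch. It is a simplified model, not the actual Catalyst CollapseWindow rule, and the `WindowNode` type and `collapseWindow` function are hypothetical names introduced for this example: two adjacent Window nodes merge into one when their partition and order specs are equal.

```scala
// Hypothetical, simplified model of a logical-plan Window node (not
// Catalyst's classes): window expressions, partition spec, order spec,
// and an optional child Window.
case class WindowNode(
    windowExprs: List[String],
    partitionSpec: List[String],
    orderSpec: List[String],
    child: Option[WindowNode])

// Collapse a Window whose child is another Window with the same partition
// and order specs: the child's expression list is concatenated with the
// parent's onto a single Window node, and collapsing continues downward.
def collapseWindow(w: WindowNode): WindowNode = w.child match {
  case Some(c) if c.partitionSpec == w.partitionSpec && c.orderSpec == w.orderSpec =>
    collapseWindow(WindowNode(c.windowExprs ++ w.windowExprs,
      w.partitionSpec, w.orderSpec, c.child))
  case _ => w
}
```

Applied to the example above, the inner rank window (c#25) and the outer row_number window (d#26) share partition a and order b, so they collapse into one Window node carrying both expressions.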
was:
Extend the CollapseWindow rule to collapse Window nodes, when one window in
subquery.
> Collapse two adjacent windows with semantically-same partition/order
> --------------------------------------------------------------------
>
> Key: SPARK-42525
> URL: https://issues.apache.org/jira/browse/SPARK-42525
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.2.3
> Reporter: zhuml
> Assignee: zhuml
> Priority: Major
> Fix For: 3.5.0
>
>
> Extend the CollapseWindow rule to collapse two adjacent Window nodes when one
> window is in a subquery and the attribute qualifiers differ.
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]