[
https://issues.apache.org/jira/browse/SPARK-46365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17795932#comment-17795932
]
Josh Rosen commented on SPARK-46365:
------------------------------------
This issue appears to be fixed in
[https://github.com/apache/spark/tree/branch-3.5] but I'm not sure by which
commit.
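
For anyone still on 3.5.0, a possible stopgap is sketched below. It rests on two assumptions that are not confirmed in this thread: that the WindowGroupLimit operator visible in the 3.5.0 plan quoted below is what changes the result, and that setting the internal option spark.sql.optimizer.windowGroupLimitThreshold to -1 turns that optimization off. The application name and the tuple-based input are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("spark-46365-stopgap") // hypothetical name
  .getOrCreate()
import spark.implicits._

// Assumption: -1 disables the window group limit optimization in Spark 3.5.x.
// Set it before building the query so the new plan picks it up.
spark.conf.set("spark.sql.optimizer.windowGroupLimitThreshold", "-1")

// Same data as the report, expressed as tuples to keep the sketch short.
val events = Seq[(Long, Long, String)](
  (1702289001L, 1L, "non-null value"),
  (1702289002L, 1L, "new non-null value"),
  (1702289003L, 1L, null),
  (1702289004L, 2L, "non-null value"),
  (1702289005L, 2L, null)
).toDF("timestamp", "id", "value")

val window = Window.partitionBy("id").orderBy($"timestamp".desc)

val latestEvents = events
  .withColumn(
    "value",
    first("value", ignoreNulls = true)
      .over(window.rangeBetween(Window.currentRow, Window.unboundedFollowing))
  )
  .withColumn("n", row_number().over(window))
  .where("n = 1")
  .drop("n")

// Inspect the plan: if the assumption holds, WindowGroupLimit should be absent.
latestEvents.explain()
latestEvents.show()
```

Disabling the optimization for the whole session is a blunt instrument; upgrading to a build from branch-3.5 that contains the fix is the cleaner route.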
> Spark 3.5.0 Regression: Window Function Combination Yields Null Values
> -----------------------------------------------------------------------
>
> Key: SPARK-46365
> URL: https://issues.apache.org/jira/browse/SPARK-46365
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.5.0
> Reporter: Boris PEREVALOV
> Priority: Major
>
> When combining two window functions (first one to get the previous non-null
> value, second one to get the latest rows only), the result is not correct
> since version 3.5.0.
>
> Here is a simple Scala example:
>
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> case class Event(timestamp: Long, id: Long, value: String)
>
> val events = Seq(
>   Event(timestamp = 1702289001, id = 1, value = "non-null value"),
>   Event(timestamp = 1702289002, id = 1, value = "new non-null value"),
>   Event(timestamp = 1702289003, id = 1, value = null),
>   Event(timestamp = 1702289004, id = 2, value = "non-null value"),
>   Event(timestamp = 1702289005, id = 2, value = null),
> ).toDF
>
> val window = Window.partitionBy("id").orderBy($"timestamp".desc)
>
> val eventsWithLatestNonNullValue = events
>   .withColumn(
>     "value",
>     first("value", ignoreNulls = true) over
>       window.rangeBetween(Window.currentRow, Window.unboundedFollowing)
>   )
>
> eventsWithLatestNonNullValue.show
>
> val latestEvents = eventsWithLatestNonNullValue
>   .withColumn("n", row_number over window)
>   .where("n = 1")
>   .drop("n")
>
> latestEvents.show
> {code}
>
>
> Current result (Spark 3.5.0):
>
> {code:java}
> +----------+---+-----+
> | timestamp| id|value|
> +----------+---+-----+
> |1702289003| 1| NULL|
> |1702289005| 2| NULL|
> +----------+---+-----+
> {code}
>
>
> Expected result (all versions before 3.5.0):
>
> {code:java}
> +----------+---+------------------+
> | timestamp| id| value|
> +----------+---+------------------+
> |1702289003| 1|new non-null value|
> |1702289005| 2| non-null value|
> +----------+---+------------------+
> {code}
>
> Execution plans are different.
>
> Spark 3.5.0:
>
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [timestamp#1856L, id#1857L, value#1867]
>    +- Filter (n#1887 = 1)
>       +- Window [first(value#1858, true) windowspecdefinition(id#1857L, timestamp#1856L DESC NULLS LAST, specifiedwindowframe(RangeFrame, currentrow$(), unboundedfollowing$())) AS value#1867, row_number() windowspecdefinition(id#1857L, timestamp#1856L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS n#1887], [id#1857L], [timestamp#1856L DESC NULLS LAST]
>          +- WindowGroupLimit [id#1857L], [timestamp#1856L DESC NULLS LAST], row_number(), 1, Final
>             +- Sort [id#1857L ASC NULLS FIRST, timestamp#1856L DESC NULLS LAST], false, 0
>                +- Exchange hashpartitioning(id#1857L, 200), ENSURE_REQUIREMENTS, [plan_id=326]
>                   +- WindowGroupLimit [id#1857L], [timestamp#1856L DESC NULLS LAST], row_number(), 1, Partial
>                      +- Sort [id#1857L ASC NULLS FIRST, timestamp#1856L DESC NULLS LAST], false, 0
>                         +- LocalTableScan [timestamp#1856L, id#1857L, value#1858]
> {code}
>
> Spark 3.4.0:
>
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [timestamp#6L, id#7L, value#17]
>    +- Filter (n#37 = 1)
>       +- Window [first(value#8, true) windowspecdefinition(id#7L, timestamp#6L DESC NULLS LAST, specifiedwindowframe(RangeFrame, currentrow$(), unboundedfollowing$())) AS value#17, row_number() windowspecdefinition(id#7L, timestamp#6L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS n#37], [id#7L], [timestamp#6L DESC NULLS LAST]
>          +- Sort [id#7L ASC NULLS FIRST, timestamp#6L DESC NULLS LAST], false, 0
>             +- Exchange hashpartitioning(id#7L, 200), ENSURE_REQUIREMENTS, [plan_id=60]
>                +- LocalTableScan [timestamp#6L, id#7L, value#8]
> {code}
>
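
The visible difference between the two plans quoted above is that 3.5.0 places a WindowGroupLimit (row_number(), 1) beneath the Window operator, while 3.4.0 does not. One plausible reading, not confirmed in this thread, is that each partition is cut down to its newest row before first(value, ignoreNulls = true) is evaluated, so the frame no longer contains the older non-null rows. The plain-Scala sketch below (no Spark involved; the object name and the helpers fillFromOlderRows and latestPerId are hypothetical) models both orderings on the data from the report.

```scala
object WindowOrderSketch {
  final case class Event(timestamp: Long, id: Long, value: Option[String])

  val events: Seq[Event] = Seq(
    Event(1702289001L, 1L, Some("non-null value")),
    Event(1702289002L, 1L, Some("new non-null value")),
    Event(1702289003L, 1L, None),
    Event(1702289004L, 2L, Some("non-null value")),
    Event(1702289005L, 2L, None)
  )

  // Models first(value, ignoreNulls = true) over a window partitioned by id,
  // ordered by timestamp descending, with a frame from the current row to
  // unbounded following: the first non-null value among the current row and
  // all older rows of the same id.
  def fillFromOlderRows(rows: Seq[Event]): Seq[Event] =
    rows.groupBy(_.id).values.toSeq.flatMap { group =>
      val newestFirst = group.sortBy(-_.timestamp)
      newestFirst.map { row =>
        val frame = newestFirst.filter(_.timestamp <= row.timestamp)
        row.copy(value = frame.flatMap(_.value).headOption)
      }
    }

  // Models row_number() = 1 under the same ordering: the newest row per id.
  def latestPerId(rows: Seq[Event]): Seq[Event] =
    rows.groupBy(_.id).values.map(_.maxBy(_.timestamp)).toSeq.sortBy(_.id)

  // Order implied by the query text: fill first, then keep the newest row.
  val expected: Seq[Event] = latestPerId(fillFromOlderRows(events))

  // Order suggested by the 3.5.0 plan: limit to the newest row, then fill.
  val limitedFirst: Seq[Event] = fillFromOlderRows(latestPerId(events)).sortBy(_.id)

  def main(args: Array[String]): Unit = {
    println(s"fill then limit: $expected")
    println(s"limit then fill: $limitedFirst")
  }
}
```

In this model, filling before limiting keeps the non-null values for both ids, whereas limiting first leaves every value empty, which matches the NULL column reported for 3.5.0.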