[ https://issues.apache.org/jira/browse/SPARK-46365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17796054#comment-17796054 ]
Josh Rosen commented on SPARK-46365:
------------------------------------

I think that this is a duplicate of SPARK-45543, which is fixed in the forthcoming Spark 3.5.1. I figured this out by running `git bisect` in `branch-3.5` with the above reproduction.

> Spark 3.5.0 Regression: Window Function Combination Yields Null Values
> ----------------------------------------------------------------------
>
>                 Key: SPARK-46365
>                 URL: https://issues.apache.org/jira/browse/SPARK-46365
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0
>            Reporter: Boris PEREVALOV
>            Priority: Major
>
> When combining two window functions (the first to back-fill each row with the latest non-null value, the second to keep only the latest row per partition), the result is incorrect since version 3.5.0.
>
> Here is a simple Scala example:
>
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> case class Event(timestamp: Long, id: Long, value: String)
>
> val events = Seq(
>   Event(timestamp = 1702289001, id = 1, value = "non-null value"),
>   Event(timestamp = 1702289002, id = 1, value = "new non-null value"),
>   Event(timestamp = 1702289003, id = 1, value = null),
>   Event(timestamp = 1702289004, id = 2, value = "non-null value"),
>   Event(timestamp = 1702289005, id = 2, value = null),
> ).toDF
>
> val window = Window.partitionBy("id").orderBy($"timestamp".desc)
>
> val eventsWithLatestNonNullValue = events
>   .withColumn(
>     "value",
>     first("value", ignoreNulls = true) over
>       window.rangeBetween(Window.currentRow, Window.unboundedFollowing)
>   )
>
> eventsWithLatestNonNullValue.show
>
> val latestEvents = eventsWithLatestNonNullValue
>   .withColumn("n", row_number over window)
>   .where("n = 1")
>   .drop("n")
>
> latestEvents.show
> {code}
>
> Current result (Spark 3.5.0):
>
> {code:java}
> +----------+---+-----+
> | timestamp| id|value|
> +----------+---+-----+
> |1702289003|  1| NULL|
> |1702289005|  2| NULL|
> +----------+---+-----+
> {code}
>
> Expected result (all versions < 3.5.0):
>
> {code:java}
> +----------+---+------------------+
> | timestamp| id|             value|
> +----------+---+------------------+
> |1702289003|  1|new non-null value|
> |1702289005|  2|    non-null value|
> +----------+---+------------------+
> {code}
>
> Execution plans are different.
>
> Spark 3.5.0:
>
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [timestamp#1856L, id#1857L, value#1867]
>    +- Filter (n#1887 = 1)
>       +- Window [first(value#1858, true) windowspecdefinition(id#1857L, timestamp#1856L DESC NULLS LAST, specifiedwindowframe(RangeFrame, currentrow$(), unboundedfollowing$())) AS value#1867, row_number() windowspecdefinition(id#1857L, timestamp#1856L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS n#1887], [id#1857L], [timestamp#1856L DESC NULLS LAST]
>          +- WindowGroupLimit [id#1857L], [timestamp#1856L DESC NULLS LAST], row_number(), 1, Final
>             +- Sort [id#1857L ASC NULLS FIRST, timestamp#1856L DESC NULLS LAST], false, 0
>                +- Exchange hashpartitioning(id#1857L, 200), ENSURE_REQUIREMENTS, [plan_id=326]
>                   +- WindowGroupLimit [id#1857L], [timestamp#1856L DESC NULLS LAST], row_number(), 1, Partial
>                      +- Sort [id#1857L ASC NULLS FIRST, timestamp#1856L DESC NULLS LAST], false, 0
>                         +- LocalTableScan [timestamp#1856L, id#1857L, value#1858]
> {code}
>
> Spark 3.4.0:
>
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [timestamp#6L, id#7L, value#17]
>    +- Filter (n#37 = 1)
>       +- Window [first(value#8, true) windowspecdefinition(id#7L, timestamp#6L DESC NULLS LAST, specifiedwindowframe(RangeFrame, currentrow$(), unboundedfollowing$())) AS value#17, row_number() windowspecdefinition(id#7L, timestamp#6L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS n#37], [id#7L], [timestamp#6L DESC NULLS LAST]
>          +- Sort [id#7L ASC NULLS FIRST, timestamp#6L DESC NULLS LAST], false, 0
>             +- Exchange hashpartitioning(id#7L, 200), ENSURE_REQUIREMENTS, [plan_id=60]
>                +- LocalTableScan [timestamp#6L, id#7L, value#8]
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
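[Editor's note] For reference, the semantics the reproduction expects can be sketched in plain Scala, without Spark: within each `id` partition ordered by `timestamp` descending, back-fill each row's value with the first non-null value in its frame (`[currentRow, unboundedFollowing]`), then keep only the row with `row_number = 1`. This is an illustrative sketch of the intended result, not Spark code; the `latest` map and its helper logic are my own and do not appear in the issue.

```scala
// Plain-Scala model of the query in the reproduction (no Spark required).
case class Event(timestamp: Long, id: Long, value: String)

val events = Seq(
  Event(1702289001L, 1L, "non-null value"),
  Event(1702289002L, 1L, "new non-null value"),
  Event(1702289003L, 1L, null),
  Event(1702289004L, 2L, "non-null value"),
  Event(1702289005L, 2L, null)
)

val latest: Map[Long, Event] = events
  .groupBy(_.id)                                  // partitionBy("id")
  .map { case (id, rows) =>
    val desc = rows.sortBy(-_.timestamp)          // orderBy($"timestamp".desc)
    // For the row_number = 1 row (the newest), the frame
    // [currentRow, unboundedFollowing] spans the whole partition, so
    // first("value", ignoreNulls = true) is the first non-null in desc order.
    val filled = desc.head.copy(value = desc.flatMap(e => Option(e.value)).head)
    id -> filled                                  // where("n = 1") keeps only this row
  }
```

On the sample data this yields the "expected result" table above: id 1 maps to "new non-null value" at 1702289003 and id 2 to "non-null value" at 1702289005, matching what Spark versions before 3.5.0 return.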