[ https://issues.apache.org/jira/browse/SPARK-46365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17796054#comment-17796054 ]

Josh Rosen commented on SPARK-46365:
------------------------------------

I think that this is a duplicate of SPARK-45543, which is fixed in the 
forthcoming Spark 3.5.1.
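
Until 3.5.1 lands, one possible mitigation (a sketch only; I have not
verified it against this exact reproduction) is to disable the rank-based
window-group-limit pushdown that 3.5.0 introduced, assuming
`spark.sql.window.group.limit.threshold` is the configuration gating it:

```
# spark-defaults.conf (or pass via --conf): sketch of a possible mitigation.
# Assumption: this is the Spark 3.5.0 config gating the WindowGroupLimit
# pushdown; -1 disables it.
spark.sql.window.group.limit.threshold=-1
```

Restoring the default re-enables the optimization once 3.5.1 is in place.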

I figured this out by running `git bisect` in `branch-3.5` with the above 
reproduction.
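
For reference, the semantics the reproduction quoted below is expected to
have can be checked without Spark at all. This plain-Python sketch (the
names `events` and `latest_events` are mine, not part of the report) yields
the values the 3.4.0 plan returns:

```python
# Plain-Python sketch (no Spark needed) of what the reproduction below is
# expected to compute: within each id, fill each row with the most recent
# non-null value at or before its timestamp (the DESC-ordered frame
# [currentRow, unboundedFollowing] in the Spark code), then keep only the
# latest row per id (row_number == 1). Data mirrors the issue description.
events = [
    {"timestamp": 1702289001, "id": 1, "value": "non-null value"},
    {"timestamp": 1702289002, "id": 1, "value": "new non-null value"},
    {"timestamp": 1702289003, "id": 1, "value": None},
    {"timestamp": 1702289004, "id": 2, "value": "non-null value"},
    {"timestamp": 1702289005, "id": 2, "value": None},
]

def latest_events(rows):
    result = {}
    for r in rows:
        # first non-null value in the frame: the most recent non-null value
        # whose timestamp is <= this row's timestamp, within the same id
        filled = max(
            (x for x in rows
             if x["id"] == r["id"]
             and x["timestamp"] <= r["timestamp"]
             and x["value"] is not None),
            key=lambda x: x["timestamp"],
            default=None,
        )
        value = filled["value"] if filled is not None else None
        # keep only the row with the greatest timestamp per id
        if r["id"] not in result or r["timestamp"] > result[r["id"]][0]:
            result[r["id"]] = (r["timestamp"], value)
    return result

print(latest_events(events))
```

This matches the "Expected result" table in the description: id 1 resolves
to "new non-null value" and id 2 to "non-null value".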

> Spark 3.5.0 Regression: Window Function Combination Yields Null Values 
> -----------------------------------------------------------------------
>
>                 Key: SPARK-46365
>                 URL: https://issues.apache.org/jira/browse/SPARK-46365
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0
>            Reporter: Boris PEREVALOV
>            Priority: Major
>
> When combining two window functions (the first to carry forward the most 
> recent non-null value, the second to keep only the latest row per id), the 
> result has been incorrect since version 3.5.0.
>  
> Here is a simple Scala example: 
>  
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>  
> case class Event(timestamp: Long, id: Long, value: String)
>  
> val events = Seq(
>   Event(timestamp = 1702289001, id = 1, value = "non-null value"),
>   Event(timestamp = 1702289002, id = 1, value = "new non-null value"),
>   Event(timestamp = 1702289003, id = 1, value = null),
>   Event(timestamp = 1702289004, id = 2, value = "non-null value"),
>   Event(timestamp = 1702289005, id = 2, value = null)
> ).toDF
>  
> val window = Window.partitionBy("id").orderBy($"timestamp".desc)
>  
> val eventsWithLatestNonNullValue = events
>  .withColumn(
>    "value",
>    first("value", ignoreNulls = true) over window.rangeBetween(Window.currentRow, Window.unboundedFollowing)
>  )
>  
> eventsWithLatestNonNullValue.show
>  
> val latestEvents = eventsWithLatestNonNullValue
>  .withColumn("n", row_number over window)
>  .where("n = 1")
>  .drop("n")
>  
> latestEvents.show
> {code}
>  
>  
> Current result (Spark 3.5.0):
>  
> {code:java}
> +----------+---+-----+
> | timestamp| id|value|
> +----------+---+-----+
> |1702289003|  1| NULL|
> |1702289005|  2| NULL|
> +----------+---+-----+
> {code}
>  
>  
> Expected result (all versions before 3.5.0):
>  
> {code:java}
> +----------+---+------------------+
> | timestamp| id|             value|
> +----------+---+------------------+
> |1702289003|  1|new non-null value|
> |1702289005|  2|    non-null value|
> +----------+---+------------------+
> {code}
>  
> Execution plans are different.
>  
> Spark 3.5.0: 
>  
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [timestamp#1856L, id#1857L, value#1867]
>   +- Filter (n#1887 = 1)
>      +- Window [first(value#1858, true) windowspecdefinition(id#1857L, 
> timestamp#1856L DESC NULLS LAST, specifiedwindowframe(RangeFrame, 
> currentrow$(), unboundedfollowing$())) AS value#1867, row_number() 
> windowspecdefinition(id#1857L, timestamp#1856L DESC NULLS LAST, 
> specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
> n#1887], [id#1857L], [timestamp#1856L DESC NULLS LAST]
>         +- WindowGroupLimit [id#1857L], [timestamp#1856L DESC NULLS LAST], 
> row_number(), 1, Final
>            +- Sort [id#1857L ASC NULLS FIRST, timestamp#1856L DESC NULLS 
> LAST], false, 0
>               +- Exchange hashpartitioning(id#1857L, 200), 
> ENSURE_REQUIREMENTS, [plan_id=326]
>                  +- WindowGroupLimit [id#1857L], [timestamp#1856L DESC NULLS 
> LAST], row_number(), 1, Partial
>                     +- Sort [id#1857L ASC NULLS FIRST, timestamp#1856L DESC 
> NULLS LAST], false, 0
>                        +- LocalTableScan [timestamp#1856L, id#1857L, 
> value#1858]
> {code}
>  
> Spark 3.4.0:
>  
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [timestamp#6L, id#7L, value#17]
>    +- Filter (n#37 = 1)
>       +- Window [first(value#8, true) windowspecdefinition(id#7L, 
> timestamp#6L DESC NULLS LAST, specifiedwindowframe(RangeFrame, currentrow$(), 
> unboundedfollowing$())) AS value#17, row_number() windowspecdefinition(id#7L, 
> timestamp#6L DESC NULLS LAST, specifiedwindowframe(RowFrame, 
> unboundedpreceding$(), currentrow$())) AS n#37], [id#7L], [timestamp#6L DESC 
> NULLS LAST]
>          +- Sort [id#7L ASC NULLS FIRST, timestamp#6L DESC NULLS LAST], 
> false, 0
>             +- Exchange hashpartitioning(id#7L, 200), ENSURE_REQUIREMENTS, 
> [plan_id=60]
>                +- LocalTableScan [timestamp#6L, id#7L, value#8]
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
