[ https://issues.apache.org/jira/browse/SPARK-38000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-38000.
----------------------------------
    Resolution: Cannot Reproduce

> Sort node incorrectly removed from the optimized logical plan
> -------------------------------------------------------------
>
>                 Key: SPARK-38000
>                 URL: https://issues.apache.org/jira/browse/SPARK-38000
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>         Environment: Tested on:
> Ubuntu 18.04.2 LTS
> OpenJDK 1.8.0_312 64-Bit Server VM (build 25.312-b07, mixed mode)
>            Reporter: Antoine Wendlinger
>            Priority: Major
>              Labels: correctness
>
> When using a fairly involved combination of joins, windows, cache and 
> orderBy, the sorting phase disappears from the optimized logical plan and the 
> resulting dataframe is not sorted.
> You can find a reproduction of the bug in 
> [https://github.com/antoinewdg/spark-bug-report].
> Use {{sbt run}} to get the results.
> The bug is very niche; I chose to report it because it looks like a 
> correctness issue and may be a symptom of a larger one.
> The bug affects only 3.2.0; tests on 3.1.2 show the result correctly sorted.
> As far as I could test it, all steps in the reproduction are necessary for 
> the bug to happen:
>  * the join with an empty dataframe
>  * the distinct call on the empty dataframe
>  * the window function
>  * the cache after the order by
> h2. Code
>  
> {code:scala}
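>   // Added so the snippet runs as a script (e.g. pasted into spark-shell):
>   // both case classes are assumed to hold one `id: String` field, as the
>   // plans below show; the local master setting is also an assumption.
>   import org.apache.spark.sql.SparkSession
>   import org.apache.spark.sql.expressions.Window
>   import org.apache.spark.sql.functions.row_number
>   case class Player(id: String)
>   case class BlacklistEntry(id: String)
>   val sparkSession = SparkSession.builder().master("local[*]").getOrCreate()
>   import sparkSession.implicits._
>
>   val players = (10 to 20).map(x => Player(id = x.toString)).toDS
>   val blacklist = sparkSession
>     .emptyDataset[BlacklistEntry]
>     .distinct()
>   val result = players
>     .join(blacklist, Seq("id"), "left_outer")
>     .withColumn("rank", row_number().over(Window.partitionBy("id").orderBy("id")))
>     .orderBy("id")
>     .cache()
>   result.show()
>   result.explain(true)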
> {code}
>  
> h2. Output
>  
> {code:java}
> +---+----+
> | id|rank|
> +---+----+
> | 15|   1|
> | 11|   1|
> | 16|   1|
> | 18|   1|
> | 17|   1|
> | 19|   1|
> | 20|   1|
> | 10|   1|
> | 12|   1|
> | 13|   1|
> | 14|   1|
> +---+----+
> == Parsed Logical Plan ==
> 'Sort ['id ASC NULLS FIRST], true
> +- Project [id#1, rank#10]
>    +- Project [id#1, rank#10, rank#10]
>       +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>          +- Project [id#1]
>             +- Project [id#1]
>                +- Join LeftOuter, (id#1 = id#5)
>                   :- LocalRelation [id#1]
>                   +- Deduplicate [id#5]
>                      +- LocalRelation <empty>, [id#5]
> == Analyzed Logical Plan ==
> id: string, rank: int
> Sort [id#1 ASC NULLS FIRST], true
> +- Project [id#1, rank#10]
>    +- Project [id#1, rank#10, rank#10]
>       +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>          +- Project [id#1]
>             +- Project [id#1]
>                +- Join LeftOuter, (id#1 = id#5)
>                   :- LocalRelation [id#1]
>                   +- Deduplicate [id#5]
>                      +- LocalRelation <empty>, [id#5]
> == Optimized Logical Plan ==
> InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
>    +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>       +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#7]
>             +- LocalTableScan [id#1]
> == Physical Plan ==
> InMemoryTableScan [id#1, rank#10]
>    +- InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>             +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#7]
>                   +- LocalTableScan [id#1]
> {code}
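The correctness failure can be stated mechanically against the output above. What follows is a minimal sketch in plain Scala with no Spark dependency, assuming only the ids and plan lines copied from this report; `SortCheck`, `isSortedAsc` and `sortIsGlobal` are hypothetical names. It checks the printed ids for ascending order and reads the flag that follows the ordering list of each Sort node. That flag is the node's `global` field, so the analyzed plan's `Sort [...], true` requests a total ordering, while the only Sort left inside the InMemoryRelation, `Sort [...], false, 0`, is a per-partition sort feeding the Window above a hash-partitioning Exchange.

```scala
// Minimal sketch with no Spark dependency: the ids and Sort lines are copied
// verbatim from the report; SortCheck and its helpers are hypothetical names.
object SortCheck {
  // "id" column as printed by result.show() on Spark 3.2.0
  val observedIds: Seq[String] =
    Seq("15", "11", "16", "18", "17", "19", "20", "10", "12", "13", "14")

  // True when every adjacent pair is in ascending string order,
  // which orderBy("id") on a string column should guarantee.
  def isSortedAsc(xs: Seq[String]): Boolean =
    xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

  // Sort node from the analyzed plan, and the only Sort inside the cached plan.
  val analyzedSortLine = "Sort [id#1 ASC NULLS FIRST], true"
  val cachedSortLine =
    "*(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0"

  // Reads the first flag after the ordering list, i.e. the `global` field:
  // true = total order across partitions, false = per-partition sort.
  def sortIsGlobal(sortLine: String): Boolean =
    sortLine
      .substring(sortLine.lastIndexOf(']') + 1)
      .split(",")
      .map(_.trim)
      .filter(_.nonEmpty)
      .headOption
      .contains("true")

  def main(args: Array[String]): Unit = {
    println(s"output sorted:        ${isSortedAsc(observedIds)}")       // false
    println(s"analyzed Sort global: ${sortIsGlobal(analyzedSortLine)}") // true
    println(s"cached Sort global:   ${sortIsGlobal(cachedSortLine)}")   // false
  }
}
```

Because no global Sort survives in the cached plan, nothing orders rows across the 200 hash partitions, which is consistent with the unsorted `show()` output.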



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
