[ https://issues.apache.org/jira/browse/SPARK-38000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17481392#comment-17481392 ]
Bruce Robbins commented on SPARK-38000:
---------------------------------------

I can reproduce this on 3.2.0, but it seems to be fixed in 3.2.1-RC2:
{noformat}
+---+----+
| id|rank|
+---+----+
| 10|   1|
| 11|   1|
| 12|   1|
| 13|   1|
| 14|   1|
| 15|   1|
| 16|   1|
| 17|   1|
| 18|   1|
| 19|   1|
| 20|   1|
+---+----+

scala> == Parsed Logical Plan ==
'Sort ['id ASC NULLS FIRST], true
+- Project [id#1, rank#10]
   +- Project [id#1, rank#10, rank#10]
      +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
         +- Project [id#1]
            +- Project [id#1]
               +- Join LeftOuter, (id#1 = id#5)
                  :- LocalRelation [id#1]
                  +- Deduplicate [id#5]
                     +- LocalRelation <empty>, [id#5]
== Analyzed Logical Plan ==
id: string, rank: int
Sort [id#1 ASC NULLS FIRST], true
+- Project [id#1, rank#10]
   +- Project [id#1, rank#10, rank#10]
      +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
         +- Project [id#1]
            +- Project [id#1]
               +- Join LeftOuter, (id#1 = id#5)
                  :- LocalRelation [id#1]
                  +- Deduplicate [id#5]
                     +- LocalRelation <empty>, [id#5]
== Optimized Logical Plan ==
InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(2) Sort [id#1 ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(id#1 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#21]
         +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
            +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#11]
                  +- LocalTableScan [id#1]
== Physical Plan ==
InMemoryTableScan [id#1, rank#10]
   +- InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(2) Sort [id#1 ASC NULLS FIRST], true, 0
            +- Exchange rangepartitioning(id#1 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#21]
               +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
                  +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#11]
                        +- LocalTableScan [id#1]
{noformat}

> Sort node incorrectly removed from the optimized logical plan
> -------------------------------------------------------------
>
>                 Key: SPARK-38000
>                 URL: https://issues.apache.org/jira/browse/SPARK-38000
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>         Environment: Tested on:
> Ubuntu 18.04.2 LTS
> OpenJDK 1.8.0_312 64-Bit Server VM (build 25.312-b07, mixed mode)
>            Reporter: Antoine Wendlinger
>            Priority: Major
>              Labels: correctness
>
> When using a fairly involved combination of joins, windows, cache and
> orderBy, the sorting phase disappears from the optimized logical plan and the
> resulting dataframe is not sorted.
> You can find a reproduction of the bug in
> [https://github.com/antoinewdg/spark-bug-report|http://example.com/].
> Use {{sbt run}} to get the results.
> The bug is very niche; I chose to report it because it looks like a
> correctness issue and may be a symptom of a larger one.
> The bug affects only 3.2.0; tests on 3.1.2 show the result correctly sorted.
> As far as I could test, all steps in the reproduction are necessary for
> the bug to happen:
> * the join with an empty dataframe
> * the distinct call on the empty dataframe
> * the window function
> * the cache after the order by
>
> h2. Code
>
> {code:scala}
> val players = (10 to 20).map(x => Player(id = x.toString)).toDS
> val blacklist = sparkSession
>   .emptyDataset[BlacklistEntry]
>   .distinct()
> val result = players
>   .join(blacklist, Seq("id"), "left_outer")
>   .withColumn("rank",
>     row_number().over(Window.partitionBy("id").orderBy("id")))
>   .orderBy("id")
>   .cache()
> result.show()
> result.explain(true)
> {code}
>
> h2. Output
>
> {code:java}
> +---+----+
> | id|rank|
> +---+----+
> | 15|   1|
> | 11|   1|
> | 16|   1|
> | 18|   1|
> | 17|   1|
> | 19|   1|
> | 20|   1|
> | 10|   1|
> | 12|   1|
> | 13|   1|
> | 14|   1|
> +---+----+
> == Parsed Logical Plan ==
> 'Sort ['id ASC NULLS FIRST], true
> +- Project [id#1, rank#10]
>    +- Project [id#1, rank#10, rank#10]
>       +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>          +- Project [id#1]
>             +- Project [id#1]
>                +- Join LeftOuter, (id#1 = id#5)
>                   :- LocalRelation [id#1]
>                   +- Deduplicate [id#5]
>                      +- LocalRelation <empty>, [id#5]
> == Analyzed Logical Plan ==
> id: string, rank: int
> Sort [id#1 ASC NULLS FIRST], true
> +- Project [id#1, rank#10]
>    +- Project [id#1, rank#10, rank#10]
>       +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>          +- Project [id#1]
>             +- Project [id#1]
>                +- Join LeftOuter, (id#1 = id#5)
>                   :- LocalRelation [id#1]
>                   +- Deduplicate [id#5]
>                      +- LocalRelation <empty>, [id#5]
> == Optimized Logical Plan ==
> InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
>    +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>       +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#7]
>             +- LocalTableScan [id#1]
> == Physical Plan ==
> InMemoryTableScan [id#1, rank#10]
>    +- InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
>             +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#7]
>                   +- LocalTableScan [id#1]
> {code}
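To check whether a particular Spark build is affected, the reproduction from the report can be wrapped in a self-contained program that reports whether the cached result comes back sorted. This is a sketch under assumptions: the case classes `Player` and `BlacklistEntry` are given a single `id: String` field, inferred from the `id: string` schema and the `[id#1]` / `[id#5]` relations in the plans. The object name `Spark38000Check`, the helper `isSortedById`, and the use of local mode are hypothetical choices and are not part of the original report.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Top-level case classes so Spark can derive encoders for them.
// Assumption: both carry only a String "id", as suggested by the plans.
case class Player(id: String)
case class BlacklistEntry(id: String)

object Spark38000Check {
  // True when consecutive ids are in ascending string order
  // (for the two-digit ids 10..20 this matches numeric order).
  def isSortedById(ids: Seq[String]): Boolean =
    ids.zip(ids.drop(1)).forall { case (a, b) => a.compareTo(b) <= 0 }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local mode, as in a plain `sbt run`
      .appName("SPARK-38000-check")
      .getOrCreate()
    import spark.implicits._

    // Same steps as the report: join with an empty, de-duplicated dataset,
    // a window function, then orderBy followed by cache.
    val players = (10 to 20).map(x => Player(id = x.toString)).toDS()
    val blacklist = spark.emptyDataset[BlacklistEntry].distinct()

    val result = players
      .join(blacklist, Seq("id"), "left_outer")
      .withColumn("rank", row_number().over(Window.partitionBy("id").orderBy("id")))
      .orderBy("id")
      .cache()

    // If the plan still contains the final Sort, collect() returns rows in id order.
    val ids = result.collect().map(_.getString(0)).toSeq
    println(s"Spark ${spark.version}: sorted = ${isSortedById(ids)}")

    spark.stop()
  }
}
```

Going by the report and the comment, one would expect `sorted = false` on 3.2.0 and `sorted = true` on 3.1.2 or 3.2.1-RC2. Checking the collected order directly, rather than reading `show()` output, makes the result easy to assert on in a test.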