[ 
https://issues.apache.org/jira/browse/SPARK-24495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-24495.
-----------------------------
       Resolution: Fixed
         Assignee: Marco Gaido
    Fix Version/s: 2.4.0
                   2.3.2

> SortMergeJoin with duplicate keys wrong results
> -----------------------------------------------
>
>                 Key: SPARK-24495
>                 URL: https://issues.apache.org/jira/browse/SPARK-24495
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Bogdan Raducanu
>            Assignee: Marco Gaido
>            Priority: Major
>              Labels: correctness
>             Fix For: 2.3.2, 2.4.0
>
>
> To reproduce:
> {code:java}
> // The bug is in SortMergeJoin, but the shuffles are correct. With the default of
> // 200 partitions, the data might be split into partitions so small that
> // SortMergeJoin can no longer return wrong results.
> spark.conf.set("spark.sql.shuffle.partitions", "1")
> // disable this, otherwise it would filter results before join, hiding the bug
> spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
> sql("select id as a1 from range(1000)").createOrReplaceTempView("t1")
> sql("select id * 2 as b1, -id as b2 from range(1000)").createOrReplaceTempView("t2")
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> sql("""select b1, a1, b2 FROM t1 INNER JOIN t2 ON b1 = a1 AND b2 = a1""").show
> {code}
> In the results, all three columns should be equal in every row (see the join
> condition). Since b1 = id * 2 and b2 = -id, only the row (0, 0, 0) should be returned.
> But the result is:
> {code:java}
> +---+---+---+
> | b1| a1| b2|
> +---+---+---+
> |  0|  0|  0|
> |  2|  2| -1|
> |  4|  4| -2|
> |  6|  6| -3|
> |  8|  8| -4|
> ....
> {code}
> I traced it to {{EnsureRequirements.reorder}}, which was introduced by
> [https://github.com/apache/spark/pull/16985] and
> [https://github.com/apache/spark/pull/20041].
> It leads to an incorrect plan:
> {code:java}
> == Physical Plan ==
> *(5) Project [b1#735672L, a1#735669L, b2#735673L]
> +- *(5) SortMergeJoin [a1#735669L, a1#735669L], [b1#735672L, b1#735672L], Inner
>    :- *(2) Sort [a1#735669L ASC NULLS FIRST, a1#735669L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(a1#735669L, a1#735669L, 1)
>    :     +- *(1) Project [id#735670L AS a1#735669L]
>    :        +- *(1) Range (0, 1000, step=1, splits=8)
>    +- *(4) Sort [b1#735672L ASC NULLS FIRST, b2#735673L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(b1#735672L, b2#735673L, 1)
>          +- *(3) Project [(id#735674L * 2) AS b1#735672L, -id#735674L AS b2#735673L]
>             +- *(3) Range (0, 1000, step=1, splits=8)
> {code}
> The SortMergeJoin keys are wrong: the right-side keys are [b1, b1], so key b2 is
> missing completely.
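
The symptom above (left keys [a1, a1] paired with right keys [b1, b1]) is what one would expect if join keys were reordered by looking up each expected key by its first match, which picks the same position twice when a key is duplicated. The following is a minimal sketch of that failure mode, not the actual {{EnsureRequirements.reorder}} code: the object ReorderSketch, both method names, and the use of plain strings instead of Catalyst expressions are assumptions made for illustration only.

```scala
// Hypothetical, simplified model of join-key reordering.
// Keys are plain strings here; Spark itself works on Catalyst expressions.
// Both methods assume `expected` is a permutation of `current`.
object ReorderSketch {

  // Naive lookup: for each expected key, take the FIRST matching position
  // in `current`. A duplicated key maps to the same position every time.
  def reorderNaive(
      left: Seq[String],
      right: Seq[String],
      expected: Seq[String],
      current: Seq[String]): (Seq[String], Seq[String]) = {
    val idx = expected.map(k => current.indexOf(k))
    (idx.map(left), idx.map(right))
  }

  // Duplicate-aware lookup: remember which positions were already used,
  // so each left/right key pair is emitted exactly once.
  def reorderTracked(
      left: Seq[String],
      right: Seq[String],
      expected: Seq[String],
      current: Seq[String]): (Seq[String], Seq[String]) = {
    val used = scala.collection.mutable.Set[Int]()
    val idx = expected.map { k =>
      val i = current.indices.find(j => current(j) == k && !used(j)).get
      used += i
      i
    }
    (idx.map(left), idx.map(right))
  }
}
```

With left keys Seq("a1", "a1"), right keys Seq("b1", "b2"), and Seq("a1", "a1") as both the expected and the current order, reorderNaive returns the right keys (b1, b1), dropping b2 as in the plan above, while reorderTracked returns (b1, b2).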



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
