[ 
https://issues.apache.org/jira/browse/SPARK-26572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16759788#comment-16759788
 ] 

Takeshi Yamamuro commented on SPARK-26572:
------------------------------------------

I confirmed that I can reproduce this (see below), and I added the "correctness" label.
It seems HashAggregate hits a bug when it contains stateful expressions, e.g., 
monotonically_increasing_id, rand, ...
{code:java}
scala> // With whole-stage codegen enabled (the default), the two ids differ, which is wrong
scala> val baseTable = Seq((1), (1)).toDF("idx")
scala> val distinctWithId = baseTable.distinct.withColumn("id", functions.monotonically_increasing_id())
scala> baseTable.join(distinctWithId, "idx").show
+---+------------+
|idx|          id|
+---+------------+
|  1|369367187456|
|  1|369367187457|
+---+------------+

scala> // With whole-stage codegen disabled, both rows share one id, as expected
scala> sql("SET spark.sql.codegen.wholeStage=false")
scala> baseTable.join(distinctWithId, "idx").show
+---+------------+
|idx|          id|
+---+------------+
|  1|369367187456|
|  1|369367187456|
+---+------------+
{code}
This is pretty much a corner case, so I didn't mark it as a blocker.
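
To illustrate why the number of evaluations matters for a stateful expression, here is a minimal plain-Scala sketch that does not use Spark at all. It is only a model of the symptom and makes no claim about what HashAggregate actually does internally; the assumption that the id expression is re-evaluated once per joined row is mine, chosen because it matches the output shown above. The names StatefulExprModel, IdGenerator, evalBeforeJoin and evalPerJoinedRow are hypothetical.

```scala
// Plain-Scala model (no Spark). All names are hypothetical.
object StatefulExprModel {
  // Stand-in for a stateful expression such as monotonically_increasing_id:
  // every call returns a new value.
  final class IdGenerator(start: Long) {
    private var next = start
    def nextId(): Long = { val id = next; next += 1; id }
  }

  // Expected behaviour: the id is evaluated once per distinct row,
  // and the join only reuses the already computed value.
  def evalBeforeJoin(base: Seq[Int], start: Long): Seq[(Int, Long)] = {
    val gen = new IdGenerator(start)
    val distinctWithId = base.distinct.map(idx => (idx, gen.nextId()))
    for { l <- base; (idx, id) <- distinctWithId if idx == l } yield (l, id)
  }

  // Assumed faulty behaviour: the id expression is evaluated again
  // for every row the join emits, so matching rows get different ids.
  def evalPerJoinedRow(base: Seq[Int], start: Long): Seq[(Int, Long)] = {
    val gen = new IdGenerator(start)
    for { l <- base; idx <- base.distinct if idx == l } yield (l, gen.nextId())
  }
}
```

With Seq(1, 1) and a start value of 369367187456L, evalBeforeJoin returns the ids 369367187456 and 369367187456, while evalPerJoinedRow returns 369367187456 and 369367187457, mirroring the two tables above.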

> Join on distinct column with monotonically_increasing_id produces wrong output
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-26572
>                 URL: https://issues.apache.org/jira/browse/SPARK-26572
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.2, 2.4.0
>         Environment: Running on Ubuntu 18.04LTS and Intellij 2018.2.5
>            Reporter: Sören Reichardt
>            Priority: Major
>
> When a table that had a monotonically_increasing_id column projected after 
> calling distinct is joined with another table, the operators are not executed 
> in the right order. 
> Here is a minimal example:
> {code:java}
> import org.apache.spark.sql.{DataFrame, SparkSession, functions}
> object JoinBug extends App {
>   // Spark session setup
>   val session =  SparkSession.builder().master("local[*]").getOrCreate()
>   import session.sqlContext.implicits._
>   session.sparkContext.setLogLevel("error")
>   // Bug in Spark: "monotonically_increasing_id" is pushed down when it
>   // shouldn't be. The pushdown only happens when the DF containing the
>   // "monotonically_increasing_id" expression is on the left side of the join.
>   val baseTable = Seq((1), (1)).toDF("idx")
>   val distinctWithId = baseTable.distinct.withColumn("id", functions.monotonically_increasing_id())
>   val monotonicallyOnRight: DataFrame = baseTable.join(distinctWithId, "idx")
>   val monotonicallyOnLeft: DataFrame = distinctWithId.join(baseTable, "idx")
>   monotonicallyOnLeft.show // Wrong
>   monotonicallyOnRight.show // Ok in Spark 2.2.2 - also wrong in Spark 2.4.0
> }
> {code}
> It produces the following output:
> {code:java}
> Wrong:
> +---+------------+
> |idx|          id|
> +---+------------+
> |  1|369367187456|
> |  1|369367187457|
> +---+------------+
> Right:
> +---+------------+
> |idx|          id|
> +---+------------+
> |  1|369367187456|
> |  1|369367187456|
> +---+------------+
> {code}
> We assume that the join operator triggers a pushdown of expressions 
> (monotonically_increasing_id in this case), so that the expression is 
> executed before distinct. This produces non-distinct rows with unique IDs. 
> However, this behavior seems to appear only if the table with the 
> projected expression is on the left side of the join in Spark 2.2.2 (in 
> version 2.4.0 both joins produce the wrong output).


