[
https://issues.apache.org/jira/browse/SPARK-26572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16759788#comment-16759788
]
Takeshi Yamamuro commented on SPARK-26572:
------------------------------------------
I confirmed that I can reproduce this (see below), and I added the "correctness" label.
It seems HashAggregate hits a bug if it has stateful expressions, e.g.,
monotonically_increasing_id, rand, ...
{code:java}
scala> val baseTable = Seq((1), (1)).toDF("idx")
scala> val distinctWithId = baseTable.distinct.withColumn("id",
functions.monotonically_increasing_id())
scala> baseTable.join(distinctWithId, "idx").show
+---+------------+
|idx|          id|
+---+------------+
|  1|369367187456|
|  1|369367187457|
+---+------------+
scala> sql("SET spark.sql.codegen.wholeStage=false")
scala> baseTable.join(distinctWithId, "idx").show
+---+------------+
|idx|          id|
+---+------------+
|  1|369367187456|
|  1|369367187456|
+---+------------+
{code}
This is pretty much a corner case, so I didn't mark it as a blocker.
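As a side note on reading the output above: monotonically_increasing_id is documented to put the partition ID in the upper 31 bits of the generated value and the record number within that partition in the lower 33 bits. The following is a minimal plain-Scala sketch that decodes the two ids from the wrong result. The object name MonotonicIdLayout and the decode helper are made up for illustration and are not Spark APIs.

```scala
object MonotonicIdLayout {
  // Documented layout of monotonically_increasing_id values:
  // upper 31 bits = partition ID, lower 33 bits = record number within the partition.
  private val RecordBits = 33
  private val RecordMask = (1L << RecordBits) - 1

  /** Splits a generated id into (partitionId, recordNumberWithinPartition). */
  def decode(id: Long): (Int, Long) =
    ((id >>> RecordBits).toInt, id & RecordMask)

  def main(args: Array[String]): Unit = {
    // The two ids copied from the wrong join result shown above.
    Seq(369367187456L, 369367187457L).foreach { id =>
      val (partition, record) = decode(id)
      println(s"id=$id partition=$partition record=$record")
    }
  }
}
```

Both ids decode to partition 43, with record numbers 0 and 1. That is consistent with the expression having been evaluated over two rows in the same partition rather than once over the single distinct row.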
> Join on distinct column with monotonically_increasing_id produces wrong output
> ------------------------------------------------------------------------------
>
> Key: SPARK-26572
> URL: https://issues.apache.org/jira/browse/SPARK-26572
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.2, 2.4.0
> Environment: Running on Ubuntu 18.04LTS and Intellij 2018.2.5
> Reporter: Sören Reichardt
> Priority: Major
>
> When a table with a projected monotonically_increasing_id column, added after
> calling distinct, is joined with another table, the operators are not executed
> in the right order.
> Here is a minimal example:
> {code:java}
> import org.apache.spark.sql.{DataFrame, SparkSession, functions}
> object JoinBug extends App {
> // Spark session setup
> val session = SparkSession.builder().master("local[*]").getOrCreate()
> import session.sqlContext.implicits._
> session.sparkContext.setLogLevel("error")
> // Bug in Spark: "monotonically_increasing_id" is pushed down when it
> // shouldn't be. Push down only happens when the DF containing the
> // "monotonically_increasing_id" expression is on the left side of the join.
> val baseTable = Seq((1), (1)).toDF("idx")
> val distinctWithId = baseTable.distinct.withColumn("id",
> functions.monotonically_increasing_id())
> val monotonicallyOnRight: DataFrame = baseTable.join(distinctWithId, "idx")
> val monotonicallyOnLeft: DataFrame = distinctWithId.join(baseTable, "idx")
> monotonicallyOnLeft.show // Wrong
> monotonicallyOnRight.show // Ok in Spark 2.2.2 - also wrong in Spark 2.4.0
> }
> {code}
> It produces the following output:
> {code:java}
> Wrong:
> +---+------------+
> |idx|          id|
> +---+------------+
> |  1|369367187456|
> |  1|369367187457|
> +---+------------+
> Right:
> +---+------------+
> |idx|          id|
> +---+------------+
> |  1|369367187456|
> |  1|369367187456|
> +---+------------+
> {code}
> We assume that the join operator triggers a pushdown of expressions
> (monotonically_increasing_id in this case), so that the expression is
> executed before distinct. This produces non-distinct rows with unique ids.
> However, in Spark 2.2.2 this behavior seems to appear only if the table with
> the projected expression is on the left side of the join (in version 2.4.0
> it fails on both joins).