[ 
https://issues.apache.org/jira/browse/SPARK-26572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sören Reichardt updated SPARK-26572:
------------------------------------
    Description: 
When a table is made distinct, a monotonically_increasing_id column is projected onto it, and the result is then joined with another table, the operators are not executed in the correct order.

Here is a minimal example:
{code:java}
import org.apache.spark.sql.{DataFrame, SparkSession, functions}

object JoinBug extends App {

  // Spark session setup
  val session = SparkSession.builder().master("local[*]").getOrCreate()
  import session.sqlContext.implicits._
  session.sparkContext.setLogLevel("error")

  // Bug in Spark: "monotonically_increasing_id" is pushed down when it shouldn't be.
  // The pushdown only happens when the DF containing the "monotonically_increasing_id"
  // expression is on the left side of the join.
  val baseTable = Seq(1, 1).toDF("idx")
  val distinctWithId = baseTable.distinct
    .withColumn("id", functions.monotonically_increasing_id())
  val monotonicallyOnRight: DataFrame = baseTable.join(distinctWithId, "idx")
  val monotonicallyOnLeft: DataFrame = distinctWithId.join(baseTable, "idx")

  monotonicallyOnLeft.show  // Wrong
  monotonicallyOnRight.show // Ok in Spark 2.2.2 - also wrong in Spark 2.4.0
}
{code}
It produces the following output:
{code:java}
Wrong:
+---+------------+
|idx|          id|
+---+------------+
|  1|369367187456|
|  1|369367187457|
+---+------------+

Right:
+---+------------+
|idx|          id|
+---+------------+
|  1|369367187456|
|  1|369367187456|
+---+------------+
{code}
We assume that the join operator triggers a pushdown of the projected expression (monotonically_increasing_id in this case), so that it is evaluated before the distinct. This produces non-distinct rows with unique ids. In Spark 2.2.2 this behavior only appears when the table with the projected expression is on the left side of the join; in Spark 2.4.0 both joins produce wrong results.

  was:
When a table is made distinct, a monotonically_increasing_id column is projected onto it, and the result is then joined with another table, the operators are not executed in the correct order.

Here is a minimal example:
{code:java}
import org.apache.spark.sql.{DataFrame, SparkSession, functions}

object JoinBug extends App {

  // Spark session setup
  val session = SparkSession.builder().master("local[*]").getOrCreate()
  import session.sqlContext.implicits._
  session.sparkContext.setLogLevel("error")

  // Bug in Spark: "monotonically_increasing_id" is pushed down when it shouldn't be.
  // The pushdown only happens when the DF containing the "monotonically_increasing_id"
  // expression is on the left side of the join.
  val baseTable = Seq(1, 1).toDF("idx")
  val distinctWithId = baseTable.distinct
    .withColumn("id", functions.monotonically_increasing_id())
  val monotonicallyOnRight: DataFrame = baseTable.join(distinctWithId, "idx")
  val monotonicallyOnLeft: DataFrame = distinctWithId.join(baseTable, "idx")

  monotonicallyOnLeft.show  // Wrong
  monotonicallyOnRight.show // Ok in Spark 2.2.2 - also wrong in Spark 2.4.0
}
{code}
It produces the following output:
{code:java}
Wrong:
+---+------------+
|idx|          id|
+---+------------+
|  1|369367187456|
|  1|369367187457|
+---+------------+

Right:
+---+------------+
|idx|          id|
+---+------------+
|  1|369367187456|
|  1|369367187456|
+---+------------+
{code}
We assume that the join operator triggers a pushdown of the projected expression (monotonically_increasing_id in this case), so that it is evaluated before the distinct. This produces non-distinct rows with unique ids. However, this behavior seems to appear only when the table with the projected expression is on the left side of the join.


> Join on distinct column with monotonically_increasing_id produces wrong output
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-26572
>                 URL: https://issues.apache.org/jira/browse/SPARK-26572
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.2, 2.4.0
>         Environment: Running on Ubuntu 18.04LTS and Intellij 2018.2.5
>            Reporter: Sören Reichardt
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
