[ https://issues.apache.org/jira/browse/SPARK-19954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arun Allamsetty updated SPARK-19954:
------------------------------------
    Description: 
I found this bug while upgrading from Spark 1.6.1 to 2.1.0. When we join two 
DataFrames, one of which is the result of a union operation, the join returns 
data as if the other DataFrame had been joined only to the first DataFrame in 
the union. The issue is not present in Spark 2.0.0, 2.0.1, or 2.0.2; it appears 
only in 2.1.0. Here's how to reproduce it:

{noformat}
// Run in spark-shell, where `spark` (a SparkSession) is predefined.
import spark.implicits._
import org.apache.spark.sql.functions.lit

case class A(id: Long, colA: Boolean)
case class B(id: Long, colB: Int)
case class C(id: Long, colC: Double)
case class X(id: Long, name: String)

val aData = A(1, true) :: Nil
val bData = B(2, 10) :: Nil
val cData = C(3, 9.73D) :: Nil
val xData = X(1, "a") :: X(2, "b") :: X(3, "c") :: Nil

val aDf = spark.createDataset(aData).toDF
val bDf = spark.createDataset(bData).toDF
val cDf = spark.createDataset(cData).toDF
val xDf = spark.createDataset(xData).toDF

// Union the three DataFrames into one schema (id, name, colA, colB, colC),
// filling the columns each source lacks with null literals.
val unionDf =
  aDf.select($"id", lit("a").as("name"), $"colA", lit(null).as("colB"), lit(null).as("colC")).union(
  bDf.select($"id", lit("b").as("name"), lit(null).as("colA"), $"colB", lit(null).as("colC"))).union(
  cDf.select($"id", lit("c").as("name"), lit(null).as("colA"), lit(null).as("colB"), $"colC"))

// Join on both name and id; each row of xDf should match exactly one row of unionDf.
val result = xDf.join(unionDf, unionDf("name") === xDf("name") && unionDf("id") === xDf("id"))
result.show
{noformat}

The result is:
{noformat}
+---+----+---+----+----+----+----+
| id|name| id|name|colA|colB|colC|
+---+----+---+----+----+----+----+
|  1|   a|  1|   a|true|null|null|
+---+----+---+----+----+----+----+
{noformat}

Forcing computation of {{unionDf}} with {{count}} does not change the result of 
the join. Writing the data to disk and reading it back does give the correct 
result, but that is definitely not ideal. Interestingly, caching {{unionDf}} 
also gives the correct result:

{noformat}
+---+----+---+----+----+----+----+
| id|name| id|name|colA|colB|colC|
+---+----+---+----+----+----+----+
|  1|   a|  1|   a|true|null|null|
|  2|   b|  2|   b|null|  10|null|
|  3|   c|  3|   c|null|null|9.73|
+---+----+---+----+----+----+----+
{noformat}
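The two workarounds described above (caching {{unionDf}}, or writing it to disk and reading it back) can be sketched as a standalone Scala application. This is a minimal sketch, not a fix: it assumes Spark SQL is on the classpath, it uses Parquet as the on-disk format (an assumption, since the format used above isn't stated), and the names {{UnionJoinWorkarounds}}, {{run}}, {{joinTo}}, and {{parquetPath}} are hypothetical. It returns the joined row counts instead of printing the tables.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

// Hypothetical object name: a sketch of the two reported workarounds.
object UnionJoinWorkarounds {
  // Same case classes as the reproduction, defined at object level so
  // Spark can derive encoders for them.
  case class A(id: Long, colA: Boolean)
  case class B(id: Long, colB: Int)
  case class C(id: Long, colC: Double)
  case class X(id: Long, name: String)

  // Hypothetical helper: returns the join's row count after (1) caching the
  // unioned DataFrame and (2) a Parquet round trip through parquetPath.
  def run(spark: SparkSession, parquetPath: String): (Long, Long) = {
    import spark.implicits._

    val aDf = spark.createDataset(A(1, true) :: Nil).toDF()
    val bDf = spark.createDataset(B(2, 10) :: Nil).toDF()
    val cDf = spark.createDataset(C(3, 9.73D) :: Nil).toDF()
    val xDf = spark.createDataset(X(1, "a") :: X(2, "b") :: X(3, "c") :: Nil).toDF()

    val unionDf =
      aDf.select($"id", lit("a").as("name"), $"colA", lit(null).as("colB"), lit(null).as("colC"))
        .union(bDf.select($"id", lit("b").as("name"), lit(null).as("colA"), $"colB", lit(null).as("colC")))
        .union(cDf.select($"id", lit("c").as("name"), lit(null).as("colA"), lit(null).as("colB"), $"colC"))

    // Same join condition as the reproduction, applied to a given right side.
    def joinTo(df: DataFrame): DataFrame =
      xDf.join(df, df("name") === xDf("name") && df("id") === xDf("id"))

    // Workaround 1: cache the unioned DataFrame before joining.
    val cached = unionDf.cache()
    val cachedCount = joinTo(cached).count()

    // Workaround 2: write the unioned DataFrame out and read it back.
    unionDf.write.mode("overwrite").parquet(parquetPath)
    val reloaded = spark.read.parquet(parquetPath)
    val reloadedCount = joinTo(reloaded).count()

    cached.unpersist()
    (cachedCount, reloadedCount)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-19954-workarounds")
      .getOrCreate()
    // Temporary directory for the Parquet round trip.
    val path = java.nio.file.Files.createTempDirectory("spark19954").resolve("union").toString
    val (cachedCount, reloadedCount) = run(spark, path)
    println(s"cached: $cachedCount rows, reloaded: $reloadedCount rows")
    spark.stop()
  }
}
```

If a workaround yields the complete join, its count should be 3, matching the correct output shown above.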



> Joining to a unioned DataFrame does not produce expected result.
> ----------------------------------------------------------------
>
>                 Key: SPARK-19954
>                 URL: https://issues.apache.org/jira/browse/SPARK-19954
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Arun Allamsetty
>            Priority: Blocker