[ https://issues.apache.org/jira/browse/SPARK-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Davies Liu updated SPARK-17377:
-------------------------------
    Description: 
Reproduction:

1) Read two Datasets from a partitioned Parquet file, using different filter conditions on the partitioning column
2) Group each Dataset by a column and aggregate it
3) Join the aggregated Datasets on the group-by column
4) In the joined Dataset, the aggregated values from the right Dataset have been replaced by the aggregated values from the left Dataset

The issue only reproduces when the input Parquet file is partitioned; a control run without partitioning is sketched after the example below.

Example: 
{code}
// Assumes a Spark 2.0 SparkSession named `spark` (as in spark-shell).
// Outside the shell, these imports are also needed:
import org.apache.spark.sql.functions.sum
import spark.implicits._

val dataPath = "/your/data/path/"

case class InputData(id: Int, value: Int, filterColumn: Int)

val inputDS = Seq(
  InputData(1, 1, 1), InputData(2, 2, 1), InputData(3, 3, 1), InputData(4, 4, 1),
  InputData(1, 10, 2), InputData(2, 20, 2), InputData(3, 30, 2), InputData(4, 40, 2)
).toDS()

inputDS.show
+---+-----+------------+
| id|value|filterColumn|
+---+-----+------------+
|  1|    1|           1|
|  2|    2|           1|
|  3|    3|           1|
|  4|    4|           1|
|  1|   10|           2|
|  2|   20|           2|
|  3|   30|           2|
|  4|   40|           2|
+---+-----+------------+

inputDS.write.partitionBy("filterColumn").parquet(dataPath)

val dataDF = spark.read.parquet(dataPath)

case class LeftClass(id: Int, aggLeft: Long)

case class RightClass(id: Int, aggRight: Long)

val leftDS = dataDF.filter("filterColumn = 1").groupBy("id")
  .agg(sum("value") as "aggLeft").as[LeftClass]

val rightDS = dataDF.filter("filterColumn = 2").groupBy("id")
  .agg(sum("value") as "aggRight").as[RightClass]

leftDS.show
+---+-------+
| id|aggLeft|
+---+-------+
|  1|      1|
|  3|      3|
|  4|      4|
|  2|      2|
+---+-------+

rightDS.show
+---+--------+
| id|aggRight|
+---+--------+
|  1|      10|
|  3|      30|
|  4|      40|
|  2|      20|
+---+--------+

val joinedDS = leftDS.join(rightDS, "id")

// BUG: aggRight should contain the right-hand aggregates (10/30/40/20, as
// shown by rightDS.show above), but it repeats the aggLeft values instead.
joinedDS.show
+---+-------+--------+
| id|aggLeft|aggRight|
+---+-------+--------+
|  1|      1|       1|
|  3|      3|       3|
|  4|      4|       4|
|  2|      2|       2|
+---+-------+--------+
{code}
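
For contrast, here is a minimal control run without partitioning (the flat path name below is illustrative, not from the original setup). Per the description above, the bug does not reproduce in this case, so the joined aggRight column keeps the right-hand values 10/30/40/20.

{code}
// Control case: identical pipeline, but the Parquet data is written WITHOUT
// partitionBy. The path is illustrative.
val flatPath = "/your/data/path-flat/"

inputDS.write.parquet(flatPath)
val flatDF = spark.read.parquet(flatPath)

val leftFlat = flatDF.filter("filterColumn = 1").groupBy("id")
  .agg(sum("value") as "aggLeft").as[LeftClass]
val rightFlat = flatDF.filter("filterColumn = 2").groupBy("id")
  .agg(sum("value") as "aggRight").as[RightClass]

// Expected (and, without partitioning, actual) result: aggRight = 10/30/40/20.
leftFlat.join(rightFlat, "id").show
{code}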
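If it helps triage, comparing the plans of the two branches and of the join might show where the right-hand aggregate collapses into the left-hand one. This is only a diagnostic suggestion, not something verified in this report:

{code}
// Diagnostic sketch: inspect the analyzed and physical plans of both
// aggregated branches and of the buggy join.
leftDS.explain(true)
rightDS.explain(true)
joinedDS.explain(true)
{code}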

> Joining Datasets read and aggregated from a partitioned Parquet file gives wrong results
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-17377
>                 URL: https://issues.apache.org/jira/browse/SPARK-17377
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Hanna Mäki
>


