[ 
https://issues.apache.org/jira/browse/SPARK-29549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16961043#comment-16961043
 ] 

Miguel Molina commented on SPARK-29549:
---------------------------------------

Hi [~hyukjin.kwon]. It works correctly in Spark 2.4; this only affects 2.3.x.
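
For anyone who has to stay on 2.3.x, one possible workaround is to turn off exchange reuse so that the ReuseExchange rule never fires. This is only a sketch and has not been verified against the reproduction repository. `spark.sql.exchange.reuse` is an internal Spark SQL setting; the application name and master below are placeholders. Disabling it also gives up legitimate reuse, so some queries may shuffle more data than before.

```scala
// Requires spark-sql 2.3.x on the classpath.
import org.apache.spark.sql.SparkSession

// Assumption: disabling exchange reuse keeps the planner from replacing the
// second aggregate's shuffle with a ReusedExchange of the first one.
val spark = SparkSession.builder()
  .appName("union-workaround") // placeholder name
  .master("local[*]")          // placeholder master
  .config("spark.sql.exchange.reuse", "false")
  .getOrCreate()

// The setting can also be changed on an existing session:
spark.conf.set("spark.sql.exchange.reuse", "false")
```

Checking the physical plan with `explain()` after the change should show whether a `ReusedExchange` node is still present.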

> Union of DataSourceV2 datasources leads to duplicated results
> -------------------------------------------------------------
>
>                 Key: SPARK-29549
>                 URL: https://issues.apache.org/jira/browse/SPARK-29549
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4
>            Reporter: Miguel Molina
>            Priority: Major
>
> Hello!
> I've discovered that when two DataSourceV2 data frames of the exact same 
> shape are combined with a union in a query that also contains an 
> aggregation, only the results of the first data frame are used. The 
> exchanges of the remaining data frames are replaced by the ReuseExchange 
> rule, which reuses the results of the first one, so the output contains N 
> copies of the first data frame's results.
>  
> I've put together a repository with an example project where this can be 
> reproduced: [https://github.com/erizocosmico/spark-union-issue]
>  
> Basically, doing this:
>  
> {code:java}
> val products = spark.sql("SELECT name, COUNT(*) as count FROM products GROUP BY name")
> val users = spark.sql("SELECT name, COUNT(*) as count FROM users GROUP BY name")
> products.union(users)
>  .select("name")
>  .show(truncate = false, numRows = 50){code}
>  
>  
> Where products is:
> {noformat}
> +---------+---+
> |name     |id |
> +---------+---+
> |candy    |1  |
> |chocolate|2  |
> |milk     |3  |
> |cinnamon |4  |
> |pizza    |5  |
> |pineapple|6  |
> +---------+---+{noformat}
> And users is:
> {noformat}
> +-------+---+
> |name   |id |
> +-------+---+
> |andy   |1  |
> |alice  |2  |
> |mike   |3  |
> |mariah |4  |
> |eleanor|5  |
> |matthew|6  |
> +-------+---+ {noformat}
>  
> Results are incorrect:
> {noformat}
> +---------+
> |name     |
> +---------+
> |candy    |
> |pizza    |
> |chocolate|
> |cinnamon |
> |pineapple|
> |milk     |
> |candy    |
> |pizza    |
> |chocolate|
> |cinnamon |
> |pineapple|
> |milk     |
> +---------+{noformat}
>  
> This is the plan explained:
>  
> {noformat}
> == Parsed Logical Plan ==
> 'Project [unresolvedalias('name, None)]
> +- AnalysisBarrier
>    +- Union
>       :- Aggregate [name#0], [name#0, count(1) AS count#8L]
>       :  +- SubqueryAlias products
>       :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
>       +- Aggregate [name#4], [name#4, count(1) AS count#12L]
>          +- SubqueryAlias users
>             +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
> == Analyzed Logical Plan ==
> name: string
> Project [name#0]
> +- Union
>    :- Aggregate [name#0], [name#0, count(1) AS count#8L]
>    :  +- SubqueryAlias products
>    :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
>    +- Aggregate [name#4], [name#4, count(1) AS count#12L]
>       +- SubqueryAlias users
>          +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
> == Optimized Logical Plan ==
> Union
> :- Aggregate [name#0], [name#0]
> :  +- Project [name#0]
> :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
> +- Aggregate [name#4], [name#4]
>    +- Project [name#4]
>       +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
> == Physical Plan ==
> Union
> :- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
> :  +- Exchange hashpartitioning(name#0, 200)
> :     +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
> :        +- *(1) Project [name#0]
> :           +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
> +- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
>    +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200)
> {noformat}
>  
>  
> In the physical plan, the first exchange is reused, but it shouldn't be, 
> because the two sources are not the same.
>  
> This seems to be fixed in 2.4.x, but it affects the 2.3.x versions.
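
The failure mode described above can be illustrated with a small toy model. This is not Spark's implementation: `Scan`, `Exchange`, `shapeOnlyKey`, `sourceAwareKey` and `reuse` are hypothetical names invented for the sketch. It only assumes that a reuse rule decides whether two exchanges are the same by comparing a canonical key, and shows what happens when that key ignores which source is being read.

```scala
// Toy model only: none of these types are Spark classes.
object ReuseSketch {
  // A scan leaf: `schema` describes the shape, `source` identifies the data.
  final case class Scan(schema: Seq[String], source: String)
  // An exchange (shuffle) over a scan, partitioned by one column position.
  final case class Exchange(keyOrdinal: Int, child: Scan)

  // Shape-only key: keeps the column count and partitioning, drops the source.
  def shapeOnlyKey(e: Exchange): (Int, Int) =
    (e.keyOrdinal, e.child.schema.length)

  // Source-aware key: additionally keeps the identity of the source.
  def sourceAwareKey(e: Exchange): (Int, Int, String) =
    (e.keyOrdinal, e.child.schema.length, e.child.source)

  // Replace every exchange with the first one seen that has the same key.
  def reuse[K](exchanges: Seq[Exchange])(key: Exchange => K): Seq[Exchange] = {
    val seen = scala.collection.mutable.LinkedHashMap.empty[K, Exchange]
    exchanges.map(e => seen.getOrElseUpdate(key(e), e))
  }

  val products: Exchange = Exchange(0, Scan(Seq("name", "id"), "products"))
  val users: Exchange = Exchange(0, Scan(Seq("name", "id"), "users"))

  def main(args: Array[String]): Unit = {
    // With the shape-only key, the users exchange is replaced by products.
    println(reuse(Seq(products, users))(shapeOnlyKey).map(_.child.source))
    // With the source-aware key, both exchanges are kept.
    println(reuse(Seq(products, users))(sourceAwareKey).map(_.child.source))
  }
}
```

With the shape-only key both branches end up reading from `products`, which mirrors the duplicated rows in the output above; with the source-aware key the two branches stay distinct.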
