[ https://issues.apache.org/jira/browse/SPARK-29549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16961043#comment-16961043 ]
Miguel Molina commented on SPARK-29549:
---------------------------------------

Hi [~hyukjin.kwon]. In Spark 2.4 it works correctly; this only affects 2.3.x.

> Union of DataSourceV2 datasources leads to duplicated results
> -------------------------------------------------------------
>
>                 Key: SPARK-29549
>                 URL: https://issues.apache.org/jira/browse/SPARK-29549
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4
>            Reporter: Miguel Molina
>            Priority: Major
>
> Hello!
> I've discovered that when a query unions two DataSourceV2 data frames of exactly the same shape, and the query contains an aggregation, only the first data frame's results are used. The ReuseExchange rule removes the exchanges of the remaining data frames and makes them reuse the first data frame's results, so the output contains the first data frame's results N times.
>
> I've put together a repository with an example project where this can be reproduced: [https://github.com/erizocosmico/spark-union-issue]
>
> Basically, doing this:
>
> {code:java}
> val products = spark.sql("SELECT name, COUNT(*) as count FROM products GROUP BY name")
> val users = spark.sql("SELECT name, COUNT(*) as count FROM users GROUP BY name")
> products.union(users)
>   .select("name")
>   .show(truncate = false, numRows = 50)
> {code}
>
> Where products is:
> {noformat}
> +---------+---+
> |name     |id |
> +---------+---+
> |candy    |1  |
> |chocolate|2  |
> |milk     |3  |
> |cinnamon |4  |
> |pizza    |5  |
> |pineapple|6  |
> +---------+---+
> {noformat}
> And users is:
> {noformat}
> +-------+---+
> |name   |id |
> +-------+---+
> |andy   |1  |
> |alice  |2  |
> |mike   |3  |
> |mariah |4  |
> |eleanor|5  |
> |matthew|6  |
> +-------+---+
> {noformat}
>
> Results are incorrect:
> {noformat}
> +---------+
> |name     |
> +---------+
> |candy    |
> |pizza    |
> |chocolate|
> |cinnamon |
> |pineapple|
> |milk     |
> |candy    |
> |pizza    |
> |chocolate|
> |cinnamon |
> |pineapple|
> |milk     |
> +---------+
> {noformat}
>
> This is the plan explained:
>
> {noformat}
> == Parsed Logical Plan ==
> 'Project [unresolvedalias('name, None)]
> +- AnalysisBarrier
>    +- Union
>       :- Aggregate [name#0], [name#0, count(1) AS count#8L]
>       :  +- SubqueryAlias products
>       :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
>       +- Aggregate [name#4], [name#4, count(1) AS count#12L]
>          +- SubqueryAlias users
>             +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
>
> == Analyzed Logical Plan ==
> name: string
> Project [name#0]
> +- Union
>    :- Aggregate [name#0], [name#0, count(1) AS count#8L]
>    :  +- SubqueryAlias products
>    :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
>    +- Aggregate [name#4], [name#4, count(1) AS count#12L]
>       +- SubqueryAlias users
>          +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
>
> == Optimized Logical Plan ==
> Union
> :- Aggregate [name#0], [name#0]
> :  +- Project [name#0]
> :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
> +- Aggregate [name#4], [name#4]
>    +- Project [name#4]
>       +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))
>
> == Physical Plan ==
> Union
> :- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
> :  +- Exchange hashpartitioning(name#0, 200)
> :     +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
> :        +- *(1) Project [name#0]
> :           +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
> +- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
>    +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200)
> {noformat}
>
> In the physical plan, the first exchange is reused for the second branch of the union, but it shouldn't be, because the two sources are different.
>
> {noformat}
> == Physical Plan ==
> Union
> :- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
> :  +- Exchange hashpartitioning(name#0, 200)
> :     +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
> :        +- *(1) Project [name#0]
> :           +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
> +- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
>    +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200)
> {noformat}
>
> This seems to be fixed in 2.4.x, but affects 2.3.x versions.
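The behaviour described in the report can be illustrated with a toy model of what an exchange-reuse rule does: exchanges are compared by some notion of identity, and a later exchange that compares equal to an earlier one is replaced by a reference to it. The sketch below is plain Scala, not Spark code; `Scan`, `Exchange`, `shapeOnlyKey`, `dataAwareKey`, `reuse` and `unionNames` are hypothetical names. It also rests on an assumption that the report does not state: that in 2.3.x the identity used for the DataSourceV2 scans covered only their shape and reader class and ignored the rows the reader returns. That is an inference from the plan above, where both scans have identical shapes and both use `DefaultReader`.

```scala
import scala.collection.mutable

// Toy model of exchange reuse. Every name here is hypothetical; none of it is Spark's API.
object ReuseModel {
  // A scan over an in-memory source; `rows` stands in for DefaultReader(List(...)).
  final case class Scan(columns: Seq[String], readerClass: String, rows: Seq[(String, Int)])

  // A shuffle on one column above a scan (the partial aggregate and project are omitted).
  final case class Exchange(partitionColumn: String, child: Scan)

  // ASSUMED 2.3.x-like identity: shape and reader class only, so the rows are ignored.
  def shapeOnlyKey(e: Exchange): Any = (e.partitionColumn, e.child.columns, e.child.readerClass)

  // Identity that also covers the data the reader returns.
  def dataAwareKey(e: Exchange): Any = (shapeOnlyKey(e), e.child.rows)

  // Core of exchange reuse: an exchange whose key was already seen is replaced by the first one.
  def reuse(exchanges: Seq[Exchange], key: Exchange => Any): Seq[Exchange] = {
    val seen = mutable.Map.empty[Any, Exchange]
    exchanges.map(e => seen.getOrElseUpdate(key(e), e))
  }

  // The two sources from the report.
  val products = Scan(Seq("name", "id"), "DefaultReader",
    Seq("candy" -> 1, "chocolate" -> 2, "milk" -> 3, "cinnamon" -> 4, "pizza" -> 5, "pineapple" -> 6))
  val users = Scan(Seq("name", "id"), "DefaultReader",
    Seq("andy" -> 1, "alice" -> 2, "mike" -> 3, "mariah" -> 4, "eleanor" -> 5, "matthew" -> 6))

  // GROUP BY name on each side, then Dataset.union (UNION ALL semantics, no deduplication).
  def unionNames(key: Exchange => Any): Seq[String] =
    reuse(Seq(Exchange("name", products), Exchange("name", users)), key)
      .flatMap(_.child.rows.map(_._1).distinct)

  def main(args: Array[String]): Unit = {
    println(unionNames(shapeOnlyKey)) // the six product names, twice
    println(unionNames(dataAwareKey)) // the six product names, then the six user names
  }
}
```

With the shape-only key, the second exchange collapses into the first, so `unionNames` returns the product names twice, mirroring the incorrect output in the report; with the data-aware key, both branches keep their own exchange. Row order is not modelled, since the real output order depends on hash partitioning.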