Miguel Molina created SPARK-29549:
-------------------------------------

             Summary: Union of DataSourceV2 datasources leads to duplicated results
                 Key: SPARK-29549
                 URL: https://issues.apache.org/jira/browse/SPARK-29549
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.4, 2.3.3, 2.3.2, 2.3.1, 2.3.0
            Reporter: Miguel Molina

Hello!

I've discovered that when two DataSourceV2 data frames with exactly the same query shape are combined with a union, and the query contains an aggregation, only the results of the first data frame are used. The exchanges of the remaining data frames are replaced by the ReuseExchange rule, which reuses the results of the first data frame, so the output contains N copies of the first data frame's results.

I've put together a repository with an example project where this can be reproduced: [https://github.com/erizocosmico/spark-union-issue]

Basically, doing this:

{code:java}
val products = spark.sql("SELECT name, COUNT(*) as count FROM products GROUP BY name")
val users = spark.sql("SELECT name, COUNT(*) as count FROM users GROUP BY name")

products.union(users)
  .select("name")
  .show(truncate = false, numRows = 50)
{code}

Where products is:

{noformat}
+---------+---+
|name     |id |
+---------+---+
|candy    |1  |
|chocolate|2  |
|milk     |3  |
|cinnamon |4  |
|pizza    |5  |
|pineapple|6  |
+---------+---+
{noformat}

And users is:

{noformat}
+-------+---+
|name   |id |
+-------+---+
|andy   |1  |
|alice  |2  |
|mike   |3  |
|mariah |4  |
|eleanor|5  |
|matthew|6  |
+-------+---+
{noformat}

Results are incorrect:

{noformat}
+---------+
|name     |
+---------+
|candy    |
|pizza    |
|chocolate|
|cinnamon |
|pineapple|
|milk     |
|candy    |
|pizza    |
|chocolate|
|cinnamon |
|pineapple|
|milk     |
+---------+
{noformat}

This is the plan explained:

{noformat}
== Parsed Logical Plan ==
'Project [unresolvedalias('name, None)]
+- AnalysisBarrier
   +- Union
      :- Aggregate [name#0], [name#0, count(1) AS count#8L]
      :  +- SubqueryAlias products
      :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
      +- Aggregate [name#4], [name#4, count(1) AS count#12L]
         +- SubqueryAlias users
            +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))

== Analyzed Logical Plan ==
name: string
Project [name#0]
+- Union
   :- Aggregate [name#0], [name#0, count(1) AS count#8L]
   :  +- SubqueryAlias products
   :     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
   +- Aggregate [name#4], [name#4, count(1) AS count#12L]
      +- SubqueryAlias users
         +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))

== Optimized Logical Plan ==
Union
:- Aggregate [name#0], [name#0]
:  +- Project [name#0]
:     +- DataSourceV2Relation [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- Aggregate [name#4], [name#4]
   +- Project [name#4]
      +- DataSourceV2Relation [name#4, id#5], DefaultReader(List([andy,1], [alice,2], [mike,3], [mariah,4], [eleanor,5], [matthew,6]))

== Physical Plan ==
Union
:- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
:  +- Exchange hashpartitioning(name#0, 200)
:     +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
:        +- *(1) Project [name#0]
:           +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
   +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200)
{noformat}

In the physical plan, the first exchange is reused, but it shouldn't be, because the two sources are not the same.

{noformat}
== Physical Plan ==
Union
:- *(2) HashAggregate(keys=[name#0], functions=[], output=[name#0])
:  +- Exchange hashpartitioning(name#0, 200)
:     +- *(1) HashAggregate(keys=[name#0], functions=[], output=[name#0])
:        +- *(1) Project [name#0]
:           +- *(1) DataSourceV2Scan [name#0, id#1], DefaultReader(List([candy,1], [chocolate,2], [milk,3], [cinnamon,4], [pizza,5], [pineapple,6]))
+- *(4) HashAggregate(keys=[name#4], functions=[], output=[name#4])
   +- ReusedExchange [name#4], Exchange hashpartitioning(name#0, 200)
{noformat}

This seems to be fixed in 2.4.x, but it affects the 2.3.x versions.

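
One possible way to check that the ReuseExchange rule is responsible is to turn off exchange reuse for the session and run the same query again. The snippet below is only a sketch: it assumes the products and users tables are already registered as in the example project, it relies on the internal spark.sql.exchange.reuse setting, and it has not been verified against the reproduction repository.

{code:java}
import org.apache.spark.sql.SparkSession

// Assumption: the `products` and `users` tables are already registered,
// as in the example project linked in this report.
val spark = SparkSession.builder().getOrCreate()

// Internal setting: when false, the ReuseExchange rule leaves the plan untouched.
spark.conf.set("spark.sql.exchange.reuse", "false")

val products = spark.sql("SELECT name, COUNT(*) as count FROM products GROUP BY name")
val users = spark.sql("SELECT name, COUNT(*) as count FROM users GROUP BY name")

val result = products.union(users).select("name")

// With reuse disabled, the physical plan should contain two Exchange nodes
// and no ReusedExchange node.
result.explain()
result.show(truncate = false, numRows = 50)
{code}

If the duplicated rows disappear with the setting disabled, that would point at exchange reuse rather than at the data sources themselves. Disabling reuse affects every query in the session, so it is better suited to diagnosis than to a permanent fix.
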