[
https://issues.apache.org/jira/browse/SPARK-32753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Manu Zhang updated SPARK-32753:
-------------------------------
Description:
To reproduce:
{code:java}
spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
val df = spark.sql("select id from v1 group by id distribute by id")
println(df.collect().toArray.mkString(","))
println(df.queryExecution.executedPlan)
// With AQE
[4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
AdaptiveSparkPlan(isFinalPlan=true)
+- CustomShuffleReader local
   +- ShuffleQueryStage 0
      +- Exchange hashpartitioning(id#183L, 10), true
         +- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
            +- Union
               :- *(1) Range (0, 10, step=1, splits=2)
               +- *(2) Range (0, 10, step=1, splits=2)
// Without AQE
[4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
*(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
+- Exchange hashpartitioning(id#206L, 10), true
   +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
      +- Union
         :- *(1) Range (0, 10, step=1, splits=2)
         +- *(2) Range (0, 10, step=1, splits=2){code}
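The plans above suggest the mechanism: without AQE, a final HashAggregate (stage 4) runs above the Exchange and deduplicates keys across input partitions; with AQE, the CustomShuffleReader reads each mapper's shuffle output locally and no final aggregate runs above it, so the per-partition distinct sets never meet. A minimal plain-Scala model of this (not Spark code; the partition layout and reducer count are assumptions for illustration):

```scala
// Two input partitions that, after the partial HashAggregate, each emit the
// same set of distinct keys (as in the repro: range(10) union range(10)).
val partialOutputs: Seq[Seq[Long]] = Seq((0L until 10L).toSeq, (0L until 10L).toSeq)

// Normal Exchange: hash-partition rows so equal keys land in the same
// reducer, where the final HashAggregate deduplicates across inputs.
val numReducers = 10
val withoutAqe: Seq[Long] = partialOutputs.flatten
  .groupBy(id => (id % numReducers).toInt) // stand-in for hashpartitioning
  .values.map(_.distinct)                  // final HashAggregate per reducer
  .flatten.toSeq.sorted

// AQE local shuffle reader: each mapper's output is read back locally with
// no redistribution, and no final aggregate runs above the reader.
val withAqe: Seq[Long] = partialOutputs.flatten

println(withoutAqe.size) // 10 distinct rows
println(withAqe.size)    // 20 rows: every key appears twice
```

This matches the observed output: 10 rows without AQE, 20 with it.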
> Deduplicating and repartitioning the same column create duplicate rows with
> AQE
> -------------------------------------------------------------------------------
>
> Key: SPARK-32753
> URL: https://issues.apache.org/jira/browse/SPARK-32753
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Manu Zhang
> Priority: Minor
--
This message was sent by Atlassian Jira
(v8.3.4#803005)