This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9b39e4b [SPARK-32753][SQL][3.0] Only copy tags to node with no tags
9b39e4b is described below
commit 9b39e4b7aefcedf09764b3528a32bdf0b77e331b
Author: manuzhang <[email protected]>
AuthorDate: Tue Sep 8 13:36:05 2020 +0000
[SPARK-32753][SQL][3.0] Only copy tags to node with no tags
This PR backports https://github.com/apache/spark/pull/29593 to branch-3.0
### What changes were proposed in this pull request?
When transforming plans, only copy tags to nodes that do not already have any tags.
### Why are the changes needed?
cloud-fan [made a good
point](https://github.com/apache/spark/pull/29593#discussion_r482013121) that
it doesn't make sense to append tags to existing nodes when nodes are removed.
Doing so causes bugs such as duplicate rows when a query deduplicates and
repartitions by the same column with AQE enabled, as the example below shows:
```
spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
val df = spark.sql("select id from v1 group by id distribute by id")
println(df.collect().toArray.mkString(","))
println(df.queryExecution.executedPlan)
// With AQE
[4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
AdaptiveSparkPlan(isFinalPlan=true)
+- CustomShuffleReader local
   +- ShuffleQueryStage 0
      +- Exchange hashpartitioning(id#183L, 10), true
         +- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
            +- Union
               :- *(1) Range (0, 10, step=1, splits=2)
               +- *(2) Range (0, 10, step=1, splits=2)
// Without AQE
[4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
*(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
+- Exchange hashpartitioning(id#206L, 10), true
   +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
      +- Union
         :- *(1) Range (0, 10, step=1, splits=2)
         +- *(2) Range (0, 10, step=1, splits=2)
```
It's too expensive to detect node removal, so as a compromise we only copy
tags to nodes that have no tags.
### Does this PR introduce any user-facing change?
Yes. It fixes a correctness bug: with AQE enabled, the query above returned duplicate rows.
### How was this patch tested?
Added a unit test to `AdaptiveQueryExecSuite`.
Closes #29665 from manuzhang/spark-32753-3.0.
Authored-by: manuzhang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 7 ++++++-
.../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 16 +++++++++++++++-
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index c4a1067..4c74742 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -91,7 +91,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
extends Product {
private val tags: mutable.Map[TreeNodeTag[_], Any] = mutable.Map.empty
protected def copyTagsFrom(other: BaseType): Unit = {
- tags ++= other.tags
+ // SPARK-32753: it only makes sense to copy tags to a new node
+ // but it's too expensive to detect other cases likes node removal
+ // so we make a compromise here to copy tags to node with no tags
+ if (tags.isEmpty) {
+ tags ++= other.tags
+ }
}
def setTagValue[T](tag: TreeNodeTag[T], value: T): Unit = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index e18adbd..6d97a6b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -28,7 +28,7 @@ import
org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.execution.{PartialReducerPartitionSpec,
ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
import
org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
import org.apache.spark.sql.execution.command.DataWritingCommandExec
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
Exchange, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec,
BuildRight, SortMergeJoinExec}
import
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.functions._
@@ -889,4 +889,18 @@ class AdaptiveQueryExecSuite
}
}
}
+
+ test("SPARK-32753: Only copy tags to node with no tags") {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+ withTempView("v1") {
+ spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
+
+ val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+ "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id")
+ assert(collect(adaptivePlan) {
+ case s: ShuffleExchangeExec => s
+ }.length == 1)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]