This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 1a1e468 [SPARK-36353][SQL][3.1] RemoveNoopOperators should keep
output schema
1a1e468 is described below
commit 1a1e468bbbc19175d485bf2cb576e538291568f7
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Aug 13 16:43:28 2021 +0800
[SPARK-36353][SQL][3.1] RemoveNoopOperators should keep output schema
### What changes were proposed in this pull request?
RemoveNoopOperators should keep output schema
### Why are the changes needed?
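Removing a no-op Project could change the output column names (for example
their letter case, `c` vs `C`), because the child only has to produce the same
output attributes, not the same names. The rule should preserve the names of
the Project it removes.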
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test (RemoveNoopOperatorsSuite).
Closes #33704 from AngersZhuuuu/SPARK-36353-3.1.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 26 +++++++++++-
.../optimizer/RemoveNoopOperatorsSuite.scala | 47 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 1 deletion(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1fec27f..c3e6310 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -488,9 +488,33 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
* Remove no-op operators from the query plan that do not make any
modifications.
*/
object RemoveNoopOperators extends Rule[LogicalPlan] {
+ def restoreOriginalOutputNames(
+ projectList: Seq[NamedExpression],
+ originalNames: Seq[String]): Seq[NamedExpression] = {
+ projectList.zip(originalNames).map {
+ case (attr: Attribute, name) => attr.withName(name)
+ case (alias: Alias, name) => alias.withName(name)
+ case (other, _) => other
+ }
+ }
+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Eliminate no-op Projects
- case p @ Project(_, child) if child.sameOutput(p) => child
+ case p @ Project(projectList, child) if child.sameOutput(p) =>
+ val newChild = child match {
+ case p: Project =>
+ p.copy(projectList = restoreOriginalOutputNames(p.projectList,
projectList.map(_.name)))
+ case agg: Aggregate =>
+ agg.copy(aggregateExpressions =
+ restoreOriginalOutputNames(agg.aggregateExpressions,
projectList.map(_.name)))
+ case _ =>
+ child
+ }
+ if (newChild.output.zip(projectList).forall { case (a1, a2) => a1.name
== a2.name }) {
+ newChild
+ } else {
+ p
+ }
// Eliminate no-op Window
case w: Window if w.windowExpressions.isEmpty => w.child
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveNoopOperatorsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveNoopOperatorsSuite.scala
new file mode 100644
index 0000000..0f70e6f
--- /dev/null
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveNoopOperatorsSuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class RemoveNoopOperatorsSuite extends PlanTest {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("RemoveNoopOperators", Once,
+ RemoveNoopOperators) :: Nil
+ }
+
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+ test("SPARK-36353: RemoveNoopOperators should keep output schema") {
+ val query = testRelation
+ .select(('a + 'b).as("c"))
+ .analyze
+ val originalQuery = Project(Seq(query.output.head.withName("C")), query)
+ val optimized = Optimize.execute(originalQuery.analyze)
+ val result = testRelation
+ .select(('a + 'b).as("C"))
+ .analyze
+ comparePlans(optimized, result)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]