Repository: spark Updated Branches: refs/heads/branch-2.4 fc036729c -> c67c597b6
[SPARK-25450][SQL] PushProjectThroughUnion rule uses the same exprId for project expressions in each Union child, causing mistakes in constant propagation ## What changes were proposed in this pull request? The problem was caused by the PushProjectionThroughUnion rule, which, when creating a new Project for each child of Union, uses the same exprId for expressions at the same position. This is wrong because the expressions in each child of Union are independent of one another, and it can lead to a wrong result if other rules such as FoldablePropagation kick in and treat two different expressions as the same. The fix creates new expressions (with fresh exprIds) in the new Project for each child of Union. ## How was this patch tested? Added a unit test (PushProjectThroughUnionSuite). Closes #22447 from maryannxue/push-project-thru-union-bug. Authored-by: maryannxue <[email protected]> Signed-off-by: gatorsmile <[email protected]> (cherry picked from commit 88446b6ad19371f15d06ef67052f6c1a8072c04a) Signed-off-by: gatorsmile <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c67c597b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c67c597b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c67c597b Branch: refs/heads/branch-2.4 Commit: c67c597b604c1c22334913c81e69f6237639e42e Parents: fc03672 Author: maryannxue <[email protected]> Authored: Thu Sep 20 10:00:28 2018 -0700 Committer: gatorsmile <[email protected]> Committed: Thu Sep 20 10:00:42 2018 -0700 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 4 ++ .../PushProjectThroughUnionSuite.scala | 54 ++++++++++++++++++++ 2 files changed, 58 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c67c597b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala 
---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e4b4f1e..b489a67 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -486,6 +486,10 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper private def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]) = { val result = e transform { case a: Attribute => rewrites(a) + } match { + // Make sure exprId is unique in each child of Union. + case Alias(child, alias) => Alias(child, alias)() + case other => other } // We must promise the compiler that we did not discard the names in the case of project http://git-wip-us.apache.org/repos/asf/spark/blob/c67c597b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectThroughUnionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectThroughUnionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectThroughUnionSuite.scala new file mode 100644 index 0000000..294d298 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectThroughUnionSuite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor + +class PushProjectThroughUnionSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = Batch("Optimizer Batch", FixedPoint(100), + PushProjectionThroughUnion, + FoldablePropagation) :: Nil + } + + test("SPARK-25450 PushProjectThroughUnion rule uses the same exprId for project expressions " + + "in each Union child, causing mistakes in constant propagation") { + val testRelation1 = LocalRelation('a.string, 'b.int, 'c.string) + val testRelation2 = LocalRelation('d.string, 'e.int, 'f.string) + val query = testRelation1 + .union(testRelation2.select("bar".as("d"), 'e, 'f)) + .select('a.as("n")) + .select('n, "dummy").analyze + val optimized = Optimize.execute(query) + + val expected = testRelation1 + .select('a.as("n")) + .select('n, "dummy") + .union(testRelation2 + .select("bar".as("d"), 'e, 'f) + .select("bar".as("n")) + .select("bar".as("n"), "dummy")).analyze + + comparePlans(optimized, expected) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
