Repository: spark
Updated Branches:
  refs/heads/branch-2.3 dad5c48b2 -> 7edfdfcec


[SPARK-25450][SQL] PushProjectThroughUnion rule uses the same exprId for 
project expressions in each Union child, causing mistakes in constant 
propagation

## What changes were proposed in this pull request?

The problem was caused by the PushProjectionThroughUnion rule, which, when 
creating a new Project for each child of Union, reuses the same exprId for 
expressions at the same position. This is wrong because the expressions in 
each child of Union are independent of one another, and sharing an exprId can 
lead to wrong results when other rules such as FoldablePropagation kick in and 
treat two different expressions as the same.

The fix re-creates each Alias expression when pushing the Project into a child 
of Union (in `pushToRight`), so that the exprIds in each child are unique.

## How was this patch tested?

Added a unit test (PushProjectThroughUnionSuite).

Closes #22447 from maryannxue/push-project-thru-union-bug.

Authored-by: maryannxue <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit 88446b6ad19371f15d06ef67052f6c1a8072c04a)
Signed-off-by: gatorsmile <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7edfdfce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7edfdfce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7edfdfce

Branch: refs/heads/branch-2.3
Commit: 7edfdfcecd07f02ecd9bda8f62c00d32884e4de8
Parents: dad5c48
Author: maryannxue <[email protected]>
Authored: Thu Sep 20 10:00:28 2018 -0700
Committer: gatorsmile <[email protected]>
Committed: Thu Sep 20 10:01:20 2018 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  4 ++
 .../PushProjectThroughUnionSuite.scala          | 54 ++++++++++++++++++++
 2 files changed, 58 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7edfdfce/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 38799c1..ff206d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -388,6 +388,10 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper
   private def pushToRight[A <: Expression](e: A, rewrites: 
AttributeMap[Attribute]) = {
     val result = e transform {
       case a: Attribute => rewrites(a)
+    } match {
+      // Make sure exprId is unique in each child of Union.
+      case Alias(child, alias) => Alias(child, alias)()
+      case other => other
     }
 
     // We must promise the compiler that we did not discard the names in the 
case of project

http://git-wip-us.apache.org/repos/asf/spark/blob/7edfdfce/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectThroughUnionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectThroughUnionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectThroughUnionSuite.scala
new file mode 100644
index 0000000..294d298
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectThroughUnionSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class PushProjectThroughUnionSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("Optimizer Batch", FixedPoint(100),
+      PushProjectionThroughUnion,
+      FoldablePropagation) :: Nil
+  }
+
+  test("SPARK-25450 PushProjectThroughUnion rule uses the same exprId for project expressions " +
+    "in each Union child, causing mistakes in constant propagation") {
+    val testRelation1 = LocalRelation('a.string, 'b.int, 'c.string)
+    val testRelation2 = LocalRelation('d.string, 'e.int, 'f.string)
+    val query = testRelation1
+      .union(testRelation2.select("bar".as("d"), 'e, 'f))
+      .select('a.as("n"))
+      .select('n, "dummy").analyze
+    val optimized = Optimize.execute(query)
+
+    val expected = testRelation1
+      .select('a.as("n"))
+      .select('n, "dummy")
+      .union(testRelation2
+        .select("bar".as("d"), 'e, 'f)
+        .select("bar".as("n"))
+        .select("bar".as("n"), "dummy")).analyze
+
+    comparePlans(optimized, expected)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to