Repository: spark
Updated Branches:
  refs/heads/branch-1.3 92bf888ae -> df671bc36


[SPARK-6376][SQL] Avoid eliminating subqueries until optimization

Previously it was safe to throw away subqueries after analysis, as we would
never try to use that tree for resolution again.  However, with eager analysis
in `DataFrame`s the analyzed plan is reused for resolving later operations, so
eliminating the subquery aliases causes errors for queries such as:

```scala
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count()
```

As a result, in this PR we defer the elimination of subqueries until the 
optimization phase.
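
The rule being deferred is essentially a one-case `transform` that unwraps
`Subquery` alias nodes once resolution no longer needs them. A minimal sketch,
simplified from the Catalyst source (the object name here is illustrative):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
import org.apache.spark.sql.catalyst.rules.Rule

// Subquery(alias, child) exists only to scope attribute resolution
// (e.g. resolving $"x.str" against the alias `x`); once analysis is
// finished, the wrapper carries nothing needed for execution.
object StripSubqueries extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Subquery(_, child) => child
  }
}
```

Running this during analysis destroyed the alias scopes that later `DataFrame`
operations (like the `groupBy("x.str")` above) still need for resolution;
running it at the start of optimization keeps those scopes alive for as long
as resolution can still happen.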

Author: Michael Armbrust <[email protected]>

Closes #5160 from marmbrus/subqueriesInDfs and squashes the following commits:

a9bb262 [Michael Armbrust] Update Optimizer.scala
27d25bf [Michael Armbrust] fix hive tests
9137e03 [Michael Armbrust] add type
81cd597 [Michael Armbrust] Avoid eliminating subqueries until optimization

(cherry picked from commit cbeaf9ebab31a0bcbca884d4db7a791fd9edbff3)
Signed-off-by: Michael Armbrust <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df671bc3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df671bc3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df671bc3

Branch: refs/heads/branch-1.3
Commit: df671bc36c1c57456e0b2a2391a2a276b3d38bec
Parents: 92bf888
Author: Michael Armbrust <[email protected]>
Authored: Tue Mar 24 14:08:20 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Tue Mar 24 14:09:09 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala      |  4 +---
 .../org/apache/spark/sql/catalyst/dsl/package.scala |  4 ++--
 .../spark/sql/catalyst/optimizer/Optimizer.scala    |  4 ++++
 .../sql/catalyst/plans/logical/LogicalPlan.scala    | 16 ++++++++++------
 .../spark/sql/catalyst/analysis/AnalysisSuite.scala |  8 ++++++--
 .../scala/org/apache/spark/sql/DataFrameSuite.scala |  7 +++++++
 .../test/scala/org/apache/spark/sql/JoinSuite.scala |  4 ++--
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala  |  2 +-
 .../org/apache/spark/sql/hive/parquetSuites.scala   |  2 +-
 9 files changed, 34 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/df671bc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c93af79..13d2ae4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -64,9 +64,7 @@ class Analyzer(catalog: Catalog,
       UnresolvedHavingClauseAttributes ::
       TrimGroupingAliases ::
       typeCoercionRules ++
-      extendedResolutionRules : _*),
-    Batch("Remove SubQueries", fixedPoint,
-      EliminateSubQueries)
+      extendedResolutionRules : _*)
   )
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/df671bc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 51a09ac..7f5f617 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.{TypeTag, typeTag}
 
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
@@ -289,7 +289,7 @@ package object dsl {
       InsertIntoTable(
         analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)
 
-    def analyze = analysis.SimpleAnalyzer(logicalPlan)
+    def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer(logicalPlan))
   }
 
   object plans {  // scalastyle:ignore

http://git-wip-us.apache.org/repos/asf/spark/blob/df671bc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1a75fcf..74edaac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.immutable.HashSet
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.FullOuter
@@ -32,6 +33,9 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan]
 
 object DefaultOptimizer extends Optimizer {
   val batches =
+    // SubQueries are only needed for analysis and can be removed before execution.
+    Batch("Remove SubQueries", FixedPoint(100),
+      EliminateSubQueries) ::
     Batch("Combine Limits", FixedPoint(100),
       CombineLimits) ::
     Batch("ConstantFolding", FixedPoint(100),

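As a sanity check on the new pipeline, here is a hedged sketch (the
`LocalRelation` plan is invented for the example and not part of this patch):
the Analyzer now leaves the `Subquery` wrapper in place, and the optimizer's
first batch removes it.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Subquery}
import org.apache.spark.sql.types.IntegerType

// An already-analyzed plan, still wrapped in its alias as the Analyzer
// now leaves it after this change.
val rel = LocalRelation(AttributeReference("a", IntegerType)())
val analyzed = Subquery("x", rel)

// The "Remove SubQueries" batch unwraps the alias before any other rewrite.
assert(DefaultOptimizer(analyzed) == rel)
```
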
http://git-wip-us.apache.org/repos/asf/spark/blob/df671bc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 0f8b144..b01a61d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, Resolver}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -73,12 +73,16 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
    * can do better should override this function.
    */
   def sameResult(plan: LogicalPlan): Boolean = {
-    plan.getClass == this.getClass &&
-    plan.children.size == children.size && {
-      logDebug(s"[${cleanArgs.mkString(", ")}] == [${plan.cleanArgs.mkString(", ")}]")
-      cleanArgs == plan.cleanArgs
+    val cleanLeft = EliminateSubQueries(this)
+    val cleanRight = EliminateSubQueries(plan)
+
+    cleanLeft.getClass == cleanRight.getClass &&
+      cleanLeft.children.size == cleanRight.children.size && {
+      logDebug(
+        s"[${cleanRight.cleanArgs.mkString(", ")}] == [${cleanLeft.cleanArgs.mkString(", ")}]")
+      cleanRight.cleanArgs == cleanLeft.cleanArgs
     } &&
-    (plan.children, children).zipped.forall(_ sameResult _)
+    (cleanLeft.children, cleanRight.children).zipped.forall(_ sameResult _)
   }
 
   /** Args that have cleaned such that differences in expression id should not affect equality */

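The `sameResult` change covers the consequence of keeping subqueries around
longer: analyzed plans now retain `Subquery` nodes, so both sides are
unwrapped before structural comparison; otherwise an aliased plan and its
unaliased twin would stop matching for consumers of `sameResult` (such as
cached-plan lookup). A hedged check, reusing the illustrative `rel` from the
sketch above:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Subquery}
import org.apache.spark.sql.types.IntegerType

val rel = LocalRelation(AttributeReference("a", IntegerType)())

// Alias wrappers do not change the rows produced, and with this change
// sameResult agrees, regardless of the alias names involved.
assert(Subquery("x", rel).sameResult(rel))
assert(Subquery("x", rel).sameResult(Subquery("y", rel)))
```
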
http://git-wip-us.apache.org/repos/asf/spark/blob/df671bc3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 359aec4..756cd36 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -32,9 +32,13 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
   val caseInsensitiveCatalog = new SimpleCatalog(false)
 
   val caseSensitiveAnalyzer =
-    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
+    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true) {
+      override val extendedResolutionRules = EliminateSubQueries :: Nil
+    }
   val caseInsensitiveAnalyzer =
-    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
+    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false) {
+      override val extendedResolutionRules = EliminateSubQueries :: Nil
+    }
 
   val checkAnalysis = new CheckAnalysis
 

http://git-wip-us.apache.org/repos/asf/spark/blob/df671bc3/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index ff441ef..c30ed69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -108,6 +108,13 @@ class DataFrameSuite extends QueryTest {
     )
   }
 
+  test("self join with aliases") {
+    val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
+    checkAnswer(
+      df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
+      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
+  }
+
   test("explode") {
     val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
     val df2 =

http://git-wip-us.apache.org/repos/asf/spark/blob/df671bc3/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index dd0948a..e4dee87 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -34,7 +34,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
   test("equi-join is hash-join") {
     val x = testData2.as("x")
     val y = testData2.as("y")
-    val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.analyzed
+    val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.optimizedPlan
     val planned = planner.HashJoin(join)
     assert(planned.size === 1)
   }
@@ -109,7 +109,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
   test("multiple-key equi-join is hash-join") {
     val x = testData2.as("x")
     val y = testData2.as("y")
-    val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.analyzed
+    val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.optimizedPlan
     val planned = planner.HashJoin(join)
     assert(planned.size === 1)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/df671bc3/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 5d6a6f3..8857ccd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -579,7 +579,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
         Row(3) :: Row(4) :: Nil
       )
 
-      table("test_parquet_ctas").queryExecution.analyzed match {
+      table("test_parquet_ctas").queryExecution.optimizedPlan match {
         case LogicalRelation(p: ParquetRelation2) => // OK
         case _ =>
           fail(

http://git-wip-us.apache.org/repos/asf/spark/blob/df671bc3/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 1904f5f..48a39e7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -291,7 +291,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
       Seq(Row(1, "str1"))
     )
 
-    table("test_parquet_ctas").queryExecution.analyzed match {
+    table("test_parquet_ctas").queryExecution.optimizedPlan match {
       case LogicalRelation(p: ParquetRelation2) => // OK
       case _ =>
         fail(

