Repository: spark
Updated Branches:
refs/heads/master 046c1e2aa -> cbeaf9eba
[SPARK-6376][SQL] Avoid eliminating subqueries until optimization
Previously it was safe to throw away subqueries after analysis, as we would
never use that tree for resolution again. With eager analysis in
`DataFrame`s, however, doing so causes errors for queries such as:
```scala
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count()
```
As a result, in this PR we defer the elimination of subqueries until the
optimization phase.
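For context, `EliminateSubQueries` is essentially a one-case transform that
strips the `Subquery` wrapper nodes introduced by aliases such as `as('x)`.
A rough sketch of the rule (as defined in catalyst's `Analyzer.scala`):
```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
import org.apache.spark.sql.catalyst.rules.Rule

// Subquery nodes carry only the scoping information (the `x` and `y`
// aliases above) needed to resolve qualified attributes like $"x.str".
// Removing them during analysis destroyed that scope, so a second round
// of resolution against the eagerly analyzed tree could not succeed.
object EliminateSubQueries extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Subquery(_, child) => child
  }
}
```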
Author: Michael Armbrust <[email protected]>
Closes #5160 from marmbrus/subqueriesInDfs and squashes the following commits:
a9bb262 [Michael Armbrust] Update Optimizer.scala
27d25bf [Michael Armbrust] fix hive tests
9137e03 [Michael Armbrust] add type
81cd597 [Michael Armbrust] Avoid eliminating subqueries until optimization
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbeaf9eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbeaf9eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbeaf9eb
Branch: refs/heads/master
Commit: cbeaf9ebab31a0bcbca884d4db7a791fd9edbff3
Parents: 046c1e2
Author: Michael Armbrust <[email protected]>
Authored: Tue Mar 24 14:08:20 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Tue Mar 24 14:08:20 2015 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/analysis/Analyzer.scala | 4 +---
.../org/apache/spark/sql/catalyst/dsl/package.scala | 4 ++--
.../spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++++
.../sql/catalyst/plans/logical/LogicalPlan.scala | 16 ++++++++++------
.../spark/sql/catalyst/analysis/AnalysisSuite.scala | 8 ++++++--
.../scala/org/apache/spark/sql/DataFrameSuite.scala | 7 +++++++
.../test/scala/org/apache/spark/sql/JoinSuite.scala | 4 ++--
.../spark/sql/hive/MetastoreDataSourcesSuite.scala | 2 +-
.../org/apache/spark/sql/hive/parquetSuites.scala | 2 +-
9 files changed, 34 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cbeaf9eb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c93af79..13d2ae4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -64,9 +64,7 @@ class Analyzer(catalog: Catalog,
UnresolvedHavingClauseAttributes ::
TrimGroupingAliases ::
typeCoercionRules ++
- extendedResolutionRules : _*),
- Batch("Remove SubQueries", fixedPoint,
- EliminateSubQueries)
+ extendedResolutionRules : _*)
)
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/cbeaf9eb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 51a09ac..7f5f617 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{TypeTag, typeTag}
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
@@ -289,7 +289,7 @@ package object dsl {
InsertIntoTable(
analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)
- def analyze = analysis.SimpleAnalyzer(logicalPlan)
+ def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer(logicalPlan))
}
object plans { // scalastyle:ignore
http://git-wip-us.apache.org/repos/asf/spark/blob/cbeaf9eb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1a75fcf..74edaac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.optimizer
import scala.collection.immutable.HashSet
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.FullOuter
@@ -32,6 +33,9 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan]
object DefaultOptimizer extends Optimizer {
val batches =
+ // SubQueries are only needed for analysis and can be removed before execution.
+ Batch("Remove SubQueries", FixedPoint(100),
+ EliminateSubQueries) ::
Batch("Combine Limits", FixedPoint(100),
CombineLimits) ::
Batch("ConstantFolding", FixedPoint(100),
http://git-wip-us.apache.org/repos/asf/spark/blob/cbeaf9eb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 0f8b144..b01a61d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, Resolver}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -73,12 +73,16 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
* can do better should override this function.
*/
def sameResult(plan: LogicalPlan): Boolean = {
- plan.getClass == this.getClass &&
- plan.children.size == children.size && {
- logDebug(s"[${cleanArgs.mkString(", ")}] == [${plan.cleanArgs.mkString(", ")}]")
- cleanArgs == plan.cleanArgs
+ val cleanLeft = EliminateSubQueries(this)
+ val cleanRight = EliminateSubQueries(plan)
+
+ cleanLeft.getClass == cleanRight.getClass &&
+ cleanLeft.children.size == cleanRight.children.size && {
+ logDebug(
+ s"[${cleanRight.cleanArgs.mkString(", ")}] == [${cleanLeft.cleanArgs.mkString(", ")}]")
+ cleanRight.cleanArgs == cleanLeft.cleanArgs
} &&
- (plan.children, children).zipped.forall(_ sameResult _)
+ (cleanLeft.children, cleanRight.children).zipped.forall(_ sameResult _)
}
/** Args that have cleaned such that differences in expression id should not affect equality */
http://git-wip-us.apache.org/repos/asf/spark/blob/cbeaf9eb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 359aec4..756cd36 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -32,9 +32,13 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
val caseInsensitiveCatalog = new SimpleCatalog(false)
val caseSensitiveAnalyzer =
- new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
+ new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true) {
+ override val extendedResolutionRules = EliminateSubQueries :: Nil
+ }
val caseInsensitiveAnalyzer =
- new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
+ new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false) {
+ override val extendedResolutionRules = EliminateSubQueries :: Nil
+ }
val checkAnalysis = new CheckAnalysis
http://git-wip-us.apache.org/repos/asf/spark/blob/cbeaf9eb/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index ff441ef..c30ed69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -108,6 +108,13 @@ class DataFrameSuite extends QueryTest {
)
}
+ test("self join with aliases") {
+ val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
+ checkAnswer(
+ df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
+ Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
+ }
+
test("explode") {
val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
val df2 =
http://git-wip-us.apache.org/repos/asf/spark/blob/cbeaf9eb/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index dd0948a..e4dee87 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -34,7 +34,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
test("equi-join is hash-join") {
val x = testData2.as("x")
val y = testData2.as("y")
- val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.analyzed
+ val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.optimizedPlan
val planned = planner.HashJoin(join)
assert(planned.size === 1)
}
@@ -109,7 +109,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
test("multiple-key equi-join is hash-join") {
val x = testData2.as("x")
val y = testData2.as("y")
- val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.analyzed
+ val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.optimizedPlan
val planned = planner.HashJoin(join)
assert(planned.size === 1)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cbeaf9eb/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index ff2e6ea..e5ad0bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -579,7 +579,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
Row(3) :: Row(4) :: Nil
)
- table("test_parquet_ctas").queryExecution.analyzed match {
+ table("test_parquet_ctas").queryExecution.optimizedPlan match {
case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
http://git-wip-us.apache.org/repos/asf/spark/blob/cbeaf9eb/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index d891c4e..8a31bd0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -292,7 +292,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
Seq(Row(1, "str1"))
)
- table("test_parquet_ctas").queryExecution.analyzed match {
+ table("test_parquet_ctas").queryExecution.optimizedPlan match {
case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(