Repository: spark Updated Branches: refs/heads/master 65697bbea -> 4e290522c
[SPARK-7276] [DATAFRAME] speed up DataFrame.select by collapsing Project Author: Wenchen Fan <[email protected]> Closes #5831 from cloud-fan/7276 and squashes the following commits: ee4a1e1 [Wenchen Fan] fix rebase mistake a3b565d [Wenchen Fan] refactor 99deb5d [Wenchen Fan] add test f1f67ad [Wenchen Fan] fix 7276 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e290522 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e290522 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e290522 Branch: refs/heads/master Commit: 4e290522c2a6310636317c54589dc35c91d95486 Parents: 65697bb Author: Wenchen Fan <[email protected]> Authored: Tue May 12 11:51:55 2015 -0700 Committer: Michael Armbrust <[email protected]> Committed: Tue May 12 11:51:55 2015 -0700 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 40 ++++++++++++-------- .../optimizer/FilterPushdownSuite.scala | 3 +- .../scala/org/apache/spark/sql/DataFrame.scala | 4 +- .../org/apache/spark/sql/DataFrameSuite.scala | 12 ++++++ 4 files changed, 41 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4e290522/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 1ee5fb2..b163707 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -43,6 +43,7 @@ object DefaultOptimizer extends Optimizer { PushPredicateThroughJoin, PushPredicateThroughGenerate, ColumnPruning, + 
ProjectCollapsing, CombineLimits) :: Batch("ConstantFolding", FixedPoint(100), NullPropagation, @@ -114,7 +115,7 @@ object UnionPushdown extends Rule[LogicalPlan] { * - Aggregate * - Project <- Join * - LeftSemiJoin - * - Collapse adjacent projections, performing alias substitution. + * - Performing alias substitution. */ object ColumnPruning extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { @@ -152,7 +153,28 @@ object ColumnPruning extends Rule[LogicalPlan] { Join(left, prunedChild(right, allReferences), LeftSemi, condition) - // Combine adjacent Projects. + case Project(projectList, Limit(exp, child)) => + Limit(exp, Project(projectList, child)) + + // Eliminate no-op Projects + case Project(projectList, child) if child.output == projectList => child + } + + /** Applies a projection only when the child is producing unnecessary attributes */ + private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) = + if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { + Project(allReferences.filter(c.outputSet.contains).toSeq, c) + } else { + c + } +} + +/** + * Combines two adjacent [[Project]] operators into one, merging the + * expressions into one single expression. + */ +object ProjectCollapsing extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case Project(projectList1, Project(projectList2, child)) => // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)). 
@@ -169,21 +191,7 @@ object ColumnPruning extends Rule[LogicalPlan] { }).asInstanceOf[Seq[NamedExpression]] Project(substitutedProjection, child) - - case Project(projectList, Limit(exp, child)) => - Limit(exp, Project(projectList, child)) - - // Eliminate no-op Projects - case Project(projectList, child) if child.output == projectList => child } - - /** Applies a projection only when the child is producing unnecessary attributes */ - private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) = - if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) { - Project(allReferences.filter(c.outputSet.contains).toSeq, c) - } else { - c - } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/4e290522/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 58d415d..0c428f7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -38,7 +38,8 @@ class FilterPushdownSuite extends PlanTest { PushPredicateThroughProject, PushPredicateThroughJoin, PushPredicateThroughGenerate, - ColumnPruning) :: Nil + ColumnPruning, + ProjectCollapsing) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) http://git-wip-us.apache.org/repos/asf/spark/blob/4e290522/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index f3107f7..1f85dac 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -567,7 +567,9 @@ class DataFrame private[sql]( case Column(expr: NamedExpression) => expr case Column(expr: Expression) => Alias(expr, expr.prettyString)() } - Project(namedExpressions.toSeq, logicalPlan) + // When the user calls `select` repeatedly, speed up analysis by collapsing `Project` + import org.apache.spark.sql.catalyst.optimizer.ProjectCollapsing + Project(namedExpressions.toSeq, ProjectCollapsing(logicalPlan)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/4e290522/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index d58438e..52aa1f6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -493,4 +493,16 @@ class DataFrameSuite extends QueryTest { testData.dropDuplicates(Seq("value2")), Seq(Row(2, 1, 2), Row(1, 1, 1))) } + + test("SPARK-7276: Project collapse for continuous select") { + var df = testData + for (i <- 1 to 5) { + df = df.select($"*") + } + + import org.apache.spark.sql.catalyst.plans.logical.Project + // make sure df has at most two Projects + val p = df.logicalPlan.asInstanceOf[Project].child.asInstanceOf[Project] + assert(!p.child.isInstanceOf[Project]) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
