Repository: spark
Updated Branches:
  refs/heads/master 82066a166 -> 30c18841e
Revert "[SPARK-13840][SQL] Split Optimizer Rule ColumnPruning to ColumnPruning and EliminateOperator" This reverts commit 99bd2f0e94657687834c5c59c4270c1484c9f595. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30c18841 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30c18841 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30c18841 Branch: refs/heads/master Commit: 30c18841e40abe768c015104f156dacf02e520eb Parents: 82066a1 Author: Davies Liu <[email protected]> Authored: Wed Mar 16 23:11:13 2016 -0700 Committer: Davies Liu <[email protected]> Committed: Wed Mar 16 23:11:13 2016 -0700 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 31 +++++++------------- .../catalyst/optimizer/ColumnPruningSuite.scala | 5 ++-- .../optimizer/CombiningLimitsSuite.scala | 3 +- .../optimizer/JoinOptimizationSuite.scala | 1 - 4 files changed, 14 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/30c18841/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 3f57b07..d0e5859 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -71,7 +71,6 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { PushPredicateThroughAggregate, LimitPushDown, ColumnPruning, - EliminateOperators, InferFiltersFromConstraints, // Operator combine CollapseRepartition, @@ -316,7 +315,11 @@ object SetOperationPushDown extends 
Rule[LogicalPlan] with PredicateHelper { * - LeftSemiJoin */ object ColumnPruning extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = + output1.size == output2.size && + output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2)) + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Prunes the unused columns from project list of Project/Aggregate/Expand case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty => p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains))) @@ -377,6 +380,12 @@ object ColumnPruning extends Rule[LogicalPlan] { p.copy(child = w.copy( windowExpressions = w.windowExpressions.filter(p.references.contains))) + // Eliminate no-op Window + case w: Window if w.windowExpressions.isEmpty => w.child + + // Eliminate no-op Projects + case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child + // Can't prune the columns on LeafNode case p @ Project(_, l: LeafNode) => p @@ -401,24 +410,6 @@ object ColumnPruning extends Rule[LogicalPlan] { } /** - * Eliminate no-op Project and Window. - * - * Note: this rule should be executed just after ColumnPruning. - */ -object EliminateOperators extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - // Eliminate no-op Projects - case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child - // Eliminate no-op Window - case w: Window if w.windowExpressions.isEmpty => w.child - } - - private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = - output1.size == output2.size && - output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2)) -} - -/** * Combines two adjacent [[Project]] operators into one and perform alias substitution, * merging the expressions into one single expression. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/30c18841/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 6187fb9..dd7d65d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -35,7 +35,6 @@ class ColumnPruningSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Column pruning", FixedPoint(100), ColumnPruning, - EliminateOperators, CollapseProject) :: Nil } @@ -328,8 +327,8 @@ class ColumnPruningSuite extends PlanTest { val input2 = LocalRelation('c.int, 'd.string, 'e.double) val query = Project('b :: Nil, Union(input1 :: input2 :: Nil)).analyze - val expected = - Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil).analyze + val expected = Project('b :: Nil, + Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil)).analyze comparePlans(Optimize.execute(query), expected) } http://git-wip-us.apache.org/repos/asf/spark/blob/30c18841/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala index e0e9b6d..87ad81d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala @@ -28,8 +28,7 @@ class CombiningLimitsSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Filter Pushdown", FixedPoint(100), - ColumnPruning, - EliminateOperators) :: + ColumnPruning) :: Batch("Combine Limit", FixedPoint(10), CombineLimits) :: Batch("Constant Folding", FixedPoint(10), http://git-wip-us.apache.org/repos/asf/spark/blob/30c18841/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index 51468fa..e2f8146 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -43,7 +43,6 @@ class JoinOptimizationSuite extends PlanTest { PushPredicateThroughGenerate, PushPredicateThroughAggregate, ColumnPruning, - EliminateOperators, CollapseProject) :: Nil } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
