Repository: spark
Updated Branches:
  refs/heads/master 893018183 -> f37398699
[SPARK-13383][SQL] Keep broadcast hint after column pruning

JIRA: https://issues.apache.org/jira/browse/SPARK-13383

## What changes were proposed in this pull request?

When we do column pruning in the Optimizer, we put an additional Project on top of a logical plan. However, when the logical plan is already wrapped in a BroadcastHint, the added Project hides the BroadcastHint from the planner later on.

We should take care of the BroadcastHint when we do column pruning.

## How was this patch tested?

A unit test is added.

Author: Liang-Chi Hsieh <[email protected]>

Closes #11260 from viirya/keep-broadcasthint.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3739869
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3739869
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3739869

Branch: refs/heads/master
Commit: f3739869973ba4285196a61775d891292b8e282b
Parents: 8930181
Author: Liang-Chi Hsieh <[email protected]>
Authored: Wed Feb 24 10:22:40 2016 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Wed Feb 24 10:22:40 2016 -0800

----------------------------------------------------------------------
 .../catalyst/plans/logical/basicOperators.scala |   4 +
 .../optimizer/JoinOptimizationSuite.scala       | 122 +++++++++++++++++++
 .../sql/catalyst/optimizer/JoinOrderSuite.scala |  95 ---------------
 .../spark/sql/execution/SparkStrategies.scala   |  12 +-
 4 files changed, 133 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f3739869/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index af43cb3..5d2a65b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -332,6 +332,10 @@ case class Join(
  */
 case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
+
+  // We manually set statistics of BroadcastHint to smallest value to make sure
+  // the plan wrapped by BroadcastHint will be considered to broadcast later.
+  override def statistics: Statistics = Statistics(sizeInBytes = 1)
 }

 case class InsertIntoTable(

http://git-wip-us.apache.org/repos/asf/spark/blob/f3739869/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
new file mode 100644
index 0000000..d482519
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+
+class JoinOptimizationSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Subqueries", Once,
+        EliminateSubqueryAliases) ::
+      Batch("Filter Pushdown", FixedPoint(100),
+        CombineFilters,
+        PushPredicateThroughProject,
+        BooleanSimplification,
+        ReorderJoin,
+        PushPredicateThroughJoin,
+        PushPredicateThroughGenerate,
+        PushPredicateThroughAggregate,
+        ColumnPruning,
+        CollapseProject) :: Nil
+
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+  val testRelation1 = LocalRelation('d.int)
+
+  test("extract filters and joins") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+    val z = testRelation.subquery('z)
+
+    def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
+      assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
+    }
+
+    testExtract(x, None)
+    testExtract(x.where("x.b".attr === 1), None)
+    testExtract(x.join(y), Some(Seq(x, y), Seq()))
+    testExtract(x.join(y, condition = Some("x.b".attr === "y.d".attr)),
+      Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
+    testExtract(x.join(y).where("x.b".attr === "y.d".attr),
+      Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
+    testExtract(x.join(y).join(z), Some(Seq(x, y, z), Seq()))
+    testExtract(x.join(y).where("x.b".attr === "y.d".attr).join(z),
+      Some(Seq(x, y, z), Seq("x.b".attr === "y.d".attr)))
+    testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
+    testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
+      Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
+  }
+
+  test("reorder inner joins") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+    val z = testRelation.subquery('z)
+
+    val originalQuery = {
+      x.join(y).join(z)
+        .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.join(z, condition = Some("x.b".attr === "z.b".attr))
+        .join(y, condition = Some("y.d".attr === "z.a".attr))
+        .analyze
+
+    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+  }
+
+  test("broadcasthint sets relation statistics to smallest value") {
+    val input = LocalRelation('key.int, 'value.string)
+
+    val query =
+      Project(Seq($"x.key", $"y.key"),
+        Join(
SubqueryAlias("x", input), + BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze + + val optimized = Optimize.execute(query) + + val expected = + Project(Seq($"x.key", $"y.key"), + Join( + Project(Seq($"x.key"), SubqueryAlias("x", input)), + BroadcastHint( + Project(Seq($"y.key"), SubqueryAlias("y", input))), + Inner, None)).analyze + + comparePlans(optimized, expected) + + val broadcastChildren = optimized.collect { + case Join(_, r, _, _) if r.statistics.sizeInBytes == 1 => r + } + assert(broadcastChildren.size == 1) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f3739869/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala deleted file mode 100644 index a5b487b..0000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
-import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
-
-
-class JoinOrderSuite extends PlanTest {
-
-  object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches =
-      Batch("Subqueries", Once,
-        EliminateSubqueryAliases) ::
-      Batch("Filter Pushdown", Once,
-        CombineFilters,
-        PushPredicateThroughProject,
-        BooleanSimplification,
-        ReorderJoin,
-        PushPredicateThroughJoin,
-        PushPredicateThroughGenerate,
-        PushPredicateThroughAggregate,
-        ColumnPruning,
-        CollapseProject) :: Nil
-
-  }
-
-  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
-  val testRelation1 = LocalRelation('d.int)
-
-  test("extract filters and joins") {
-    val x = testRelation.subquery('x)
-    val y = testRelation1.subquery('y)
-    val z = testRelation.subquery('z)
-
-    def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
-      assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
-    }
-
-    testExtract(x, None)
-    testExtract(x.where("x.b".attr === 1), None)
-    testExtract(x.join(y), Some(Seq(x, y), Seq()))
-    testExtract(x.join(y, condition = Some("x.b".attr === "y.d".attr)),
-      Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
-    testExtract(x.join(y).where("x.b".attr === "y.d".attr),
-      Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
-    testExtract(x.join(y).join(z), Some(Seq(x, y, z), Seq()))
-    testExtract(x.join(y).where("x.b".attr === "y.d".attr).join(z),
-      Some(Seq(x, y, z), Seq("x.b".attr === "y.d".attr)))
-    testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
-    testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
-      Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
-  }
-
-  test("reorder inner joins") {
-    val x = testRelation.subquery('x)
-    val y = testRelation1.subquery('y)
-    val z = testRelation.subquery('z)
-
-    val originalQuery = {
-      x.join(y).join(z)
-        .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
-    }
-
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer =
-      x.join(z, condition = Some("x.b".attr === "z.b".attr))
-        .join(y, condition = Some("y.d".attr === "z.a".attr))
-        .analyze
-
-    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f3739869/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7347156..247eb05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -81,11 +81,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Matches a plan whose output should be small enough to be used in broadcast join.
    */
   object CanBroadcast {
-    def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
-      case BroadcastHint(p) => Some(p)
-      case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
-        p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => Some(p)
-      case _ => None
+    def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
+      if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+          plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
+        Some(plan)
+      } else {
+        None
+      }
     }
   }
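----------------------------------------------------------------------

For illustration, here is a minimal user-level sketch of the scenario this patch covers. The DataFrame names (large, small) and the columns key and value are hypothetical; broadcast is the hint function from org.apache.spark.sql.functions, which wraps the hinted plan in BroadcastHint. A narrow select after a broadcast-hinted join makes the optimizer prune columns by adding a Project, and with this change the hint is still honored when the query is planned.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

// Hypothetical inputs: `large` is a big fact table, `small` comfortably fits in memory.
def joinWithHint(large: DataFrame, small: DataFrame): DataFrame = {
  // broadcast() wraps the logical plan of `small` in a BroadcastHint.
  val joined = large.join(broadcast(small), large("key") === small("key"))

  // Selecting only a subset of columns triggers column pruning, which inserts a
  // Project above the hinted side; with this change the hint survives and the
  // planner can still choose a broadcast join for `small`.
  joined.select(large("key"), small("value"))
}

Before this change, the Project inserted by ColumnPruning sat between the join and the hint, the old pattern match in CanBroadcast no longer saw the BroadcastHint, and the planner could fall back to a shuffle-based join for a table the user explicitly asked to broadcast.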

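And a rough, self-contained sketch of why the 1-byte statistics are sufficient once the broadcast decision is purely size-based. The classes below are toy stand-ins rather than Spark's own, and the 10 MB figure is the assumed default of spark.sql.autoBroadcastJoinThreshold: because the hint node reports the smallest possible size, the hinted side always stays under the threshold, no matter how large the underlying relation is or what the optimizer stacks on top of the hint.

object BroadcastDecisionSketch extends App {
  // Toy stand-ins for logical plans: only the size estimate matters here.
  sealed trait Plan { def sizeInBytes: BigInt }
  case class Relation(sizeInBytes: BigInt) extends Plan
  // Mirrors the idea of BroadcastHint's statistics: report the smallest possible size.
  case class Hinted(child: Plan) extends Plan { val sizeInBytes: BigInt = 1 }

  // Assumed default of spark.sql.autoBroadcastJoinThreshold (10 MB).
  val autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024

  // Statistics-only check, in the spirit of the rewritten CanBroadcast.
  def canBroadcast(plan: Plan): Boolean =
    autoBroadcastJoinThreshold > 0 && plan.sizeInBytes <= BigInt(autoBroadcastJoinThreshold)

  val hugeTable = Relation(sizeInBytes = BigInt(1) << 40) // ~1 TB estimate
  assert(canBroadcast(Hinted(hugeTable)))  // the hinted side still qualifies for broadcast
  assert(!canBroadcast(hugeTable))         // the same relation without the hint does not
}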