Repository: spark Updated Branches: refs/heads/master b55563c17 -> 733c59ec1
[SPARK-16475][SQL] broadcast hint for SQL queries - follow up ## What changes were proposed in this pull request? A small update to https://github.com/apache/spark/pull/16925 1. Rename SubstituteHints -> ResolveHints to be more consistent with rest of the rules. 2. Added more documentation in the rule and be more defensive / future proof to skip views as well as CTEs. ## How was this patch tested? This pull request contains no real logic change and all behavior should be covered by existing tests. Author: Reynold Xin <r...@databricks.com> Closes #16939 from rxin/SPARK-16475. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/733c59ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/733c59ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/733c59ec Branch: refs/heads/master Commit: 733c59ec1ee5746c322e68459cd06241f5fa0903 Parents: b55563c Author: Reynold Xin <r...@databricks.com> Authored: Wed Feb 15 17:10:49 2017 +0100 Committer: Herman van Hovell <hvanhov...@databricks.com> Committed: Wed Feb 15 17:10:49 2017 +0100 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/Analyzer.scala | 4 +- .../sql/catalyst/analysis/ResolveHints.scala | 103 ++++++++++++++++ .../sql/catalyst/analysis/SubstituteHints.scala | 104 ---------------- .../catalyst/analysis/ResolveHintsSuite.scala | 120 ++++++++++++++++++ .../analysis/SubstituteHintsSuite.scala | 121 ------------------- 5 files changed, 225 insertions(+), 227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 8348cb5..6aa0e8d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -115,8 +115,8 @@ class Analyzer( lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, - new SubstituteHints.SubstituteBroadcastHints(conf), - SubstituteHints.RemoveAllHints), + new ResolveHints.ResolveBroadcastHints(conf), + ResolveHints.RemoveAllHints), Batch("Substitution", fixedPoint, CTESubstitution, WindowsSubstitution, http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala new file mode 100644 index 0000000..2124177 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.CatalystConf +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.CurrentOrigin + + +/** + * Collection of rules related to hints. The only hint currently available is broadcast join hint. + * + * Note that this is separated into two rules because in the future we might introduce new hint + * rules that have different ordering requirements from broadcast. + */ +object ResolveHints { + + /** + * For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of + * relation aliases can be specified in the hint. A broadcast hint plan node will be inserted + * on top of any relation (that is not aliased differently), subquery, or common table expression + * that matches the specified name. + * + * The hint resolution works by recursively traversing down the query plan to find a relation or + * subquery that matches one of the specified broadcast aliases. The traversal does not go + * beyond any existing broadcast hints, subquery aliases, views, or CTE definitions. + * + * This rule must run before common table expression (CTE) substitution. 
+ */ + class ResolveBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan] { + private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") + + def resolver: Resolver = conf.resolver + + private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { + // Whether to continue recursing down the tree + var recurse = true + + val newNode = CurrentOrigin.withOrigin(plan.origin) { + plan match { + case r: UnresolvedRelation => + val alias = r.alias.getOrElse(r.tableIdentifier.table) + if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan + + case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) => + BroadcastHint(plan) + + case _: BroadcastHint | _: View | _: With | _: SubqueryAlias => + // Don't traverse down these nodes. + // For an existing broadcast hint, there is no point going down (if we do, we either + // won't change the structure, or will introduce another broadcast hint that is useless). + // The rest (view, with, subquery) indicate different scopes that we shouldn't traverse + // down. Note that technically when this rule is executed, we haven't completed view + // resolution yet and as a result the view part should be dead code. I'm leaving it here + // to be more future proof in case we change the way we do view resolution. + recurse = false + plan + + case _ => + plan + } + } + + if ((plan fastEquals newNode) && recurse) { + newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast)) + } else { + newNode + } + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase) => + applyBroadcastHint(h.child, h.parameters.toSet) + } + } + + /** + * Removes all the hints, used to remove invalid hints provided by the user. + * This must be executed after all the other hint rules are executed. 
+ */ + object RemoveAllHints extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case h: Hint => h.child + } + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala deleted file mode 100644 index fda4d1b..0000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHints.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.analysis - -import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.trees.CurrentOrigin - - -/** - * Collection of rules related to hints. The only hint currently available is broadcast join hint. 
- * - * Note that this is separatedly into two rules because in the future we might introduce new hint - * rules that have different ordering requirements from broadcast. - */ -object SubstituteHints { - - /** - * Substitute Hints. - * - * The only hint currently available is broadcast join hint. - * - * For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of - * relation aliases can be specified in the hint. A broadcast hint plan node will be inserted - * on top of any relation (that is not aliased differently), subquery, or common table expression - * that match the specified name. - * - * The hint resolution works by recursively traversing down the query plan to find a relation or - * subquery that matches one of the specified broadcast aliases. The traversal does not go past - * beyond any existing broadcast hints, subquery aliases. - * - * This rule must happen before common table expressions. - */ - class SubstituteBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan] { - private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN") - - def resolver: Resolver = conf.resolver - - private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = { - // Whether to continue recursing down the tree - var recurse = true - - val newNode = CurrentOrigin.withOrigin(plan.origin) { - plan match { - case r: UnresolvedRelation => - val alias = r.alias.getOrElse(r.tableIdentifier.table) - if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan - case r: SubqueryAlias => - if (toBroadcast.exists(resolver(_, r.alias))) { - BroadcastHint(plan) - } else { - // Don't recurse down subquery aliases if there are no match. - recurse = false - plan - } - case _: BroadcastHint => - // Found a broadcast hint; don't change the plan but also don't recurse down. 
- recurse = false - plan - case _ => - plan - } - } - - if ((plan fastEquals newNode) && recurse) { - newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast)) - } else { - newNode - } - } - - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase) => - applyBroadcastHint(h.child, h.parameters.toSet) - } - } - - /** - * Removes all the hints, used to remove invalid hints provided by the user. - * This must be executed after all the other hint rules are executed. - */ - object RemoveAllHints extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case h: Hint => h.child - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala new file mode 100644 index 0000000..d101e22 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical._ + +class ResolveHintsSuite extends AnalysisTest { + import org.apache.spark.sql.catalyst.analysis.TestRelations._ + + test("invalid hints should be ignored") { + checkAnalysis( + Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")), + testRelation, + caseSensitive = false) + } + + test("case-sensitive or insensitive parameters") { + checkAnalysis( + Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), + BroadcastHint(testRelation), + caseSensitive = false) + + checkAnalysis( + Hint("MAPJOIN", Seq("table"), table("TaBlE")), + BroadcastHint(testRelation), + caseSensitive = false) + + checkAnalysis( + Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), + BroadcastHint(testRelation), + caseSensitive = true) + + checkAnalysis( + Hint("MAPJOIN", Seq("table"), table("TaBlE")), + testRelation, + caseSensitive = true) + } + + test("multiple broadcast hint aliases") { + checkAnalysis( + Hint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))), + Join(BroadcastHint(testRelation), BroadcastHint(testRelation2), Inner, None), + caseSensitive = false) + } + + test("do not traverse past existing broadcast hints") { + checkAnalysis( + Hint("MAPJOIN", Seq("table"), BroadcastHint(table("table").where('a > 1))), + 
BroadcastHint(testRelation.where('a > 1)).analyze, + caseSensitive = false) + } + + test("should work for subqueries") { + checkAnalysis( + Hint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")), + BroadcastHint(testRelation), + caseSensitive = false) + + checkAnalysis( + Hint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)), + BroadcastHint(testRelation), + caseSensitive = false) + + // Negative case: if the alias doesn't match, don't match the original table name. + checkAnalysis( + Hint("MAPJOIN", Seq("table"), table("table").as("tableAlias")), + testRelation, + caseSensitive = false) + } + + test("do not traverse past subquery alias") { + checkAnalysis( + Hint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)), + testRelation.where('a > 1).analyze, + caseSensitive = false) + } + + test("should work for CTE") { + checkAnalysis( + CatalystSqlParser.parsePlan( + """ + |WITH ctetable AS (SELECT * FROM table WHERE a > 1) + |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable + """.stripMargin + ), + BroadcastHint(testRelation.where('a > 1).select('a)).select('a).analyze, + caseSensitive = false) + } + + test("should not traverse down CTE") { + checkAnalysis( + CatalystSqlParser.parsePlan( + """ + |WITH ctetable AS (SELECT * FROM table WHERE a > 1) + |SELECT /*+ BROADCAST(table) */ * FROM ctetable + """.stripMargin + ), + testRelation.where('a > 1).select('a).select('a).analyze, + caseSensitive = false) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/733c59ec/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala deleted file mode 100644 index 9d671f3..0000000 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteHintsSuite.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.analysis - -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.plans.Inner -import org.apache.spark.sql.catalyst.plans.logical._ - -class SubstituteHintsSuite extends AnalysisTest { - import org.apache.spark.sql.catalyst.analysis.TestRelations._ - - test("invalid hints should be ignored") { - checkAnalysis( - Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")), - testRelation, - caseSensitive = false) - } - - test("case-sensitive or insensitive parameters") { - checkAnalysis( - Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), - BroadcastHint(testRelation), - caseSensitive = false) - - checkAnalysis( - Hint("MAPJOIN", Seq("table"), table("TaBlE")), - BroadcastHint(testRelation), - caseSensitive = false) - - checkAnalysis( - Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), - 
BroadcastHint(testRelation), - caseSensitive = true) - - checkAnalysis( - Hint("MAPJOIN", Seq("table"), table("TaBlE")), - testRelation, - caseSensitive = true) - } - - test("multiple broadcast hint aliases") { - checkAnalysis( - Hint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))), - Join(BroadcastHint(testRelation), BroadcastHint(testRelation2), Inner, None), - caseSensitive = false) - } - - test("do not traverse past existing broadcast hints") { - checkAnalysis( - Hint("MAPJOIN", Seq("table"), BroadcastHint(table("table").where('a > 1))), - BroadcastHint(testRelation.where('a > 1)).analyze, - caseSensitive = false) - } - - test("should work for subqueries") { - checkAnalysis( - Hint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")), - BroadcastHint(testRelation), - caseSensitive = false) - - checkAnalysis( - Hint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)), - BroadcastHint(testRelation), - caseSensitive = false) - - // Negative case: if the alias doesn't match, don't match the original table name. 
- checkAnalysis( - Hint("MAPJOIN", Seq("table"), table("table").as("tableAlias")), - testRelation, - caseSensitive = false) - } - - test("do not traverse past subquery alias") { - checkAnalysis( - Hint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)), - testRelation.where('a > 1).analyze, - caseSensitive = false) - } - - test("should work for CTE") { - checkAnalysis( - CatalystSqlParser.parsePlan( - """ - |WITH ctetable AS (SELECT * FROM table WHERE a > 1) - |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable - """.stripMargin - ), - BroadcastHint(testRelation.where('a > 1).select('a)).select('a).analyze, - caseSensitive = false) - } - - test("should not traverse down CTE") { - checkAnalysis( - CatalystSqlParser.parsePlan( - """ - |WITH ctetable AS (SELECT * FROM table WHERE a > 1) - |SELECT /*+ BROADCAST(table) */ * FROM ctetable - """.stripMargin - ), - testRelation.where('a > 1).select('a).select('a).analyze, - caseSensitive = false) - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org