Repository: spark
Updated Branches:
  refs/heads/master ad09e4ca0 -> 0d589ba00


[SPARK-20857][SQL] Generic resolved hint node

## What changes were proposed in this pull request?
This patch renames BroadcastHint to ResolvedHint (and Hint to UnresolvedHint) 
so the hint framework is more generic and would allow us to introduce other 
hint types in the future without introducing new hint nodes.

## How was this patch tested?
Updated test cases.

Author: Reynold Xin <r...@databricks.com>

Closes #18072 from rxin/SPARK-20857.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d589ba0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d589ba0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d589ba0

Branch: refs/heads/master
Commit: 0d589ba00b5d539fbfef5174221de046a70548cd
Parents: ad09e4c
Author: Reynold Xin <r...@databricks.com>
Authored: Tue May 23 18:44:49 2017 +0200
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue May 23 18:44:49 2017 +0200

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  2 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  2 +-
 .../sql/catalyst/analysis/ResolveHints.scala    | 12 ++---
 .../sql/catalyst/optimizer/Optimizer.scala      |  2 +-
 .../sql/catalyst/optimizer/expressions.scala    |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  4 +-
 .../spark/sql/catalyst/planning/patterns.scala  |  4 +-
 .../sql/catalyst/plans/logical/Statistics.scala |  5 ++
 .../plans/logical/basicLogicalOperators.scala   | 22 +--------
 .../sql/catalyst/plans/logical/hints.scala      | 49 ++++++++++++++++++++
 .../catalyst/analysis/ResolveHintsSuite.scala   | 41 ++++++++--------
 .../catalyst/optimizer/ColumnPruningSuite.scala |  5 +-
 .../optimizer/FilterPushdownSuite.scala         |  4 +-
 .../optimizer/JoinOptimizationSuite.scala       |  4 +-
 .../sql/catalyst/parser/PlanParserSuite.scala   | 15 +++---
 .../BasicStatsEstimationSuite.scala             |  2 +-
 .../scala/org/apache/spark/sql/Dataset.scala    |  2 +-
 .../spark/sql/execution/SparkStrategies.scala   |  2 +-
 .../scala/org/apache/spark/sql/functions.scala  |  5 +-
 .../execution/joins/BroadcastJoinSuite.scala    | 14 +++---
 20 files changed, 118 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d58b8ac..d130962 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1336,7 +1336,7 @@ class Analyzer(
 
         // Category 1:
         // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
-        case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | 
_: SubqueryAlias =>
+        case _: ResolvedHint | _: Distinct | _: LeafNode | _: Repartition | _: 
SubqueryAlias =>
 
         // Category 2:
         // These operators can be anywhere in a correlated subquery.

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index ea4560a..2e3ac3e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -399,7 +399,7 @@ trait CheckAnalysis extends PredicateHelper {
                  |in operator ${operator.simpleString}
                """.stripMargin)
 
-          case _: Hint =>
+          case _: UnresolvedHint =>
             throw new IllegalStateException(
               "Internal error: logical hint operator should have been removed 
during analysis")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index df688fa..9dfd84c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -57,11 +57,11 @@ object ResolveHints {
       val newNode = CurrentOrigin.withOrigin(plan.origin) {
         plan match {
           case u: UnresolvedRelation if toBroadcast.exists(resolver(_, 
u.tableIdentifier.table)) =>
-            BroadcastHint(plan)
+            ResolvedHint(plan, isBroadcastable = Option(true))
           case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
-            BroadcastHint(plan)
+            ResolvedHint(plan, isBroadcastable = Option(true))
 
-          case _: BroadcastHint | _: View | _: With | _: SubqueryAlias =>
+          case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
             // Don't traverse down these nodes.
             // For an existing broadcast hint, there is no point going down 
(if we do, we either
             // won't change the structure, or will introduce another broadcast 
hint that is useless.
@@ -85,10 +85,10 @@ object ResolveHints {
     }
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case h: Hint if 
BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
+      case h: UnresolvedHint if 
BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
         if (h.parameters.isEmpty) {
           // If there is no table alias specified, turn the entire subtree 
into a BroadcastHint.
-          BroadcastHint(h.child)
+          ResolvedHint(h.child, isBroadcastable = Option(true))
         } else {
           // Otherwise, find within the subtree query plans that should be 
broadcasted.
           applyBroadcastHint(h.child, h.parameters.toSet)
@@ -102,7 +102,7 @@ object ResolveHints {
    */
   object RemoveAllHints extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case h: Hint => h.child
+      case h: UnresolvedHint => h.child
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1802cd4..ae2f6bf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -862,7 +862,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with 
PredicateHelper {
     // Note that some operators (e.g. project, aggregate, union) are being 
handled separately
     // (earlier in this rule).
     case _: AppendColumns => true
-    case _: BroadcastHint => true
+    case _: ResolvedHint => true
     case _: Distinct => true
     case _: Generate => true
     case _: Pivot => true

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index d3ef5ea..8931eb2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -478,7 +478,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
     case _: Distinct => true
     case _: AppendColumns => true
     case _: AppendColumnsWithObject => true
-    case _: BroadcastHint => true
+    case _: ResolvedHint => true
     case _: RepartitionByExpression => true
     case _: Repartition => true
     case _: Sort => true

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f033fd4..7d2e3a6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -533,13 +533,13 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
   }
 
   /**
-   * Add a [[Hint]] to a logical plan.
+   * Add a [[UnresolvedHint]] to a logical plan.
    */
   private def withHints(
       ctx: HintContext,
       query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
     val stmt = ctx.hintStatement
-    Hint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
+    UnresolvedHint(stmt.hintName.getText, 
stmt.parameters.asScala.map(_.getText), query)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index d39b0ef..ef925f9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -65,8 +65,8 @@ object PhysicalOperation extends PredicateHelper {
         val substitutedCondition = substitute(aliases)(condition)
         (fields, filters ++ splitConjunctivePredicates(substitutedCondition), 
other, aliases)
 
-      case BroadcastHint(child) =>
-        collectProjectsAndFilters(child)
+      case h: ResolvedHint =>
+        collectProjectsAndFilters(h.child)
 
       case other =>
         (None, Nil, other, Map.empty)

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 3d4efef..81bb374 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -68,6 +68,11 @@ case class Statistics(
       s"isBroadcastable=$isBroadcastable"
     ).filter(_.nonEmpty).mkString(", ")
   }
+
+  /** Must be called when computing stats for a join operator to reset hints. 
*/
+  def resetHintsForJoin(): Statistics = copy(
+    isBroadcastable = false
+  )
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index d291ca0..9f34b37 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -364,7 +364,7 @@ case class Join(
       case _ =>
         // Make sure we don't propagate isBroadcastable in other joins, because
         // they could explode the size.
-        super.computeStats(conf).copy(isBroadcastable = false)
+        super.computeStats(conf).resetHintsForJoin()
     }
 
     if (conf.cboEnabled) {
@@ -376,26 +376,6 @@ case class Join(
 }
 
 /**
- * A hint for the optimizer that we should broadcast the `child` if used in a 
join operator.
- */
-case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-
-  // set isBroadcastable to true so the child will be broadcasted
-  override def computeStats(conf: SQLConf): Statistics =
-    child.stats(conf).copy(isBroadcastable = true)
-}
-
-/**
- * A general hint for the child. This node will be eliminated post analysis.
- * A pair of (name, parameters).
- */
-case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) 
extends UnaryNode {
-  override lazy val resolved: Boolean = false
-  override def output: Seq[Attribute] = child.output
-}
-
-/**
  * Insert some data into a table. Note that this plan is unresolved and has to 
be replaced by the
  * concrete implementations during analysis.
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
new file mode 100644
index 0000000..9bcbfbb
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A general hint for the child that is not yet resolved. This node is 
generated by the parser and
+ * should be removed This node will be eliminated post analysis.
+ * A pair of (name, parameters).
+ */
+case class UnresolvedHint(name: String, parameters: Seq[String], child: 
LogicalPlan)
+  extends UnaryNode {
+
+  override lazy val resolved: Boolean = false
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * A resolved hint node. The analyzer should convert all [[UnresolvedHint]] 
into [[ResolvedHint]].
+ */
+case class ResolvedHint(
+    child: LogicalPlan,
+    isBroadcastable: Option[Boolean] = None)
+  extends UnaryNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def computeStats(conf: SQLConf): Statistics = {
+    val stats = child.stats(conf)
+    isBroadcastable.map(x => stats.copy(isBroadcastable = x)).getOrElse(stats)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
index d101e22..bb914e1 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
@@ -28,68 +28,70 @@ class ResolveHintsSuite extends AnalysisTest {
 
   test("invalid hints should be ignored") {
     checkAnalysis(
-      Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), 
table("TaBlE")),
+      UnresolvedHint("some_random_hint_that_does_not_exist", Seq("TaBlE"), 
table("TaBlE")),
       testRelation,
       caseSensitive = false)
   }
 
   test("case-sensitive or insensitive parameters") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("TaBlE")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = true)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("TaBlE")),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")),
       testRelation,
       caseSensitive = true)
   }
 
   test("multiple broadcast hint aliases") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table", "table2"), 
table("table").join(table("table2"))),
-      Join(BroadcastHint(testRelation), BroadcastHint(testRelation2), Inner, 
None),
+      UnresolvedHint("MAPJOIN", Seq("table", "table2"), 
table("table").join(table("table2"))),
+      Join(ResolvedHint(testRelation, isBroadcastable = Option(true)),
+        ResolvedHint(testRelation2, isBroadcastable = Option(true)), Inner, 
None),
       caseSensitive = false)
   }
 
   test("do not traverse past existing broadcast hints") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), BroadcastHint(table("table").where('a > 
1))),
-      BroadcastHint(testRelation.where('a > 1)).analyze,
+      UnresolvedHint("MAPJOIN", Seq("table"),
+        ResolvedHint(table("table").where('a > 1), isBroadcastable = 
Option(true))),
+      ResolvedHint(testRelation.where('a > 1), isBroadcastable = 
Option(true)).analyze,
       caseSensitive = false)
   }
 
   test("should work for subqueries") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("tableAlias"), 
table("table").as("tableAlias")),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     checkAnalysis(
-      Hint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)),
-      BroadcastHint(testRelation),
+      UnresolvedHint("MAPJOIN", Seq("tableAlias"), 
table("table").subquery('tableAlias)),
+      ResolvedHint(testRelation, isBroadcastable = Option(true)),
       caseSensitive = false)
 
     // Negative case: if the alias doesn't match, don't match the original 
table name.
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("table").as("tableAlias")),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("table").as("tableAlias")),
       testRelation,
       caseSensitive = false)
   }
 
   test("do not traverse past subquery alias") {
     checkAnalysis(
-      Hint("MAPJOIN", Seq("table"), table("table").where('a > 
1).subquery('tableAlias)),
+      UnresolvedHint("MAPJOIN", Seq("table"), table("table").where('a > 
1).subquery('tableAlias)),
       testRelation.where('a > 1).analyze,
       caseSensitive = false)
   }
@@ -102,7 +104,8 @@ class ResolveHintsSuite extends AnalysisTest {
           |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable
         """.stripMargin
       ),
-      BroadcastHint(testRelation.where('a > 1).select('a)).select('a).analyze,
+      ResolvedHint(testRelation.where('a > 1).select('a), isBroadcastable = 
Option(true))
+        .select('a).analyze,
       caseSensitive = false)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 589607e..a0a0dae 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -321,15 +321,14 @@ class ColumnPruningSuite extends PlanTest {
       Project(Seq($"x.key", $"y.key"),
         Join(
           SubqueryAlias("x", input),
-          BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
+          ResolvedHint(SubqueryAlias("y", input)), Inner, None)).analyze
 
     val optimized = Optimize.execute(query)
 
     val expected =
       Join(
         Project(Seq($"x.key"), SubqueryAlias("x", input)),
-        BroadcastHint(
-          Project(Seq($"y.key"), SubqueryAlias("y", input))),
+        ResolvedHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
         Inner, None).analyze
 
     comparePlans(optimized, expected)

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 950aa23..d4d281e 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -798,12 +798,12 @@ class FilterPushdownSuite extends PlanTest {
   }
 
   test("broadcast hint") {
-    val originalQuery = BroadcastHint(testRelation)
+    val originalQuery = ResolvedHint(testRelation)
       .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
 
     val optimized = Optimize.execute(originalQuery.analyze)
 
-    val correctAnswer = BroadcastHint(testRelation.where('a === 2L))
+    val correctAnswer = ResolvedHint(testRelation.where('a === 2L))
       .where('b + Rand(10).as("rnd") === 3)
       .analyze
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index a43d78c..105407d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -129,14 +129,14 @@ class JoinOptimizationSuite extends PlanTest {
       Project(Seq($"x.key", $"y.key"),
         Join(
           SubqueryAlias("x", input),
-          BroadcastHint(SubqueryAlias("y", input)), Cross, None)).analyze
+          ResolvedHint(SubqueryAlias("y", input)), Cross, None)).analyze
 
     val optimized = Optimize.execute(query)
 
     val expected =
       Join(
         Project(Seq($"x.key"), SubqueryAlias("x", input)),
-        BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
+        ResolvedHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
         Cross, None).analyze
 
     comparePlans(optimized, expected)

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index d78741d..134e761 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -534,30 +534,31 @@ class PlanParserSuite extends PlanTest {
 
     comparePlans(
       parsePlan("SELECT /*+ HINT */ * FROM t"),
-      Hint("HINT", Seq.empty, table("t").select(star())))
+      UnresolvedHint("HINT", Seq.empty, table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ BROADCASTJOIN(u) */ * FROM t"),
-      Hint("BROADCASTJOIN", Seq("u"), table("t").select(star())))
+      UnresolvedHint("BROADCASTJOIN", Seq("u"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(u) */ * FROM t"),
-      Hint("MAPJOIN", Seq("u"), table("t").select(star())))
+      UnresolvedHint("MAPJOIN", Seq("u"), table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ STREAMTABLE(a,b,c) */ * FROM t"),
-      Hint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star())))
+      UnresolvedHint("STREAMTABLE", Seq("a", "b", "c"), 
table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ INDEX(t, emp_job_ix) */ * FROM t"),
-      Hint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star())))
+      UnresolvedHint("INDEX", Seq("t", "emp_job_ix"), 
table("t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(`default.t`) */ * from `default.t`"),
-      Hint("MAPJOIN", Seq("default.t"), table("default.t").select(star())))
+      UnresolvedHint("MAPJOIN", Seq("default.t"), 
table("default.t").select(star())))
 
     comparePlans(
       parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order 
by a"),
-      Hint("MAPJOIN", Seq("t"), 
table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc))
+      UnresolvedHint("MAPJOIN", Seq("t"),
+        table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index b06871f..81b91e6 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -45,7 +45,7 @@ class BasicStatsEstimationSuite extends 
StatsEstimationTestBase {
       expectedStatsCboOn = filterStatsCboOn,
       expectedStatsCboOff = filterStatsCboOff)
 
-    val broadcastHint = BroadcastHint(filter)
+    val broadcastHint = ResolvedHint(filter, isBroadcastable = Option(true))
     checkStats(
       broadcastHint,
       expectedStatsCboOn = filterStatsCboOn.copy(isBroadcastable = true),

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 53773f1..cbab029 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1174,7 +1174,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
-    Hint(name, parameters, logicalPlan)
+    UnresolvedHint(name, parameters, logicalPlan)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 73541c2..5981b49 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -433,7 +433,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case ExternalRDD(outputObjAttr, rdd) => 
ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case r: LogicalRDD =>
         RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, 
r.outputOrdering) :: Nil
-      case BroadcastHint(child) => planLater(child) :: Nil
+      case h: ResolvedHint => planLater(h.child) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 5edf036..563eae0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{Star, 
UnresolvedFunction}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint
+import org.apache.spark.sql.catalyst.plans.logical.ResolvedHint
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.internal.SQLConf
@@ -1019,7 +1019,8 @@ object functions {
    * @since 1.5.0
    */
   def broadcast[T](df: Dataset[T]): Dataset[T] = {
-    Dataset[T](df.sparkSession, BroadcastHint(df.logicalPlan))(df.exprEnc)
+    Dataset[T](df.sparkSession,
+      ResolvedHint(df.logicalPlan, isBroadcastable = Option(true)))(df.exprEnc)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0d589ba0/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 26c45e0..afb8ced 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -157,7 +157,7 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
   }
 
   test("broadcast hint in SQL") {
-    import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, Join}
+    import org.apache.spark.sql.catalyst.plans.logical.{ResolvedHint, Join}
 
     spark.range(10).createOrReplaceTempView("t")
     spark.range(10).createOrReplaceTempView("u")
@@ -170,12 +170,12 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
       val plan3 = sql(s"SELECT /*+ $name(v) */ * FROM t JOIN u ON t.id = 
u.id").queryExecution
         .optimizedPlan
 
-      assert(plan1.asInstanceOf[Join].left.isInstanceOf[BroadcastHint])
-      assert(!plan1.asInstanceOf[Join].right.isInstanceOf[BroadcastHint])
-      assert(!plan2.asInstanceOf[Join].left.isInstanceOf[BroadcastHint])
-      assert(plan2.asInstanceOf[Join].right.isInstanceOf[BroadcastHint])
-      assert(!plan3.asInstanceOf[Join].left.isInstanceOf[BroadcastHint])
-      assert(!plan3.asInstanceOf[Join].right.isInstanceOf[BroadcastHint])
+      assert(plan1.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
+      assert(!plan1.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
+      assert(!plan2.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
+      assert(plan2.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
+      assert(!plan3.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
+      assert(!plan3.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to