Repository: spark
Updated Branches:
  refs/heads/master 0cba49512 -> c92949ac2


[SPARK-20972][SQL] rename HintInfo.isBroadcastable to broadcast

## What changes were proposed in this pull request?

`HintInfo.isBroadcastable` is not an accurate name: it is used, via the hint
mechanism, to force the planner to broadcast a plan regardless of its data size.
I think `broadcast` (short for "force broadcast") is a better name.

In addition, `isBroadcastable` only ever takes two values, `Some(true)` and
`None`, so a plain `Boolean` is sufficient.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #18189 from cloud-fan/stats.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c92949ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c92949ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c92949ac

Branch: refs/heads/master
Commit: c92949ac23652e2c3a0c97fdf3d6e016f9d01dda
Parents: 0cba495
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Tue Jun 6 22:50:06 2017 -0700
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Jun 6 22:50:06 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/analysis/ResolveHints.scala    |  6 +++---
 .../sql/catalyst/plans/logical/hints.scala      | 16 +++++++---------
 .../catalyst/analysis/ResolveHintsSuite.scala   | 20 ++++++++++----------
 .../BasicStatsEstimationSuite.scala             |  6 +++---
 .../spark/sql/execution/SparkStrategies.scala   |  2 +-
 .../scala/org/apache/spark/sql/functions.scala  |  2 +-
 6 files changed, 25 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c92949ac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index 62a3482..f068bce 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -58,9 +58,9 @@ object ResolveHints {
       val newNode = CurrentOrigin.withOrigin(plan.origin) {
         plan match {
           case u: UnresolvedRelation if toBroadcast.exists(resolver(_, 
u.tableIdentifier.table)) =>
-            ResolvedHint(plan, HintInfo(isBroadcastable = Option(true)))
+            ResolvedHint(plan, HintInfo(broadcast = true))
           case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
-            ResolvedHint(plan, HintInfo(isBroadcastable = Option(true)))
+            ResolvedHint(plan, HintInfo(broadcast = true))
 
           case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
             // Don't traverse down these nodes.
@@ -89,7 +89,7 @@ object ResolveHints {
       case h: UnresolvedHint if 
BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
         if (h.parameters.isEmpty) {
           // If there is no table alias specified, turn the entire subtree 
into a BroadcastHint.
-          ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true)))
+          ResolvedHint(h.child, HintInfo(broadcast = true))
         } else {
           // Otherwise, find within the subtree query plans that should be 
broadcasted.
           applyBroadcastHint(h.child, h.parameters.map {

http://git-wip-us.apache.org/repos/asf/spark/blob/c92949ac/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index d16fae5..e49970d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -51,19 +51,17 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo 
= HintInfo())
 }
 
 
-case class HintInfo(
-    isBroadcastable: Option[Boolean] = None) {
+case class HintInfo(broadcast: Boolean = false) {
 
   /** Must be called when computing stats for a join operator to reset hints. 
*/
-  def resetForJoin(): HintInfo = copy(
-    isBroadcastable = None
-  )
+  def resetForJoin(): HintInfo = copy(broadcast = false)
 
   override def toString: String = {
-    if (productIterator.forall(_.asInstanceOf[Option[_]].isEmpty)) {
-      "none"
-    } else {
-      isBroadcastable.map(x => s"isBroadcastable=$x").getOrElse("")
+    val hints = scala.collection.mutable.ArrayBuffer.empty[String]
+    if (broadcast) {
+      hints += "broadcast"
     }
+
+    if (hints.isEmpty) "none" else hints.mkString("(", ", ", ")")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c92949ac/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
index 3d51480..9782b5f 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
@@ -36,17 +36,17 @@ class ResolveHintsSuite extends AnalysisTest {
   test("case-sensitive or insensitive parameters") {
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))),
+      ResolvedHint(testRelation, HintInfo(broadcast = true)),
       caseSensitive = false)
 
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")),
-      ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))),
+      ResolvedHint(testRelation, HintInfo(broadcast = true)),
       caseSensitive = false)
 
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
-      ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))),
+      ResolvedHint(testRelation, HintInfo(broadcast = true)),
       caseSensitive = true)
 
     checkAnalysis(
@@ -58,28 +58,28 @@ class ResolveHintsSuite extends AnalysisTest {
   test("multiple broadcast hint aliases") {
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("table", "table2"), 
table("table").join(table("table2"))),
-      Join(ResolvedHint(testRelation, HintInfo(isBroadcastable = 
Option(true))),
-        ResolvedHint(testRelation2, HintInfo(isBroadcastable = Option(true))), 
Inner, None),
+      Join(ResolvedHint(testRelation, HintInfo(broadcast = true)),
+        ResolvedHint(testRelation2, HintInfo(broadcast = true)), Inner, None),
       caseSensitive = false)
   }
 
   test("do not traverse past existing broadcast hints") {
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("table"),
-        ResolvedHint(table("table").where('a > 1), HintInfo(isBroadcastable = 
Option(true)))),
-      ResolvedHint(testRelation.where('a > 1), HintInfo(isBroadcastable = 
Option(true))).analyze,
+        ResolvedHint(table("table").where('a > 1), HintInfo(broadcast = 
true))),
+      ResolvedHint(testRelation.where('a > 1), HintInfo(broadcast = 
true)).analyze,
       caseSensitive = false)
   }
 
   test("should work for subqueries") {
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("tableAlias"), 
table("table").as("tableAlias")),
-      ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))),
+      ResolvedHint(testRelation, HintInfo(broadcast = true)),
       caseSensitive = false)
 
     checkAnalysis(
       UnresolvedHint("MAPJOIN", Seq("tableAlias"), 
table("table").subquery('tableAlias)),
-      ResolvedHint(testRelation, HintInfo(isBroadcastable = Option(true))),
+      ResolvedHint(testRelation, HintInfo(broadcast = true)),
       caseSensitive = false)
 
     // Negative case: if the alias doesn't match, don't match the original 
table name.
@@ -104,7 +104,7 @@ class ResolveHintsSuite extends AnalysisTest {
           |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable
         """.stripMargin
       ),
-      ResolvedHint(testRelation.where('a > 1).select('a), 
HintInfo(isBroadcastable = Option(true)))
+      ResolvedHint(testRelation.where('a > 1).select('a), HintInfo(broadcast = 
true))
         .select('a).analyze,
       caseSensitive = false)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c92949ac/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index 2afea6d..833f5a7 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -45,11 +45,11 @@ class BasicStatsEstimationSuite extends 
StatsEstimationTestBase {
       expectedStatsCboOn = filterStatsCboOn,
       expectedStatsCboOff = filterStatsCboOff)
 
-    val broadcastHint = ResolvedHint(filter, HintInfo(isBroadcastable = 
Option(true)))
+    val broadcastHint = ResolvedHint(filter, HintInfo(broadcast = true))
     checkStats(
       broadcastHint,
-      expectedStatsCboOn = filterStatsCboOn.copy(hints = 
HintInfo(isBroadcastable = Option(true))),
-      expectedStatsCboOff = filterStatsCboOff.copy(hints = 
HintInfo(isBroadcastable = Option(true)))
+      expectedStatsCboOn = filterStatsCboOn.copy(hints = HintInfo(broadcast = 
true)),
+      expectedStatsCboOff = filterStatsCboOff.copy(hints = HintInfo(broadcast 
= true))
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c92949ac/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f13294c..ea86f6e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -114,7 +114,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
      * Matches a plan whose output should be small enough to be used in 
broadcast join.
      */
     private def canBroadcast(plan: LogicalPlan): Boolean = {
-      plan.stats(conf).hints.isBroadcastable.getOrElse(false) ||
+      plan.stats(conf).hints.broadcast ||
         (plan.stats(conf).sizeInBytes >= 0 &&
           plan.stats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c92949ac/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 67ec132..8d0a8c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1020,7 +1020,7 @@ object functions {
    */
   def broadcast[T](df: Dataset[T]): Dataset[T] = {
     Dataset[T](df.sparkSession,
-      ResolvedHint(df.logicalPlan, HintInfo(isBroadcastable = 
Option(true))))(df.exprEnc)
+      ResolvedHint(df.logicalPlan, HintInfo(broadcast = true)))(df.exprEnc)
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to