Repository: spark
Updated Branches:
  refs/heads/branch-2.3 6588e007e -> 438631031


[SPARK-22916][SQL][FOLLOW-UP] Update the Description of Join Selection

## What changes were proposed in this pull request?
This PR updates the description of the join selection algorithm.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsm...@gmail.com>

Closes #20420 from gatorsmile/followUp22916.

(cherry picked from commit e30b34f7bd9a687eb43d636fffeb98fe235fcbf4)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43863103
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43863103
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43863103

Branch: refs/heads/branch-2.3
Commit: 438631031b2a7d79f8c639ef8ef0de1303bb9f2b
Parents: 6588e00
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Mon Jan 29 10:29:42 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Mon Jan 29 10:29:50 2018 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/SparkStrategies.scala   | 60 +++++++++++++++-----
 1 file changed, 47 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/43863103/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 9102948..25436e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -90,23 +90,58 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Select the proper physical plan for join based on joining keys and size of logical plan.
    *
    * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the
-   * predicates can be evaluated by matching join keys. If found,  Join implementations are chosen
+   * predicates can be evaluated by matching join keys. If found, join implementations are chosen
    * with the following precedence:
    *
-   * - Broadcast: We prefer to broadcast the join side with an explicit broadcast hint(e.g. the
-   *     user applied the [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame).
-   *     If both sides have the broadcast hint, we prefer to broadcast the side with a smaller
-   *     estimated physical size. If neither one of the sides has the broadcast hint,
-   *     we only broadcast the join side if its estimated physical size that is smaller than
-   *     the user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold.
+   * - Broadcast hash join (BHJ):
+   *     BHJ is not supported for full outer join. For right outer join, we only can broadcast the
+   *     left side. For left outer, left semi, left anti and the internal join type ExistenceJoin,
+   *     we only can broadcast the right side. For inner like join, we can broadcast both sides.
+   *     Normally, BHJ can perform faster than the other join algorithms when the broadcast side is
+   *     small. However, broadcasting tables is a network-intensive operation. It could cause OOM
+   *     or perform worse than the other join algorithms, especially when the build/broadcast side
+   *     is big.
+   *
+   *     For the supported cases, users can specify the broadcast hint (e.g. the user applied the
+   *     [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame) and session-based
+   *     [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to adjust whether BHJ is used and
+   *     which join side is broadcast.
+   *
+   *     1) Broadcast the join side with the broadcast hint, even if the size is larger than
+   *     [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (only when the type
+   *     is inner like join), the side with a smaller estimated physical size will be broadcast.
+   *     2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
+   *     whose estimated physical size is smaller than the threshold. If both sides are below the
+   *     threshold, broadcast the smaller side. If neither is smaller, BHJ is not used.
+   *
    * - Shuffle hash join: if the average size of a single partition is small enough to build a hash
    *     table.
+   *
    * - Sort merge: if the matching join keys are sortable.
    *
    * If there is no joining keys, Join implementations are chosen with the following precedence:
-   * - BroadcastNestedLoopJoin: if one side of the join could be broadcasted
-   * - CartesianProduct: for Inner join
-   * - BroadcastNestedLoopJoin
+   * - BroadcastNestedLoopJoin (BNLJ):
+   *     BNLJ supports all the join types but the impl is OPTIMIZED for the following scenarios:
+   *     For right outer join, the left side is broadcast. For left outer, left semi, left anti
+   *     and the internal join type ExistenceJoin, the right side is broadcast. For inner like
+   *     joins, either side is broadcast.
+   *
+   *     Like BHJ, users still can specify the broadcast hint and session-based
+   *     [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to impact which side is broadcast.
+   *
+   *     1) Broadcast the join side with the broadcast hint, even if the size is larger than
+   *     [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (i.e., just for
+   *     inner-like join), the side with a smaller estimated physical size will be broadcast.
+   *     2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
+   *     whose estimated physical size is smaller than the threshold. If both sides are below the
+   *     threshold, broadcast the smaller side. If neither is smaller, BNLJ is not used.
+   *
+   * - CartesianProduct: for inner like join, CartesianProduct is the fallback option.
+   *
+   * - BroadcastNestedLoopJoin (BNLJ):
+   *     For the other join types, BNLJ is the fallback option. Here, we just pick the broadcast
+   *     side with the broadcast hint. If neither side has a hint, we broadcast the side with
+   *     the smaller estimated physical size.
    */
   object JoinSelection extends Strategy with PredicateHelper {
 
@@ -139,8 +174,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
 
     private def canBuildRight(joinType: JoinType): Boolean = joinType match {
-      case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
-      case j: ExistenceJoin => true
+      case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
       case _ => false
     }
 
@@ -243,7 +277,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
       // --- Without joining keys ------------------------------------------------------------
 
-      // Pick BroadcastNestedLoopJoin if one side could be broadcasted
+      // Pick BroadcastNestedLoopJoin if one side could be broadcast
       case j @ logical.Join(left, right, joinType, condition)
           if canBroadcastByHints(joinType, left, right) =>
         val buildSide = broadcastSideByHints(joinType, left, right)
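

The broadcast-side rules described in the updated Scaladoc above can be sketched as a small standalone model. This is not the code in SparkStrategies.scala: JoinKind, Side, Relation, chooseSide and pickBroadcastSide are hypothetical names invented for this illustration, the model ignores everything except join type, hints and estimated size, and breaking size ties in favor of the right side is an assumption of the sketch.

```scala
// Illustrative sketch only. JoinKind, Side, Relation, chooseSide and pickBroadcastSide
// are hypothetical names for this example, not Spark APIs.
object BroadcastSideSketch {
  sealed trait JoinKind
  case object InnerLike extends JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object LeftSemi extends JoinKind
  case object LeftAnti extends JoinKind
  case object Existence extends JoinKind
  case object FullOuter extends JoinKind

  sealed trait Side
  case object BuildLeft extends Side
  case object BuildRight extends Side

  // Stand-in for a logical plan: estimated size plus whether a broadcast hint was applied.
  final case class Relation(sizeInBytes: Long, hasBroadcastHint: Boolean)

  // The left side may be broadcast only for inner-like and right outer joins.
  def canBuildLeft(kind: JoinKind): Boolean = kind match {
    case InnerLike | RightOuter => true
    case _ => false
  }

  // The right side may be broadcast for inner-like, left outer, left semi,
  // left anti and existence joins.
  def canBuildRight(kind: JoinKind): Boolean = kind match {
    case InnerLike | LeftOuter | LeftSemi | LeftAnti | Existence => true
    case _ => false
  }

  // Among the eligible sides, prefer the smaller one.
  // Assumption of this sketch: a size tie goes to the right side.
  private def chooseSide(
      left: Relation,
      right: Relation,
      leftOk: Boolean,
      rightOk: Boolean): Option[Side] = (leftOk, rightOk) match {
    case (true, true) =>
      Some(if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft)
    case (true, false) => Some(BuildLeft)
    case (false, true) => Some(BuildRight)
    case (false, false) => None
  }

  // Step 1: honor broadcast hints regardless of size.
  // Step 2: otherwise fall back to the size threshold.
  // None means no side qualifies, so a broadcast join is not chosen by these rules.
  def pickBroadcastSide(
      kind: JoinKind,
      left: Relation,
      right: Relation,
      thresholdBytes: Long): Option[Side] = {
    val byHint = chooseSide(
      left, right,
      canBuildLeft(kind) && left.hasBroadcastHint,
      canBuildRight(kind) && right.hasBroadcastHint)
    byHint.orElse(chooseSide(
      left, right,
      canBuildLeft(kind) && left.sizeInBytes < thresholdBytes,
      canBuildRight(kind) && right.sizeInBytes < thresholdBytes))
  }
}
```

Under this model, a left outer join whose left side carries the hint but whose right side exceeds the threshold yields None, because only the right side is eligible for that join type. An inner-like join with hints on both sides picks the smaller side even when both are above the threshold.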

