Repository: spark
Updated Branches:
  refs/heads/master f1f7385a5 -> 981bde9b0


[SQL]Extract the joinkeys from join condition

Extract the join keys from equality conditions, that can be evaluated using 
equi-join.

Author: Cheng Hao <[email protected]>

Closes #1190 from chenghao-intel/extract_join_keys and squashes the following 
commits:

4a1060a [Cheng Hao] Fix some of the small issues
ceb4924 [Cheng Hao] Remove the redundant pattern of join keys extraction
cec34e8 [Cheng Hao] Update the code style issues
dcc4584 [Cheng Hao] Extract the joinkeys from join condition


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/981bde9b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/981bde9b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/981bde9b

Branch: refs/heads/master
Commit: 981bde9b056ef5e91aed553e0b5930f12e1ff797
Parents: f1f7385
Author: Cheng Hao <[email protected]>
Authored: Thu Jun 26 19:18:11 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Thu Jun 26 19:18:11 2014 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  7 ++-
 .../spark/sql/catalyst/planning/patterns.scala  | 62 +++++++-------------
 .../spark/sql/execution/SparkStrategies.scala   | 13 ++--
 3 files changed, 33 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/981bde9b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b20b5de..fb517e4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -257,8 +257,11 @@ object PushPredicateThroughProject extends 
Rule[LogicalPlan] {
  * Check https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior 
for more details
  */
 object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper 
{
-  // split the condition expression into 3 parts, 
-  // (canEvaluateInLeftSide, canEvaluateInRightSide, 
haveToEvaluateWithBothSide) 
+  /**
+   * Splits join condition expressions into three categories based on the 
attributes required
+   * to evaluate them.
+   * @returns (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
+   */
   private def split(condition: Seq[Expression], left: LogicalPlan, right: 
LogicalPlan) = {
     val (leftEvaluateCondition, rest) =
         condition.partition(_.references subsetOf left.outputSet)

http://git-wip-us.apache.org/repos/asf/spark/blob/981bde9b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index a43bef3..026692a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -105,57 +105,39 @@ object PhysicalOperation extends PredicateHelper {
 }
 
 /**
- * A pattern that finds joins with equality conditions that can be evaluated 
using hashing
- * techniques.  For inner joins, any filters on top of the join operator are 
also matched.
+ * A pattern that finds joins with equality conditions that can be evaluated 
using equi-join.
  */
-object HashFilteredJoin extends Logging with PredicateHelper {
+object ExtractEquiJoinKeys extends Logging with PredicateHelper {
   /** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */
   type ReturnType =
     (JoinType, Seq[Expression], Seq[Expression], Option[Expression], 
LogicalPlan, LogicalPlan)
 
   def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
-    // All predicates can be evaluated for inner join (i.e., those that are in 
the ON
-    // clause and WHERE clause.)
-    case FilteredOperation(predicates, join @ Join(left, right, Inner, 
condition)) =>
-      logger.debug(s"Considering hash inner join on: ${predicates ++ 
condition}")
-      splitPredicates(predicates ++ condition, join)
-    // All predicates can be evaluated for left semi join (those that are in 
the WHERE
-    // clause can only from left table, so they can all be pushed down.)
-    case FilteredOperation(predicates, join @ Join(left, right, LeftSemi, 
condition)) =>
-      logger.debug(s"Considering hash left semi join on: ${predicates ++ 
condition}")
-      splitPredicates(predicates ++ condition, join)
     case join @ Join(left, right, joinType, condition) =>
-      logger.debug(s"Considering hash join on: $condition")
-      splitPredicates(condition.toSeq, join)
-    case _ => None
-  }
-
-  // Find equi-join predicates that can be evaluated before the join, and thus 
can be used
-  // as join keys.
-  def splitPredicates(allPredicates: Seq[Expression], join: Join): 
Option[ReturnType] = {
-    val Join(left, right, joinType, _) = join
-    val (joinPredicates, otherPredicates) =
-      allPredicates.flatMap(splitConjunctivePredicates).partition {
-        case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) 
||
-          (canEvaluate(l, right) && canEvaluate(r, left)) => true
-        case _ => false
+      logger.debug(s"Considering join on: $condition")
+      // Find equi-join predicates that can be evaluated before the join, and 
thus can be used
+      // as join keys.
+      val (joinPredicates, otherPredicates) = 
+        condition.map(splitConjunctivePredicates).getOrElse(Nil).partition {
+          case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, 
right)) ||
+            (canEvaluate(l, right) && canEvaluate(r, left)) => true
+          case _ => false
+        }
+
+      val joinKeys = joinPredicates.map {
+        case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => 
(l, r)
+        case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => 
(r, l)
       }
-
-    val joinKeys = joinPredicates.map {
-      case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => 
(l, r)
-      case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => 
(r, l)
-    }
-
-    // Do not consider this strategy if there are no join keys.
-    if (joinKeys.nonEmpty) {
       val leftKeys = joinKeys.map(_._1)
       val rightKeys = joinKeys.map(_._2)
 
-      Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), 
left, right))
-    } else {
-      logger.debug(s"Avoiding hash join with no join keys.")
-      None
-    }
+      if (joinKeys.nonEmpty) {
+        logger.debug(s"leftKeys:${leftKeys} | rightKeys:${rightKeys}")
+        Some((joinType, leftKeys, rightKeys, 
otherPredicates.reduceOption(And), left, right))
+      } else {
+        None
+      }
+    case _ => None
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/981bde9b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 3cd2996..0925605 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -31,9 +31,8 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   object LeftSemiJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      // Find left semi joins where at least some predicates can be evaluated 
by matching hash
-      // keys using the HashFilteredJoin pattern.
-      case HashFilteredJoin(LeftSemi, leftKeys, rightKeys, condition, left, 
right) =>
+      // Find left semi joins where at least some predicates can be evaluated 
by matching join keys
+      case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, 
right) =>
         val semiJoin = execution.LeftSemiJoinHash(
           leftKeys, rightKeys, planLater(left), planLater(right))
         condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
@@ -46,7 +45,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   }
 
   /**
-   * Uses the HashFilteredJoin pattern to find joins where at least some of 
the predicates can be
+   * Uses the ExtractEquiJoinKeys pattern to find joins where at least some of 
the predicates can be
    * evaluated by matching hash keys.
    */
   object HashJoin extends Strategy with PredicateHelper {
@@ -65,7 +64,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
     def broadcastTables: Seq[String] = 
sqlContext.joinBroadcastTables.split(",").toBuffer
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case HashFilteredJoin(
+      case ExtractEquiJoinKeys(
               Inner,
               leftKeys,
               rightKeys,
@@ -75,7 +74,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
         if broadcastTables.contains(b.tableName) =>
           broadcastHashJoin(leftKeys, rightKeys, left, right, condition, 
BuildRight)
 
-      case HashFilteredJoin(
+      case ExtractEquiJoinKeys(
               Inner,
               leftKeys,
               rightKeys,
@@ -85,7 +84,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
         if broadcastTables.contains(b.tableName) =>
           broadcastHashJoin(leftKeys, rightKeys, left, right, condition, 
BuildLeft)
 
-      case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, 
right) =>
+      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, 
right) =>
         val hashJoin =
           execution.ShuffledHashJoin(
             leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))

Reply via email to