Repository: spark
Updated Branches:
  refs/heads/master cd12dd9bd -> 86ff8b102


Generalize pattern for planning hash joins.

This will be helpful for 
[SPARK-1495](https://issues.apache.org/jira/browse/SPARK-1495) and other cases 
where we want to have custom hash join implementations but don't want to repeat 
the logic for finding the join keys.
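
For illustration only, a sketch of the kind of reuse this enables (the
`BroadcastHashJoin` strategy and `buildBroadcastJoin` helper below are invented
names, not part of this patch; only the `HashFilteredJoin` pattern is added here):

```scala
// Hypothetical strategy reusing the extractor added in this patch.
object BroadcastHashJoin extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
      // Key extraction is handled by the shared pattern; only the choice of
      // physical operator would differ from the existing HashJoin strategy.
      buildBroadcastJoin(leftKeys, rightKeys, condition, left, right) :: Nil
    case _ => Nil
  }
}
```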

Author: Michael Armbrust <[email protected]>

Closes #418 from marmbrus/hashFilter and squashes the following commits:

d5cc79b [Michael Armbrust] Address @rxin 's comments.
366b6d9 [Michael Armbrust] style fixes
14560eb [Michael Armbrust] Generalize pattern for planning hash joins.
f4809c1 [Michael Armbrust] Move common functions to PredicateHelper.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86ff8b10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86ff8b10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86ff8b10

Branch: refs/heads/master
Commit: 86ff8b10270bbe2579cdb1dc2297a9f4e145973e
Parents: cd12dd9
Author: Michael Armbrust <[email protected]>
Authored: Thu Apr 24 21:42:33 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Thu Apr 24 21:42:33 2014 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/predicates.scala   | 29 ++++++++---
 .../spark/sql/catalyst/planning/patterns.scala  | 52 ++++++++++++++++++++
 .../spark/sql/execution/SparkStrategies.scala   | 49 +++---------------
 3 files changed, 82 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/86ff8b10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index da5b2cf..82c7af6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.trees
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-import org.apache.spark.sql.catalyst.types.{BooleanType, StringType, TimestampType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst.types.BooleanType
+
 
 object InterpretedPredicate {
   def apply(expression: Expression): (Row => Boolean) = {
@@ -37,10 +38,26 @@ trait Predicate extends Expression {
 }
 
 trait PredicateHelper {
-  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
-    case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
-    case other => other :: Nil
+  protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
+    condition match {
+      case And(cond1, cond2) =>
+        splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
+      case other => other :: Nil
+    }
   }
+
+  /**
+   * Returns true if `expr` can be evaluated using only the output of `plan`.  This method
+   * can be used to determine when it is acceptable to move expression evaluation within a query
+   * plan.
+   *
+   * For example, consider a join between two relations R(a, b) and S(c, d).
+   *
+   * `canEvaluate(Equals(a,b), R)` returns `true` whereas `canEvaluate(Equals(a,c), R)` returns
+   * `false`.
+   */
+  protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
+    expr.references.subsetOf(plan.outputSet)
 }
 
 abstract class BinaryPredicate extends BinaryExpression with Predicate {
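
For a quick intuition of what the new `canEvaluate` helper checks, here is a tiny
self-contained sketch (names invented; the real method compares an expression's
`references` against the plan's `outputSet`, which are attribute sets rather than
strings):

```scala
// Sketch only: models canEvaluate as plain set containment over column names.
def canEvaluateSketch(exprReferences: Set[String], planOutput: Set[String]): Boolean =
  exprReferences.subsetOf(planOutput)

val outputOfR = Set("a", "b")  // relation R(a, b)
val outputOfS = Set("c", "d")  // relation S(c, d)

canEvaluateSketch(Set("a", "b"), outputOfR)  // true:  Equals(a, b) only needs R's output
canEvaluateSketch(Set("a", "c"), outputOfR)  // false: c is produced by S, not R
canEvaluateSketch(Set("c", "d"), outputOfS)  // true:  Equals(c, d) only needs S's output
```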

http://git-wip-us.apache.org/repos/asf/spark/blob/86ff8b10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 6dd816a..0e3a8a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -19,7 +19,10 @@ package org.apache.spark.sql.catalyst.planning
 
 import scala.annotation.tailrec
 
+import org.apache.spark.sql.Logging
+
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 
 /**
@@ -102,6 +105,55 @@ object PhysicalOperation extends PredicateHelper {
 }
 
 /**
+ * A pattern that finds joins with equality conditions that can be evaluated using hashing
+ * techniques.  For inner joins, any filters on top of the join operator are also matched.
+ */
+object HashFilteredJoin extends Logging with PredicateHelper {
+  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
+  type ReturnType =
+    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
+    // All predicates can be evaluated for inner join (i.e., those that are in the ON
+    // clause and WHERE clause.)
+    case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
+      logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
+      splitPredicates(predicates ++ condition, join)
+    case join @ Join(left, right, joinType, condition) =>
+      logger.debug(s"Considering hash join on: $condition")
+      splitPredicates(condition.toSeq, join)
+    case _ => None
+  }
+
+  // Find equi-join predicates that can be evaluated before the join, and thus can be used
+  // as join keys.
+  def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
+    val Join(left, right, joinType, _) = join
+    val (joinPredicates, otherPredicates) = allPredicates.partition {
+      case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
+        (canEvaluate(l, right) && canEvaluate(r, left)) => true
+      case _ => false
+    }
+
+    val joinKeys = joinPredicates.map {
+      case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
+      case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
+    }
+
+    // Do not consider this strategy if there are no join keys.
+    if (joinKeys.nonEmpty) {
+      val leftKeys = joinKeys.map(_._1)
+      val rightKeys = joinKeys.map(_._2)
+
+      Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
+    } else {
+      logger.debug(s"Avoiding hash join with no join keys.")
+      None
+    }
+  }
+}
+
+/**
  * A pattern that collects all adjacent unions and returns their children as a Seq.
  */
 object Unions {
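
To make the key-orientation step in `splitPredicates` concrete, here is a small
self-contained sketch (all names invented) of how each equality predicate is
flipped, when necessary, so that the first key of every pair refers to the left
child and the second to the right child:

```scala
// Toy model of the orientation logic: Attr stands in for a Catalyst expression,
// and fromLeft records which side of the join can evaluate it.
case class Attr(name: String, fromLeft: Boolean)
case class Eq(l: Attr, r: Attr)

def orientKeys(equiPredicates: Seq[Eq]): Seq[(Attr, Attr)] = equiPredicates.collect {
  case Eq(l, r) if l.fromLeft && !r.fromLeft => (l, r)
  case Eq(l, r) if !l.fromLeft && r.fromLeft => (r, l)  // swap so the left key comes first
}

// "R.a = S.c" and "S.d = R.b" both come out as (leftKey, rightKey) pairs:
orientKeys(Seq(
  Eq(Attr("a", fromLeft = true), Attr("c", fromLeft = false)),
  Eq(Attr("d", fromLeft = false), Attr("b", fromLeft = true))))
// => Seq((Attr(a,true), Attr(c,false)), (Attr(b,true), Attr(d,false)))
```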

http://git-wip-us.apache.org/repos/asf/spark/blob/86ff8b10/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 500fde1..f763106 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -28,51 +28,16 @@ import org.apache.spark.sql.parquet._
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>
 
-  object HashJoin extends Strategy {
+  object HashJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case FilteredOperation(predicates, logical.Join(left, right, Inner, condition)) =>
-        logger.debug(s"Considering join: ${predicates ++ condition}")
-        // Find equi-join predicates that can be evaluated before the join, and thus can be used
-        // as join keys. Note we can only mix in the conditions with other predicates because the
-        // match above ensures that this is and Inner join.
-        val (joinPredicates, otherPredicates) = (predicates ++ condition).partition {
-          case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
-                               (canEvaluate(l, right) && canEvaluate(r, left)) => true
-          case _ => false
-        }
-
-        val joinKeys = joinPredicates.map {
-          case Equals(l,r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
-          case Equals(l,r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
-        }
-
-        // Do not consider this strategy if there are no join keys.
-        if (joinKeys.nonEmpty) {
-          val leftKeys = joinKeys.map(_._1)
-          val rightKeys = joinKeys.map(_._2)
-
-          val joinOp = execution.HashJoin(
-            leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
-
-          // Make sure other conditions are met if present.
-          if (otherPredicates.nonEmpty) {
-            execution.Filter(combineConjunctivePredicates(otherPredicates), joinOp) :: Nil
-          } else {
-            joinOp :: Nil
-          }
-        } else {
-          logger.debug(s"Avoiding spark join with no join keys.")
-          Nil
-        }
+      // Find inner joins where at least some predicates can be evaluated by matching hash keys
+      // using the HashFilteredJoin pattern.
+      case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
+        val hashJoin =
+          execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
+        condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
       case _ => Nil
     }
-
-    private def combineConjunctivePredicates(predicates: Seq[Expression]) =
-      predicates.reduceLeft(And)
-
-    /** Returns true if `expr` can be evaluated using only the output of `plan`. */
-    protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
-      expr.references subsetOf plan.outputSet
   }
 
   object PartialAggregation extends Strategy {
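
One readability note on the rewritten strategy: the explicit if/else around
`otherPredicates` is replaced by an `Option` idiom, wrapping the join in a `Filter`
only when a residual (non equi-join) condition remains. A minimal sketch of that
idiom, with strings standing in for physical plan nodes:

```scala
// Sketch of condition.map(Filter(_, hashJoin)).getOrElse(hashJoin).
def wrapWithFilter(residual: Option[String], join: String): String =
  residual.map(cond => s"Filter($cond, $join)").getOrElse(join)

wrapWithFilter(Some("b > 10"), "HashJoin")  // "Filter(b > 10, HashJoin)"
wrapWithFilter(None, "HashJoin")            // "HashJoin"
```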
