Repository: spark
Updated Branches:
  refs/heads/master bb98ecafc -> a20fea988


[Spark-1461] Deferred Expression Evaluation (short-circuit evaluation)

This patch unifies the foldable and nullable interfaces for Expression.
1) Non-deterministic UDFs (like Rand()) cannot be folded.
2) Short-circuit evaluation significantly improves the performance of expression
evaluation; however, stateful UDFs should not be skipped by short-circuit
evaluation (e.g. in the expression `col1 > 0 and row_sequence() < 1000`,
row_sequence() cannot be skipped even if `col1 > 0` is false).

I brought in the concept of DeferredObject from Hive, which has two kinds of child
classes (EagerResult / DeferredResult): the former requires triggering the
evaluation before the object is created, while the latter triggers the evaluation
when its get() method is first called.

Author: Cheng Hao <[email protected]>

Closes #446 from chenghao-intel/expression_deferred_evaluation and squashes the 
following commits:

d2729de [Cheng Hao] Fix the codestyle issues
a08f09c [Cheng Hao] fix bug in or/and short-circuit evaluation
af2236b [Cheng Hao] revert the short-circuit expression evaluation for IF
b7861d2 [Cheng Hao] Add Support for Deferred Expression Evaluation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a20fea98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a20fea98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a20fea98

Branch: refs/heads/master
Commit: a20fea98811d98958567780815fcf0d4fb4e28d4
Parents: bb98eca
Author: Cheng Hao <[email protected]>
Authored: Thu May 15 22:12:34 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Thu May 15 22:12:34 2014 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/predicates.scala   | 47 +++++++++++++-------
 .../org/apache/spark/sql/hive/hiveUdfs.scala    | 28 +++++++++---
 2 files changed, 53 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a20fea98/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 6ee4799..d111578 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -98,13 +98,19 @@ case class And(left: Expression, right: Expression) extends 
BinaryPredicate {
 
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == false || r == false) {
-      false
-    } else if (l == null || r == null ) {
-      null
+    if (l == false) {
+       false
     } else {
-      true
+      val r = right.eval(input)
+      if (r == false) {
+        false
+      } else {
+        if (l != null && r != null) {
+          true
+        } else {
+          null
+        }
+      }
     }
   }
 }
@@ -114,13 +120,19 @@ case class Or(left: Expression, right: Expression) 
extends BinaryPredicate {
 
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == true || r == true) {
+    if (l == true) {
       true
-    } else if (l == null || r == null) {
-      null
     } else {
-      false
+      val r = right.eval(input)
+      if (r == true) {
+        true
+      } else {
+        if (l != null && r != null) {
+          false
+        } else {
+          null
+        }
+      }
     }
   }
 }
@@ -133,8 +145,12 @@ case class Equals(left: Expression, right: Expression) 
extends BinaryComparison
   def symbol = "="
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == null || r == null) null else l == r
+    if (l == null) {
+      null
+    } else {
+      val r = right.eval(input)
+      if (r == null) null else l == r
+    }
   }
 }
 
@@ -162,7 +178,7 @@ case class If(predicate: Expression, trueValue: Expression, 
falseValue: Expressi
     extends Expression {
 
   def children = predicate :: trueValue :: falseValue :: Nil
-  def nullable = trueValue.nullable || falseValue.nullable
+  override def nullable = trueValue.nullable || falseValue.nullable
   def references = children.flatMap(_.references).toSet
   override lazy val resolved = childrenResolved && trueValue.dataType == 
falseValue.dataType
   def dataType = {
@@ -175,8 +191,9 @@ case class If(predicate: Expression, trueValue: Expression, 
falseValue: Expressi
   }
 
   type EvaluatedType = Any
+
   override def eval(input: Row): Any = {
-    if (predicate.eval(input).asInstanceOf[Boolean]) {
+    if (true == predicate.eval(input)) {
       trueValue.eval(input)
     } else {
       falseValue.eval(input)

http://git-wip-us.apache.org/repos/asf/spark/blob/a20fea98/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index d50e2c6..5729020 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -248,17 +248,31 @@ private[hive] case class HiveGenericUdf(name: String, 
children: Seq[Expression])
     isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && 
n.foldable)
   }
 
+  protected lazy val deferedObjects = 
Array.fill[DeferredObject](children.length)({
+    new DeferredObjectAdapter
+  })
+
+  // Adapter from Catalyst ExpressionResult to Hive DeferredObject
+  class DeferredObjectAdapter extends DeferredObject {
+    private var func: () => Any = _
+    def set(func: () => Any) {
+      this.func = func
+    }
+    override def prepare(i: Int) = {}
+    override def get(): AnyRef = wrap(func())
+  }
+
   val dataType: DataType = inspectorToDataType(returnInspector)
 
   override def eval(input: Row): Any = {
     returnInspector // Make sure initialized.
-    val args = children.map { v =>
-      new DeferredObject {
-        override def prepare(i: Int) = {}
-        override def get(): AnyRef = wrap(v.eval(input))
-      }
-    }.toArray
-    unwrap(function.evaluate(args))
+    var i = 0
+    while (i < children.length) {
+      val idx = i
+      deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => 
{children(idx).eval(input)})
+      i += 1
+    }
+    unwrap(function.evaluate(deferedObjects))
   }
 }
 

Reply via email to