Repository: spark
Updated Branches:
refs/heads/branch-1.3 a15a0a02c -> 639a3c2fd
[SQL] Optimize arithmetic and predicate operators
The existing implementations of the arithmetic operators and the
BinaryComparison operators contain redundant type-checking code. For example,
Expression.n2 is used by Add/Subtract/Multiply:
(1) n2 checks left.dataType == right.dataType on every call. However, this
check should be done once, when we resolve expression types;
(2) n2 requires that dataType is a NumericType. This can also be checked once.
This PR optimizes arithmetic and predicate operators by removing such redundant
type-checking code.
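As a rough illustration of the idea (not the actual Catalyst code), the sketch below contrasts matching on the data type for every row with resolving the Numeric instance once through a lazy val. The types DataType, IntType, DoubleType, StringType and the classes NumericLookup, AddPerRow and AddCached are hypothetical stand-ins invented for this example.

```scala
// Hypothetical, simplified stand-ins for Catalyst data types (illustration only).
sealed trait DataType
case object IntType extends DataType
case object DoubleType extends DataType
case object StringType extends DataType

object NumericLookup {
  // Maps a data type to its Numeric instance, erased to Numeric[Any]
  // in the same spirit as the asInstanceOf casts used in the patch.
  def numericFor(dt: DataType): Numeric[Any] = dt match {
    case IntType    => implicitly[Numeric[Int]].asInstanceOf[Numeric[Any]]
    case DoubleType => implicitly[Numeric[Double]].asInstanceOf[Numeric[Any]]
    case other      => sys.error(s"Type $other does not support numeric operations")
  }
}

// "Before" shape: the type match runs on every evaluation.
final class AddPerRow(dataType: DataType) {
  def eval(l: Any, r: Any): Any =
    if (l == null || r == null) null
    else NumericLookup.numericFor(dataType).plus(l, r)
}

// "After" shape: the Numeric instance is resolved once and then reused.
final class AddCached(dataType: DataType) {
  lazy val numeric: Numeric[Any] = NumericLookup.numericFor(dataType)

  def eval(l: Any, r: Any): Any =
    if (l == null) null
    else if (r == null) null
    else numeric.plus(l, r)
}
```

Both classes return the same results; the second only moves the type dispatch out of the per-row path, which is the kind of saving the benchmark below is meant to measure.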
Some preliminary benchmarking on 10G TPC-H data over 5 r3.2xlarge EC2 machines
shows that this PR can reduce the query time by 5.5% to 11%.
The benchmark queries follow the template below, where OP is
plus/minus/times/divide/remainder/bitwise and/bitwise or/bitwise xor.
SELECT l_returnflag, l_linestatus,
       SUM(l_quantity OP cnt1), SUM(l_quantity OP cnt2), ..., SUM(l_quantity OP cnt700)
FROM (
  SELECT l_returnflag, l_linestatus, l_quantity,
         1 AS cnt1, 2 AS cnt2, ..., 700 AS cnt700
  FROM lineitem
  WHERE l_shipdate <= '1998-09-01'
)
GROUP BY l_returnflag, l_linestatus;
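For anyone wanting to reproduce a query of this shape, here is a minimal sketch of how the template could be expanded for one operator. The object BenchmarkQuery and its build method are hypothetical helpers written for this example; they are not part of the patch, and the operator is passed as whatever SQL symbol the dialect in use accepts.

```scala
object BenchmarkQuery {
  // Expands the template for a single operator symbol and n constant columns.
  // The default n = 700 mirrors the template above.
  def build(op: String, n: Int = 700): String = {
    val sums = (1 to n).map(i => s"SUM(l_quantity $op cnt$i)").mkString(", ")
    val consts = (1 to n).map(i => s"$i AS cnt$i").mkString(", ")
    s"""SELECT l_returnflag, l_linestatus, $sums
       |FROM (
       |  SELECT l_returnflag, l_linestatus, l_quantity, $consts
       |  FROM lineitem
       |  WHERE l_shipdate <= '1998-09-01'
       |)
       |GROUP BY l_returnflag, l_linestatus;""".stripMargin
  }
}
```

Calling BenchmarkQuery.build("+") would yield the "plus" variant; the other operators follow by swapping the symbol.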
Author: kai <[email protected]>
Closes #4472 from kai-zeng/arithmetic-optimize and squashes the following
commits:
fef0cf1 [kai] Merge branch 'master' of github.com:apache/spark into
arithmetic-optimize
4b3a1bb [kai] chmod a-x
5a41e49 [kai] chmod a-x Expression.scala
cb37c94 [kai] rebase onto spark master
7f6e968 [kai] chmod 100755 -> 100644
6cddb46 [kai] format
7490dbc [kai] fix unresolved-expression exception for EqualTo
9c40bc0 [kai] fix bitwisenot
3cbd363 [kai] clean up test code
ca47801 [kai] override evalInternal for bitwise ops
8fa84a1 [kai] add bitwise or and xor
6892fc4 [kai] revert override evalInternal
f8eba24 [kai] override evalInternal
31ccdd4 [kai] rewrite all bitwise op and remove evalInternal
86297e2 [kai] generalized
cb92ae1 [kai] bitwise-and: override eval
97a7d6c [kai] bitwise-and: override evalInternal using and func
0906c39 [kai] add bitwise test
62abbbc [kai] clean up predicate and arithmetic
b34d58d [kai] add caching and benmark option
12c5b32 [kai] override eval
1cd7571 [kai] fix sqrt and maxof
03fd0c3 [kai] fix predicate
16fd84c [kai] optimize + - * / % -(unary) abs < > <= >=
fd95823 [kai] remove unnecessary type checking
24d062f [kai] test suite
(cherry picked from commit cb6c48c874af2bd78ee73c1dc8a44fd28ecc0991)
Signed-off-by: Michael Armbrust <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/639a3c2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/639a3c2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/639a3c2f
Branch: refs/heads/branch-1.3
Commit: 639a3c2fdf302a34ad5f62199abee2b96023669b
Parents: a15a0a0
Author: kai <[email protected]>
Authored: Mon Feb 16 15:58:05 2015 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Mon Feb 16 15:58:18 2015 -0800
----------------------------------------------------------------------
.../apache/spark/sql/catalyst/SqlParser.scala | 0
.../apache/spark/sql/catalyst/dsl/package.scala | 0
.../sql/catalyst/expressions/Expression.scala | 200 ---------------
.../sql/catalyst/expressions/aggregates.scala | 0
.../sql/catalyst/expressions/arithmetic.scala | 243 ++++++++++++++-----
.../sql/catalyst/expressions/predicates.scala | 107 +++++++-
.../org/apache/spark/sql/types/Metadata.scala | 0
.../spark/sql/catalyst/util/MetadataSuite.scala | 0
.../apache/spark/sql/execution/Aggregate.scala | 0
.../hive/thriftserver/SparkSQLCLIDriver.scala | 0
10 files changed, 290 insertions(+), 260 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/639a3c2f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
old mode 100755
new mode 100644
http://git-wip-us.apache.org/repos/asf/spark/blob/639a3c2f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
old mode 100755
new mode 100644
http://git-wip-us.apache.org/repos/asf/spark/blob/639a3c2f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index c32a4b8..6ad39b8 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -77,206 +77,6 @@ abstract class Expression extends TreeNode[Expression] {
case u: UnresolvedAttribute => PrettyAttribute(u.name)
}.toString
}
-
- /**
- * A set of helper functions that return the correct descendant of
`scala.math.Numeric[T]` type
- * and do any casting necessary of child evaluation.
- */
- @inline
- def n1(e: Expression, i: Row, f: ((Numeric[Any], Any) => Any)): Any = {
- val evalE = e.eval(i)
- if (evalE == null) {
- null
- } else {
- e.dataType match {
- case n: NumericType =>
- val castedFunction = f.asInstanceOf[(Numeric[n.JvmType], n.JvmType)
=> n.JvmType]
- castedFunction(n.numeric, evalE.asInstanceOf[n.JvmType])
- case other => sys.error(s"Type $other does not support numeric
operations")
- }
- }
- }
-
- /**
- * Evaluation helper function for 2 Numeric children expressions. Those
expressions are supposed
- * to be in the same data type, and also the return type.
- * Either one of the expressions result is null, the evaluation result
should be null.
- */
- @inline
- protected final def n2(
- i: Row,
- e1: Expression,
- e2: Expression,
- f: ((Numeric[Any], Any, Any) => Any)): Any = {
-
- if (e1.dataType != e2.dataType) {
- throw new TreeNodeException(this, s"Types do not match ${e1.dataType}
!= ${e2.dataType}")
- }
-
- val evalE1 = e1.eval(i)
- if(evalE1 == null) {
- null
- } else {
- val evalE2 = e2.eval(i)
- if (evalE2 == null) {
- null
- } else {
- e1.dataType match {
- case n: NumericType =>
- f.asInstanceOf[(Numeric[n.JvmType], n.JvmType, n.JvmType) =>
n.JvmType](
- n.numeric, evalE1.asInstanceOf[n.JvmType],
evalE2.asInstanceOf[n.JvmType])
- case other => sys.error(s"Type $other does not support numeric
operations")
- }
- }
- }
- }
-
- /**
- * Evaluation helper function for 2 Fractional children expressions. Those
expressions are
- * supposed to be in the same data type, and also the return type.
- * Either one of the expressions result is null, the evaluation result
should be null.
- */
- @inline
- protected final def f2(
- i: Row,
- e1: Expression,
- e2: Expression,
- f: ((Fractional[Any], Any, Any) => Any)): Any = {
- if (e1.dataType != e2.dataType) {
- throw new TreeNodeException(this, s"Types do not match ${e1.dataType}
!= ${e2.dataType}")
- }
-
- val evalE1 = e1.eval(i: Row)
- if(evalE1 == null) {
- null
- } else {
- val evalE2 = e2.eval(i: Row)
- if (evalE2 == null) {
- null
- } else {
- e1.dataType match {
- case ft: FractionalType =>
- f.asInstanceOf[(Fractional[ft.JvmType], ft.JvmType, ft.JvmType) =>
ft.JvmType](
- ft.fractional, evalE1.asInstanceOf[ft.JvmType],
evalE2.asInstanceOf[ft.JvmType])
- case other => sys.error(s"Type $other does not support fractional
operations")
- }
- }
- }
- }
-
- /**
- * Evaluation helper function for 1 Fractional children expression.
- * if the expression result is null, the evaluation result should be null.
- */
- @inline
- protected final def f1(i: Row, e1: Expression, f: ((Fractional[Any], Any) =>
Any)): Any = {
- val evalE1 = e1.eval(i: Row)
- if(evalE1 == null) {
- null
- } else {
- e1.dataType match {
- case ft: FractionalType =>
- f.asInstanceOf[(Fractional[ft.JvmType], ft.JvmType) => ft.JvmType](
- ft.fractional, evalE1.asInstanceOf[ft.JvmType])
- case other => sys.error(s"Type $other does not support fractional
operations")
- }
- }
- }
-
- /**
- * Evaluation helper function for 2 Integral children expressions. Those
expressions are
- * supposed to be in the same data type, and also the return type.
- * Either one of the expressions result is null, the evaluation result
should be null.
- */
- @inline
- protected final def i2(
- i: Row,
- e1: Expression,
- e2: Expression,
- f: ((Integral[Any], Any, Any) => Any)): Any = {
- if (e1.dataType != e2.dataType) {
- throw new TreeNodeException(this, s"Types do not match ${e1.dataType}
!= ${e2.dataType}")
- }
-
- val evalE1 = e1.eval(i)
- if(evalE1 == null) {
- null
- } else {
- val evalE2 = e2.eval(i)
- if (evalE2 == null) {
- null
- } else {
- e1.dataType match {
- case i: IntegralType =>
- f.asInstanceOf[(Integral[i.JvmType], i.JvmType, i.JvmType) =>
i.JvmType](
- i.integral, evalE1.asInstanceOf[i.JvmType],
evalE2.asInstanceOf[i.JvmType])
- case i: FractionalType =>
- f.asInstanceOf[(Integral[i.JvmType], i.JvmType, i.JvmType) =>
i.JvmType](
- i.asIntegral, evalE1.asInstanceOf[i.JvmType],
evalE2.asInstanceOf[i.JvmType])
- case other => sys.error(s"Type $other does not support numeric
operations")
- }
- }
- }
- }
-
- /**
- * Evaluation helper function for 1 Integral children expression.
- * if the expression result is null, the evaluation result should be null.
- */
- @inline
- protected final def i1(i: Row, e1: Expression, f: ((Integral[Any], Any) =>
Any)): Any = {
- val evalE1 = e1.eval(i)
- if(evalE1 == null) {
- null
- } else {
- e1.dataType match {
- case i: IntegralType =>
- f.asInstanceOf[(Integral[i.JvmType], i.JvmType) => i.JvmType](
- i.integral, evalE1.asInstanceOf[i.JvmType])
- case i: FractionalType =>
- f.asInstanceOf[(Integral[i.JvmType], i.JvmType) => i.JvmType](
- i.asIntegral, evalE1.asInstanceOf[i.JvmType])
- case other => sys.error(s"Type $other does not support numeric
operations")
- }
- }
- }
-
- /**
- * Evaluation helper function for 2 Comparable children expressions. Those
expressions are
- * supposed to be in the same data type, and the return type should be
Integer:
- * Negative value: 1st argument less than 2nd argument
- * Zero: 1st argument equals 2nd argument
- * Positive value: 1st argument greater than 2nd argument
- *
- * Either one of the expressions result is null, the evaluation result
should be null.
- */
- @inline
- protected final def c2(
- i: Row,
- e1: Expression,
- e2: Expression,
- f: ((Ordering[Any], Any, Any) => Any)): Any = {
- if (e1.dataType != e2.dataType) {
- throw new TreeNodeException(this, s"Types do not match ${e1.dataType}
!= ${e2.dataType}")
- }
-
- val evalE1 = e1.eval(i)
- if(evalE1 == null) {
- null
- } else {
- val evalE2 = e2.eval(i)
- if (evalE2 == null) {
- null
- } else {
- e1.dataType match {
- case i: NativeType =>
- f.asInstanceOf[(Ordering[i.JvmType], i.JvmType, i.JvmType) =>
Boolean](
- i.ordering, evalE1.asInstanceOf[i.JvmType],
evalE2.asInstanceOf[i.JvmType])
- case other => sys.error(s"Type $other does not support ordered
operations")
- }
- }
- }
- }
}
abstract class BinaryExpression extends Expression with
trees.BinaryNode[Expression] {
http://git-wip-us.apache.org/repos/asf/spark/blob/639a3c2f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
old mode 100755
new mode 100644
http://git-wip-us.apache.org/repos/asf/spark/blob/639a3c2f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 574907f..00b0d3c 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.types._
case class UnaryMinus(child: Expression) extends UnaryExpression {
@@ -28,8 +29,18 @@ case class UnaryMinus(child: Expression) extends
UnaryExpression {
def nullable = child.nullable
override def toString = s"-$child"
+ lazy val numeric = dataType match {
+ case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
+ case other => sys.error(s"Type $other does not support numeric operations")
+ }
+
override def eval(input: Row): Any = {
- n1(child, input, _.negate(_))
+ val evalE = child.eval(input)
+ if (evalE == null) {
+ null
+ } else {
+ numeric.negate(evalE)
+ }
}
}
@@ -41,18 +52,19 @@ case class Sqrt(child: Expression) extends UnaryExpression {
def nullable = true
override def toString = s"SQRT($child)"
+ lazy val numeric = child.dataType match {
+ case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
+ case other => sys.error(s"Type $other does not support non-negative
numeric operations")
+ }
+
override def eval(input: Row): Any = {
val evalE = child.eval(input)
if (evalE == null) {
null
} else {
- child.dataType match {
- case n: NumericType =>
- val value = n.numeric.toDouble(evalE.asInstanceOf[n.JvmType])
- if (value < 0) null
- else math.sqrt(value)
- case other => sys.error(s"Type $other does not support non-negative
numeric operations")
- }
+ val value = numeric.toDouble(evalE)
+ if (value < 0) null
+ else math.sqrt(value)
}
}
}
@@ -98,19 +110,70 @@ abstract class BinaryArithmetic extends BinaryExpression {
case class Add(left: Expression, right: Expression) extends BinaryArithmetic {
def symbol = "+"
- override def eval(input: Row): Any = n2(input, left, right, _.plus(_, _))
+ lazy val numeric = dataType match {
+ case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
+ case other => sys.error(s"Type $other does not support numeric operations")
+ }
+
+ override def eval(input: Row): Any = {
+ val evalE1 = left.eval(input)
+ if(evalE1 == null) {
+ null
+ } else {
+ val evalE2 = right.eval(input)
+ if (evalE2 == null) {
+ null
+ } else {
+ numeric.plus(evalE1, evalE2)
+ }
+ }
+ }
}
case class Subtract(left: Expression, right: Expression) extends
BinaryArithmetic {
def symbol = "-"
- override def eval(input: Row): Any = n2(input, left, right, _.minus(_, _))
+ lazy val numeric = dataType match {
+ case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
+ case other => sys.error(s"Type $other does not support numeric operations")
+ }
+
+ override def eval(input: Row): Any = {
+ val evalE1 = left.eval(input)
+ if(evalE1 == null) {
+ null
+ } else {
+ val evalE2 = right.eval(input)
+ if (evalE2 == null) {
+ null
+ } else {
+ numeric.minus(evalE1, evalE2)
+ }
+ }
+ }
}
case class Multiply(left: Expression, right: Expression) extends
BinaryArithmetic {
def symbol = "*"
- override def eval(input: Row): Any = n2(input, left, right, _.times(_, _))
+ lazy val numeric = dataType match {
+ case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
+ case other => sys.error(s"Type $other does not support numeric operations")
+ }
+
+ override def eval(input: Row): Any = {
+ val evalE1 = left.eval(input)
+ if(evalE1 == null) {
+ null
+ } else {
+ val evalE2 = right.eval(input)
+ if (evalE2 == null) {
+ null
+ } else {
+ numeric.times(evalE1, evalE2)
+ }
+ }
+ }
}
case class Divide(left: Expression, right: Expression) extends
BinaryArithmetic {
@@ -118,16 +181,25 @@ case class Divide(left: Expression, right: Expression)
extends BinaryArithmetic
override def nullable = true
+ lazy val div: (Any, Any) => Any = dataType match {
+ case ft: FractionalType => ft.fractional.asInstanceOf[Fractional[Any]].div
+ case it: IntegralType => it.integral.asInstanceOf[Integral[Any]].quot
+ case other => sys.error(s"Type $other does not support numeric operations")
+ }
+
override def eval(input: Row): Any = {
val evalE2 = right.eval(input)
- dataType match {
- case _ if evalE2 == null => null
- case _ if evalE2 == 0 => null
- case ft: FractionalType => f1(input, left, _.div(_,
evalE2.asInstanceOf[ft.JvmType]))
- case it: IntegralType => i1(input, left, _.quot(_,
evalE2.asInstanceOf[it.JvmType]))
+ if (evalE2 == null || evalE2 == 0) {
+ null
+ } else {
+ val evalE1 = left.eval(input)
+ if (evalE1 == null) {
+ null
+ } else {
+ div(evalE1, evalE2)
+ }
}
}
-
}
case class Remainder(left: Expression, right: Expression) extends
BinaryArithmetic {
@@ -135,12 +207,23 @@ case class Remainder(left: Expression, right: Expression)
extends BinaryArithmet
override def nullable = true
+ lazy val integral = dataType match {
+ case i: IntegralType => i.integral.asInstanceOf[Integral[Any]]
+ case i: FractionalType => i.asIntegral.asInstanceOf[Integral[Any]]
+ case other => sys.error(s"Type $other does not support numeric operations")
+ }
+
override def eval(input: Row): Any = {
val evalE2 = right.eval(input)
- dataType match {
- case _ if evalE2 == null => null
- case _ if evalE2 == 0 => null
- case nt: NumericType => i1(input, left, _.rem(_,
evalE2.asInstanceOf[nt.JvmType]))
+ if (evalE2 == null || evalE2 == 0) {
+ null
+ } else {
+ val evalE1 = left.eval(input)
+ if (evalE1 == null) {
+ null
+ } else {
+ integral.rem(evalE1, evalE2)
+ }
}
}
}
@@ -151,13 +234,19 @@ case class Remainder(left: Expression, right: Expression)
extends BinaryArithmet
case class BitwiseAnd(left: Expression, right: Expression) extends
BinaryArithmetic {
def symbol = "&"
- override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any
= dataType match {
- case ByteType => (evalE1.asInstanceOf[Byte] &
evalE2.asInstanceOf[Byte]).toByte
- case ShortType => (evalE1.asInstanceOf[Short] &
evalE2.asInstanceOf[Short]).toShort
- case IntegerType => evalE1.asInstanceOf[Int] & evalE2.asInstanceOf[Int]
- case LongType => evalE1.asInstanceOf[Long] & evalE2.asInstanceOf[Long]
+ lazy val and: (Any, Any) => Any = dataType match {
+ case ByteType =>
+ ((evalE1: Byte, evalE2: Byte) => (evalE1 &
evalE2).toByte).asInstanceOf[(Any, Any) => Any]
+ case ShortType =>
+ ((evalE1: Short, evalE2: Short) => (evalE1 &
evalE2).toShort).asInstanceOf[(Any, Any) => Any]
+ case IntegerType =>
+ ((evalE1: Int, evalE2: Int) => evalE1 & evalE2).asInstanceOf[(Any, Any)
=> Any]
+ case LongType =>
+ ((evalE1: Long, evalE2: Long) => evalE1 & evalE2).asInstanceOf[(Any,
Any) => Any]
case other => sys.error(s"Unsupported bitwise & operation on $other")
}
+
+ override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any
= and(evalE1, evalE2)
}
/**
@@ -166,13 +255,19 @@ case class BitwiseAnd(left: Expression, right:
Expression) extends BinaryArithme
case class BitwiseOr(left: Expression, right: Expression) extends
BinaryArithmetic {
def symbol = "|"
- override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any
= dataType match {
- case ByteType => (evalE1.asInstanceOf[Byte] |
evalE2.asInstanceOf[Byte]).toByte
- case ShortType => (evalE1.asInstanceOf[Short] |
evalE2.asInstanceOf[Short]).toShort
- case IntegerType => evalE1.asInstanceOf[Int] | evalE2.asInstanceOf[Int]
- case LongType => evalE1.asInstanceOf[Long] | evalE2.asInstanceOf[Long]
+ lazy val or: (Any, Any) => Any = dataType match {
+ case ByteType =>
+ ((evalE1: Byte, evalE2: Byte) => (evalE1 |
evalE2).toByte).asInstanceOf[(Any, Any) => Any]
+ case ShortType =>
+ ((evalE1: Short, evalE2: Short) => (evalE1 |
evalE2).toShort).asInstanceOf[(Any, Any) => Any]
+ case IntegerType =>
+ ((evalE1: Int, evalE2: Int) => evalE1 | evalE2).asInstanceOf[(Any, Any)
=> Any]
+ case LongType =>
+ ((evalE1: Long, evalE2: Long) => evalE1 | evalE2).asInstanceOf[(Any,
Any) => Any]
case other => sys.error(s"Unsupported bitwise | operation on $other")
}
+
+ override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any
= or(evalE1, evalE2)
}
/**
@@ -181,13 +276,19 @@ case class BitwiseOr(left: Expression, right: Expression)
extends BinaryArithmet
case class BitwiseXor(left: Expression, right: Expression) extends
BinaryArithmetic {
def symbol = "^"
- override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any
= dataType match {
- case ByteType => (evalE1.asInstanceOf[Byte] ^
evalE2.asInstanceOf[Byte]).toByte
- case ShortType => (evalE1.asInstanceOf[Short] ^
evalE2.asInstanceOf[Short]).toShort
- case IntegerType => evalE1.asInstanceOf[Int] ^ evalE2.asInstanceOf[Int]
- case LongType => evalE1.asInstanceOf[Long] ^ evalE2.asInstanceOf[Long]
+ lazy val xor: (Any, Any) => Any = dataType match {
+ case ByteType =>
+ ((evalE1: Byte, evalE2: Byte) => (evalE1 ^
evalE2).toByte).asInstanceOf[(Any, Any) => Any]
+ case ShortType =>
+ ((evalE1: Short, evalE2: Short) => (evalE1 ^
evalE2).toShort).asInstanceOf[(Any, Any) => Any]
+ case IntegerType =>
+ ((evalE1: Int, evalE2: Int) => evalE1 ^ evalE2).asInstanceOf[(Any, Any)
=> Any]
+ case LongType =>
+ ((evalE1: Long, evalE2: Long) => evalE1 ^ evalE2).asInstanceOf[(Any,
Any) => Any]
case other => sys.error(s"Unsupported bitwise ^ operation on $other")
}
+
+ override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any
= xor(evalE1, evalE2)
}
/**
@@ -201,18 +302,24 @@ case class BitwiseNot(child: Expression) extends
UnaryExpression {
def nullable = child.nullable
override def toString = s"~$child"
+ lazy val not: (Any) => Any = dataType match {
+ case ByteType =>
+ ((evalE: Byte) => (~evalE).toByte).asInstanceOf[(Any) => Any]
+ case ShortType =>
+ ((evalE: Short) => (~evalE).toShort).asInstanceOf[(Any) => Any]
+ case IntegerType =>
+ ((evalE: Int) => ~evalE).asInstanceOf[(Any) => Any]
+ case LongType =>
+ ((evalE: Long) => ~evalE).asInstanceOf[(Any) => Any]
+ case other => sys.error(s"Unsupported bitwise ~ operation on $other")
+ }
+
override def eval(input: Row): Any = {
val evalE = child.eval(input)
if (evalE == null) {
null
} else {
- dataType match {
- case ByteType => (~evalE.asInstanceOf[Byte]).toByte
- case ShortType => (~evalE.asInstanceOf[Short]).toShort
- case IntegerType => ~evalE.asInstanceOf[Int]
- case LongType => ~evalE.asInstanceOf[Long]
- case other => sys.error(s"Unsupported bitwise ~ operation on $other")
- }
+ not(evalE)
}
}
}
@@ -226,21 +333,35 @@ case class MaxOf(left: Expression, right: Expression)
extends Expression {
override def children = left :: right :: Nil
- override def dataType = left.dataType
+ override lazy val resolved =
+ left.resolved && right.resolved &&
+ left.dataType == right.dataType
+
+ override def dataType = {
+ if (!resolved) {
+ throw new UnresolvedException(this,
+ s"datatype. Can not resolve due to differing types ${left.dataType},
${right.dataType}")
+ }
+ left.dataType
+ }
+
+ lazy val ordering = left.dataType match {
+ case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+ case other => sys.error(s"Type $other does not support ordered operations")
+ }
override def eval(input: Row): Any = {
- val leftEval = left.eval(input)
- val rightEval = right.eval(input)
- if (leftEval == null) {
- rightEval
- } else if (rightEval == null) {
- leftEval
+ val evalE1 = left.eval(input)
+ val evalE2 = right.eval(input)
+ if (evalE1 == null) {
+ evalE2
+ } else if (evalE2 == null) {
+ evalE1
} else {
- val numeric =
left.dataType.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]]
- if (numeric.compare(leftEval, rightEval) < 0) {
- rightEval
+ if (ordering.compare(evalE1, evalE2) < 0) {
+ evalE2
} else {
- leftEval
+ evalE1
}
}
}
@@ -259,5 +380,17 @@ case class Abs(child: Expression) extends UnaryExpression
{
def nullable = child.nullable
override def toString = s"Abs($child)"
- override def eval(input: Row): Any = n1(child, input, _.abs(_))
+ lazy val numeric = dataType match {
+ case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
+ case other => sys.error(s"Type $other does not support numeric operations")
+ }
+
+ override def eval(input: Row): Any = {
+ val evalE = child.eval(input)
+ if (evalE == null) {
+ null
+ } else {
+ numeric.abs(evalE)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/639a3c2f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 365b168..0024ef9 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -18,8 +18,9 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.types.{BinaryType, BooleanType}
+import org.apache.spark.sql.types.{BinaryType, BooleanType, NativeType}
object InterpretedPredicate {
def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row =>
Boolean) =
@@ -201,22 +202,118 @@ case class EqualNullSafe(left: Expression, right:
Expression) extends BinaryComp
case class LessThan(left: Expression, right: Expression) extends
BinaryComparison {
def symbol = "<"
- override def eval(input: Row): Any = c2(input, left, right, _.lt(_, _))
+
+ lazy val ordering = {
+ if (left.dataType != right.dataType) {
+ throw new TreeNodeException(this,
+ s"Types do not match ${left.dataType} != ${right.dataType}")
+ }
+ left.dataType match {
+ case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+ case other => sys.error(s"Type $other does not support ordered
operations")
+ }
+ }
+
+ override def eval(input: Row): Any = {
+ val evalE1 = left.eval(input)
+ if(evalE1 == null) {
+ null
+ } else {
+ val evalE2 = right.eval(input)
+ if (evalE2 == null) {
+ null
+ } else {
+ ordering.lt(evalE1, evalE2)
+ }
+ }
+ }
}
case class LessThanOrEqual(left: Expression, right: Expression) extends
BinaryComparison {
def symbol = "<="
- override def eval(input: Row): Any = c2(input, left, right, _.lteq(_, _))
+
+ lazy val ordering = {
+ if (left.dataType != right.dataType) {
+ throw new TreeNodeException(this,
+ s"Types do not match ${left.dataType} != ${right.dataType}")
+ }
+ left.dataType match {
+ case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+ case other => sys.error(s"Type $other does not support ordered
operations")
+ }
+ }
+
+ override def eval(input: Row): Any = {
+ val evalE1 = left.eval(input)
+ if(evalE1 == null) {
+ null
+ } else {
+ val evalE2 = right.eval(input)
+ if (evalE2 == null) {
+ null
+ } else {
+ ordering.lteq(evalE1, evalE2)
+ }
+ }
+ }
}
case class GreaterThan(left: Expression, right: Expression) extends
BinaryComparison {
def symbol = ">"
- override def eval(input: Row): Any = c2(input, left, right, _.gt(_, _))
+
+ lazy val ordering = {
+ if (left.dataType != right.dataType) {
+ throw new TreeNodeException(this,
+ s"Types do not match ${left.dataType} != ${right.dataType}")
+ }
+ left.dataType match {
+ case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+ case other => sys.error(s"Type $other does not support ordered
operations")
+ }
+ }
+
+ override def eval(input: Row): Any = {
+ val evalE1 = left.eval(input)
+ if(evalE1 == null) {
+ null
+ } else {
+ val evalE2 = right.eval(input)
+ if (evalE2 == null) {
+ null
+ } else {
+ ordering.gt(evalE1, evalE2)
+ }
+ }
+ }
}
case class GreaterThanOrEqual(left: Expression, right: Expression) extends
BinaryComparison {
def symbol = ">="
- override def eval(input: Row): Any = c2(input, left, right, _.gteq(_, _))
+
+ lazy val ordering = {
+ if (left.dataType != right.dataType) {
+ throw new TreeNodeException(this,
+ s"Types do not match ${left.dataType} != ${right.dataType}")
+ }
+ left.dataType match {
+ case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+ case other => sys.error(s"Type $other does not support ordered
operations")
+ }
+ }
+
+ override def eval(input: Row): Any = {
+ val evalE1 = left.eval(input)
+ if(evalE1 == null) {
+ null
+ } else {
+ val evalE2 = right.eval(input)
+ if (evalE2 == null) {
+ null
+ } else {
+ ordering.gteq(evalE1, evalE2)
+ }
+ }
+ }
}
case class If(predicate: Expression, trueValue: Expression, falseValue:
Expression)
http://git-wip-us.apache.org/repos/asf/spark/blob/639a3c2f/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
old mode 100755
new mode 100644
http://git-wip-us.apache.org/repos/asf/spark/blob/639a3c2f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/MetadataSuite.scala
old mode 100755
new mode 100644
http://git-wip-us.apache.org/repos/asf/spark/blob/639a3c2f/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
old mode 100755
new mode 100644
http://git-wip-us.apache.org/repos/asf/spark/blob/639a3c2f/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
old mode 100755
new mode 100644