http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala
new file mode 100644
index 0000000..c969012
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala
@@ -0,0 +1,315 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule
+
+import scala.util.{Success, Try}
+
+
+object CalculationUtil {
+
+  implicit def option2CalculationValue(v: Option[_]): CalculationValue = CalculationValue(v)
+
+  // redefine the calculation method of operators in DSL
+  case class CalculationValue(value: Option[_]) extends Serializable {
+
+    def + (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 + v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def - (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 - v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 - v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 - v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def * (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2)
+          case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 * v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 * v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 * v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 * v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 * v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def / (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 / v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 / v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 / v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def % (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: Byte), Some(v2)) => Some(v1 % v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 % v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 % v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 % v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def unary_- (): Option[_] = {
+      value match {
+        case None => None
+        case Some(null) => None
+        case Some(v: String) => Some(v.reverse.toString)
+        case Some(v: Boolean) => Some(!v)
+        case Some(v: Byte) => Some(-v)
+        case Some(v: Short) => Some(-v)
+        case Some(v: Int) => Some(-v)
+        case Some(v: Long) => Some(-v)
+        case Some(v: Float) => Some(-v)
+        case Some(v: Double) => Some(-v)
+        case Some(v) => Some(v)
+        case _ => None
+      }
+    }
+
+
+    def === (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (None, None) => Some(true)
+        case (Some(v1), Some(v2)) => Some(v1 == v2)
+        case _ => Some(false)
+      }
+    }
+
+    def =!= (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (None, None) => Some(false)
+        case (Some(v1), Some(v2)) => Some(v1 != v2)
+        case _ => Some(true)
+      }
+    }
+
+    def > (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: String), Some(v2: String)) => Some(v1 > v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def >= (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (None, None) | (Some(null), Some(null)) => Some(true)
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def < (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: String), Some(v2: String)) => Some(v1 < v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def <= (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (None, None) | (Some(null), Some(null)) => Some(true)
+          case (None, _) | (_, None) => None
+          case (Some(null), _) | (_, Some(null)) => None
+          case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+
+    def in (other: Iterable[Option[_]]): Option[Boolean] = {
+      other.foldLeft(Some(false): Option[Boolean]) { (res, next) =>
+        optOr(res, ===(next))
+      }
+    }
+
+    def not_in (other: Iterable[Option[_]]): Option[Boolean] = {
+      other.foldLeft(Some(true): Option[Boolean]) { (res, next) =>
+        optAnd(res, =!=(next))
+      }
+    }
+
+    def between (other: Iterable[Option[_]]): Option[Boolean] = {
+      if (other.size < 2) None else {
+        val (begin, end) = (other.head, other.tail.head)
+        if (begin.isEmpty && end.isEmpty) Some(value.isEmpty)
+        else optAnd(>=(begin), <=(end))
+      }
+    }
+
+    def not_between (other: Iterable[Option[_]]): Option[Boolean] = {
+      if (other.size < 2) None else {
+        val (begin, end) = (other.head, other.tail.head)
+        if (begin.isEmpty && end.isEmpty) Some(value.nonEmpty)
+        else optOr(<(begin), >(end))
+      }
+    }
+
+    def unary_! (): Option[Boolean] = {
+      optNot(value)
+    }
+
+    def && (other: Option[_]): Option[Boolean] = {
+      optAnd(value, other)
+    }
+
+    def || (other: Option[_]): Option[Boolean] = {
+      optOr(value, other)
+    }
+
+
+    private def optNot(a: Option[_]): Option[Boolean] = {
+      a match {
+        case None => None
+        case Some(null) => None
+        case Some(v: Boolean) => Some(!v)
+        case _ => None
+      }
+    }
+    private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] = {
+      (a, b) match {
+        case (None, _) | (_, None) => None
+        case (Some(null), _) | (_, Some(null)) => None
+        case (Some(false), _) | (_, Some(false)) => Some(false)
+        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2)
+        case _ => None
+      }
+    }
+    private def optOr(a: Option[_], b: Option[_]): Option[Boolean] = {
+      (a, b) match {
+        case (None, _) | (_, None) => None
+        case (Some(null), _) | (_, Some(null)) => None
+        case (Some(true), _) | (_, Some(true)) => Some(true)
+        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2)
+        case _ => None
+      }
+    }
+  }
+
+}
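
For reference, a minimal sketch of how the implicit conversion above might be exercised; the variable names and values are illustrative, not part of the patch:

    import org.apache.griffin.measure.rule.CalculationUtil._

    val price: Option[_] = Some(3)
    val count: Option[_] = Some("4")

    price + count   // Some(7): the right operand is coerced via toString.toInt
    price > count   // Some(false): the right operand is parsed as a Double for comparison
    price + None    // None: a missing operand short-circuits to None

Any failed coercion inside the arithmetic and comparison operators is caught by the surrounding Try and collapses to None instead of throwing.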

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala
new file mode 100644
index 0000000..9d027ec
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala
@@ -0,0 +1,159 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule
+
+import org.apache.spark.sql.types._
+
+object DataTypeCalculationUtil {
+
+  implicit def dataType2CalculationType(tp: DataType): CalculationType = CalculationType(tp)
+
+  case class CalculationType(tp: DataType) extends Serializable {
+    def binaryOpr (other: DataType): DataType = {
+      (tp, other) match {
+        case (NullType, _) | (_, NullType) => NullType
+        case (t, _) => t
+      }
+    }
+    def unaryOpr (): DataType = {
+      tp
+    }
+  }
+
+  case class DataTypeException() extends Exception {}
+
+  def getDataType(value: Any): DataType = {
+    value match {
+      case v: String => StringType
+      case v: Boolean => BooleanType
+      case v: Long => LongType
+      case v: Int => IntegerType
+      case v: Short => ShortType
+      case v: Byte => ByteType
+      case v: Double => DoubleType
+      case v: Float => FloatType
+      case v: Map[_, _] => MapType(getSameDataType(v.keys), getSameDataType(v.values))
+      case v: Iterable[_] => ArrayType(getSameDataType(v))
+      case _ => NullType
+    }
+  }
+
+  private def getSameDataType(values: Iterable[Any]): DataType = {
+    values.foldLeft(NullType: DataType)((a, c) => genericTypeOf(a, getDataType(c)))
+  }
+
+  private def genericTypeOf(dt1: DataType, dt2: DataType): DataType = {
+    if (dt1 == dt2) dt1 else {
+      dt1 match {
+        case NullType => dt2
+        case StringType => dt1
+        case DoubleType => {
+          dt2 match {
+            case StringType => dt2
+            case DoubleType | FloatType | LongType | IntegerType | ShortType | ByteType => dt1
+            case _ => throw DataTypeException()
+          }
+        }
+        case FloatType => {
+          dt2 match {
+            case StringType | DoubleType => dt2
+            case FloatType | LongType | IntegerType | ShortType | ByteType => dt1
+            case _ => throw DataTypeException()
+          }
+        }
+        case LongType => {
+          dt2 match {
+            case StringType | DoubleType | FloatType => dt2
+            case LongType | IntegerType | ShortType | ByteType => dt1
+            case _ => throw DataTypeException()
+          }
+        }
+        case IntegerType => {
+          dt2 match {
+            case StringType | DoubleType | FloatType | LongType => dt2
+            case IntegerType | ShortType | ByteType => dt1
+            case _ => throw DataTypeException()
+          }
+        }
+        case ShortType => {
+          dt2 match {
+            case StringType | DoubleType | FloatType | LongType | IntegerType => dt2
+            case ShortType | ByteType => dt1
+            case _ => throw DataTypeException()
+          }
+        }
+        case ByteType => {
+          dt2 match {
+            case StringType | DoubleType | FloatType | LongType | IntegerType | ShortType => dt2
+            case ByteType => dt1
+            case _ => throw DataTypeException()
+          }
+        }
+        case BooleanType => {
+          dt2 match {
+            case StringType => dt2
+            case BooleanType => dt1
+            case _ => throw DataTypeException()
+          }
+        }
+        case MapType(kdt1, vdt1, _) => {
+          dt2 match {
+            case MapType(kdt2, vdt2, _) => MapType(genericTypeOf(kdt1, kdt2), genericTypeOf(vdt1, vdt2))
+            case _ => throw DataTypeException()
+          }
+        }
+        case ArrayType(vdt1, _) => {
+          dt2 match {
+            case ArrayType(vdt2, _) => ArrayType(genericTypeOf(vdt1, vdt2))
+            case _ => throw DataTypeException()
+          }
+        }
+        case _ => throw DataTypeException()
+      }
+    }
+  }
+
+  def sequenceDataTypeMap(aggr: Map[String, DataType], value: Map[String, Any]): Map[String, DataType] = {
+    val dataTypes = value.foldLeft(Map[String, DataType]()) { (map, pair) =>
+      val (k, v) = pair
+      try {
+        map + (k -> getDataType(v))
+      } catch {
+        case e: DataTypeException => map
+      }
+    }
+    combineDataTypeMap(aggr, dataTypes)
+  }
+
+  def combineDataTypeMap(aggr1: Map[String, DataType], aggr2: Map[String, DataType]): Map[String, DataType] = {
+    aggr2.foldLeft(aggr1) { (a, c) =>
+      a.get(c._1) match {
+        case Some(t) => {
+          try {
+            a + (c._1 -> genericTypeOf(t, c._2))
+          } catch {
+            case e: DataTypeException => a
+          }
+        }
+        case _ => a + c
+      }
+    }
+  }
+
+}
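
A hedged sketch of how the schema accumulation above might be used: getDataType maps each value to a Spark SQL DataType, and sequenceDataTypeMap / combineDataTypeMap fold rows into a field-to-type map, widening types through genericTypeOf. The rows below are made up for illustration:

    import org.apache.spark.sql.types.DataType
    import org.apache.griffin.measure.rule.DataTypeCalculationUtil

    val rows: Seq[Map[String, Any]] = Seq(
      Map("age" -> 3, "name" -> "apache"),
      Map("age" -> 4L, "name" -> "griffin")
    )
    val schema = rows.foldLeft(Map[String, DataType]()) { (aggr, row) =>
      DataTypeCalculationUtil.sequenceDataTypeMap(aggr, row)
    }
    // expected: Map(age -> LongType, name -> StringType), since IntegerType widens to LongType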

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala
new file mode 100644
index 0000000..940d0cb
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala
@@ -0,0 +1,263 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule
+
+import org.apache.griffin.measure.rule.expr._
+import org.apache.griffin.measure.rule.func._
+import org.apache.spark.sql.Row
+
+import scala.util.{Success, Try}
+
+object ExprValueUtil {
+
+  private def append(path: List[String], step: String): List[String] = {
+    path :+ step
+  }
+
+  private def value2Map(key: String, value: Option[Any]): Map[String, Any] = {
+    value.flatMap(v => Some((key -> v))).toMap
+  }
+
+  private def getSingleValue(data: Option[Any], desc: FieldDescOnly): Option[Any] = {
+    data match {
+      case Some(row: Row) => {
+        desc match {
+          case i: IndexDesc => try { Some(row.getAs[Any](i.index)) } catch { case _ => None }
+          case f: FieldDesc => try { Some(row.getAs[Any](f.field)) } catch { case _ => None }
+          case _ => None
+        }
+      }
+      case Some(d: Map[String, Any]) => {
+        desc match {
+          case f: FieldDesc => d.get(f.field)
+          case _ => None
+        }
+      }
+      case Some(d: Seq[Any]) => {
+        desc match {
+          case i: IndexDesc => if (i.index >= 0 && i.index < d.size) Some(d(i.index)) else None
+          case _ => None
+        }
+      }
+    }
+  }
+
+  private def calcExprValues(pathDatas: List[(List[String], Option[Any])], expr: Expr, existExprValueMap: Map[String, Any]): List[(List[String], Option[Any])] = {
+    Try {
+      expr match {
+        case selection: SelectionExpr => {
+          selection.selectors.foldLeft(pathDatas) { (pds, selector) =>
+            calcExprValues(pds, selector, existExprValueMap)
+          }
+        }
+        case selector: IndexFieldRangeSelectExpr => {
+          pathDatas.flatMap { pathData =>
+            val (path, data) = pathData
+            data match {
+              case Some(row: Row) => {
+                selector.fields.flatMap { field =>
+                  field match {
+                    case (_: IndexDesc) | (_: FieldDesc) => {
+                      getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
+                    }
+                    case a: AllFieldsDesc => {
+                      (0 until row.size).flatMap { i =>
+                        getSingleValue(data, IndexDesc(i.toString)).map { v =>
+                          (append(path, s"${a.desc}_${i}"), Some(v))
+                        }
+                      }.toList
+                    }
+                    case r: FieldRangeDesc => {
+                      (r.startField, r.endField) match {
+                        case (si: IndexDesc, ei: IndexDesc) => {
+                          (si.index to ei.index).flatMap { i =>
+                            (append(path, s"${r.desc}_${i}"), getSingleValue(data, IndexDesc(i.toString)))
+                            getSingleValue(data, IndexDesc(i.toString)).map { v =>
+                              (append(path, s"${r.desc}_${i}"), Some(v))
+                            }
+                          }.toList
+                        }
+                        case _ => Nil
+                      }
+                    }
+                    case _ => Nil
+                  }
+                }
+              }
+              case Some(d: Map[String, Any]) => {
+                selector.fields.flatMap { field =>
+                  field match {
+                    case (_: IndexDesc) | (_: FieldDesc) => {
+                      getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
+                    }
+                    case a: AllFieldsDesc => {
+                      d.keySet.flatMap { k =>
+                        getSingleValue(data, FieldDesc(k)).map { v =>
+                          (append(path, s"${a.desc}_${k}"), Some(v))
+                        }
+                      }
+                    }
+                    case _ => None
+                  }
+                }
+              }
+              case Some(d: Seq[Any]) => {
+                selector.fields.flatMap { field =>
+                  field match {
+                    case (_: IndexDesc) | (_: FieldDesc) => {
+                      getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
+                    }
+                    case a: AllFieldsDesc => {
+                      (0 until d.size).flatMap { i =>
+                        (append(path, s"${a.desc}_${i}"), getSingleValue(data, IndexDesc(i.toString)))
+                        getSingleValue(data, IndexDesc(i.toString)).map { v =>
+                          (append(path, s"${a.desc}_${i}"), Some(v))
+                        }
+                      }.toList
+                    }
+                    case r: FieldRangeDesc => {
+                      (r.startField, r.endField) match {
+                        case (si: IndexDesc, ei: IndexDesc) => {
+                          (si.index to ei.index).flatMap { i =>
+                            (append(path, s"${r.desc}_${i}"), getSingleValue(data, IndexDesc(i.toString)))
+                            getSingleValue(data, IndexDesc(i.toString)).map { v =>
+                              (append(path, s"${r.desc}_${i}"), Some(v))
+                            }
+                          }.toList
+                        }
+                        case _ => None
+                      }
+                    }
+                    case _ => None
+                  }
+                }
+              }
+            }
+          }
+        }
+        case selector: FunctionOperationExpr => {
+          val args: Array[Option[Any]] = selector.args.map { arg =>
+            arg.calculate(existExprValueMap)
+          }.toArray
+          pathDatas.flatMap { pathData =>
+            val (path, data) = pathData
+            data match {
+              case Some(d: String) => {
+                val res = FunctionUtil.invoke(selector.func, Some(d) +: args)
+                val residx = res.zipWithIndex
+                residx.map { vi =>
+                  val (v, i) = vi
+                  val step = if (i == 0) s"${selector.desc}" else s"${selector.desc}_${i}"
+                  (append(path, step), v)
+                }
+              }
+              case _ => None
+            }
+          }
+        }
+        case selector: FilterSelectExpr => {  // filter means selecting the items that fit the condition
+          pathDatas.flatMap { pathData =>
+            val (path, data) = pathData
+            data match {
+              case Some(row: Row) => {
+                // right value could not be selection
+                val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
+                (0 until row.size).flatMap { i =>
+                  val dt = getSingleValue(data, IndexDesc(i.toString))
+                  val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
+                  val partValueMap = lmap ++ rmap
+                  selector.calculate(partValueMap) match {
+                    case Some(true) => Some((append(path, s"${selector.desc}_${i}"), dt))
+                    case _ => None
+                  }
+                }
+              }
+              case Some(d: Map[String, Any]) => {
+                val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
+                d.keySet.flatMap { k =>
+                  val dt = getSingleValue(data, FieldDesc(k))
+                  val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
+                  val partValueMap = lmap ++ rmap
+                  selector.calculate(partValueMap) match {
+                    case Some(true) => Some((append(path, s"${selector.desc}_${k}"), dt))
+                    case _ => None
+                  }
+                }
+              }
+              case Some(d: Seq[Any]) => {
+                val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
+                (0 until d.size).flatMap { i =>
+                  val dt = getSingleValue(data, IndexDesc(i.toString))
+                  val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
+                  val partValueMap = lmap ++ rmap
+                  selector.calculate(partValueMap) match {
+                    case Some(true) => Some((append(path, s"${selector.desc}_${i}"), dt))
+                    case _ => None
+                  }
+                }
+              }
+            }
+          }
+        }
+        case _ => {
+          (expr.desc :: Nil, expr.calculate(existExprValueMap)) :: Nil
+        }
+      }
+    } match {
+      case Success(v) => v
+      case _ => Nil
+    }
+  }
+
+  private def calcExprsValues(data: Option[Any], exprs: Iterable[Expr], existExprValueMap: Map[String, Any]): List[Map[String, Any]] = {
+    val selectionValues: Map[String, List[(List[String], Any)]] = exprs.map { expr =>
+      (expr._id, calcExprValues((Nil, data) :: Nil, expr, existExprValueMap).flatMap { pair =>
+        pair._2 match {
+          case Some(v) => Some((pair._1, v))
+          case _ => None
+        }
+      })
+    }.toMap
+    // if exprs is empty, return an empty value map for each row
+    if (selectionValues.isEmpty) List(Map[String, Any]())
+    else SchemaValueCombineUtil.cartesian(selectionValues)
+  }
+
+  // try to calculate some exprs from the data and initExprValueMap, generating a new expression value map
+  // depends on the origin data and the existing expr value map
+  def genExprValueMaps(data: Option[Any], exprs: Iterable[Expr], initExprValueMap: Map[String, Any]): List[Map[String, Any]] = {
+    val (selections, nonSelections) = exprs.partition(_.isInstanceOf[SelectionExpr])
+    val valueMaps = calcExprsValues(data, selections, initExprValueMap)
+    updateExprValueMaps(nonSelections, valueMaps)
+  }
+
+  // with exprValueMap, calculate the expressions and update the expression value map
+  // depends only on the existing expr value map; pure calculation, no origin data needed
+  def updateExprValueMaps(exprs: Iterable[Expr], exprValueMaps: List[Map[String, Any]]): List[Map[String, Any]] = {
+    exprValueMaps.map { valueMap =>
+      exprs.foldLeft(valueMap) { (em, expr) =>
+        expr.calculate(em) match {
+          case Some(v) => em + (expr._id -> v)
+          case _ => em
+        }
+      }
+    }
+  }
+
+}
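
A rough, untested sketch of how genExprValueMaps might be driven against a single row; it relies on the RuleParser introduced later in this patch, and the field name and row content are made up:

    import org.apache.griffin.measure.rule.{ExprValueUtil, RuleParser}

    val parser = RuleParser()
    val sel = parser.parseAll(parser.selection, "$source.uid").get   // a SelectionExpr
    val row: Option[Any] = Some(Map("uid" -> "u001"))                // a source row as a Map
    val maps = ExprValueUtil.genExprValueMaps(row, Seq(sel), Map[String, Any]())
    // expected: one value map, keyed by the selection's internal _id, holding "u001"

Non-selection expressions would then be evaluated on top of these maps via updateExprValueMaps.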

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala
new file mode 100644
index 0000000..5ec143f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleAnalyzer.scala
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule
+
+import org.apache.griffin.measure.rule.expr._
+
+case class RuleAnalyzer(rule: StatementExpr) extends Serializable {
+
+  val constData = ""
+  private val SourceData = "source"
+  private val TargetData = "target"
+
+  val constCacheExprs: Iterable[Expr] = rule.getCacheExprs(constData)
+  private val sourceCacheExprs: Iterable[Expr] = rule.getCacheExprs(SourceData)
+  private val targetCacheExprs: Iterable[Expr] = rule.getCacheExprs(TargetData)
+
+  private val sourcePersistExprs: Iterable[Expr] = rule.getPersistExprs(SourceData)
+  private val targetPersistExprs: Iterable[Expr] = rule.getPersistExprs(TargetData)
+
+  val constFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(constData).toSet
+  private val sourceFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(SourceData).toSet ++ sourcePersistExprs.toSet
+  private val targetFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(TargetData).toSet ++ targetPersistExprs.toSet
+
+  private val groupbyExprPairs: Seq[(Expr, Expr)] = rule.getGroupbyExprPairs((SourceData, TargetData))
+  private val sourceGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._1)
+  private val targetGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._2)
+
+  val sourceRuleExprs: RuleExprs = RuleExprs(sourceGroupbyExprs, sourceCacheExprs,
+    sourceFinalCacheExprs, sourcePersistExprs)
+  val targetRuleExprs: RuleExprs = RuleExprs(targetGroupbyExprs, targetCacheExprs,
+    targetFinalCacheExprs, targetPersistExprs)
+
+}
+
+
+// for a single data source
+// groupbyExprs: in the accuracy case, these exprs can serve as group-by exprs
+//                  Data keys for the accuracy case, generated from the equality statements, to improve calculation efficiency
+// cacheExprs: exprs whose values can be calculated independently and cached for later use
+//                  Cached for the finalCacheExprs calculation; they contain redundant values, so persisting all of them would be wasteful
+// finalCacheExprs: the roots of cacheExprs, cached for later use, together with the persistExprs
+//                  Cached for the calculation usage, and can be saved for re-calculation in streaming mode
+// persistExprs: the expr values that should be persisted; only direct selection exprs are persistable
+//                  Persisted for record usage, e.g. to record missing data; must be readable as raw data
+case class RuleExprs(groupbyExprs: Seq[Expr],
+                     cacheExprs: Iterable[Expr],
+                     finalCacheExprs: Iterable[Expr],
+                     persistExprs: Iterable[Expr]
+                    ) extends Serializable {
+  // for example: for a rule "$source.name = $target.name AND $source.age < $target.age + (3 * 4)"
+  // in this rule, for the target data source, the targetRuleExprs looks like below
+  // groupbyExprs: $target.name
+  // cacheExprs: $target.name, $target.age, $target.age + (3 * 4)
+  // finalCacheExprs: $target.name, $target.age + (3 * 4), $target.age
+  // persistExprs: $target.name, $target.age
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala
new file mode 100644
index 0000000..bbaf5cb
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleFactory.scala
@@ -0,0 +1,52 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule
+
+import org.apache.griffin.measure.config.params.user._
+
+import scala.util.Failure
+//import org.apache.griffin.measure.rule.expr_old._
+import org.apache.griffin.measure.rule.expr._
+
+import scala.util.{Success, Try}
+
+
+case class RuleFactory(evaluateRuleParam: EvaluateRuleParam) {
+
+  val ruleParser: RuleParser = RuleParser()
+
+  def generateRule(): StatementExpr = {
+    val rules = evaluateRuleParam.rules
+    val statement = parseExpr(rules) match {
+      case Success(se) => se
+      case Failure(ex) => throw ex
+    }
+    statement
+  }
+
+  private def parseExpr(rules: String): Try[StatementExpr] = {
+    Try {
+      val result = ruleParser.parseAll(ruleParser.rule, rules)
+      if (result.successful) result.get
+      else throw new Exception("parse rule error!")
+//      throw new Exception("parse rule error!")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala
new file mode 100644
index 0000000..55d9f45
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/RuleParser.scala
@@ -0,0 +1,244 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule
+
+import org.apache.griffin.measure.rule.expr._
+
+import scala.util.parsing.combinator._
+
+case class RuleParser() extends JavaTokenParsers with Serializable {
+
+  /**
+    * BNF representation for grammar as below:
+    *
+    * <rule> ::= <logical-statement> [WHEN <logical-statement>]
+    * rule: mapping-rule [WHEN when-rule]
+    * - mapping-rule: the first-level operator should preferably not be OR or NOT, otherwise the group-by column cannot be found automatically
+    * - when-rule: should only contain general info about the data source, not info specific to each data row
+    *
+    * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
+    * logical-statement: return boolean value
+    * logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
+    *
+    * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
+    * logical-expression example: $source.id = $target.id, $source.page_id IN ('3214', '4312', '60821')
+    *
+    * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
+    * <range-opr> ::= ["NOT"] "IN" | "BETWEEN"
+    * <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
+    * range-expr example: ('3214', '4312', '60821'), (10, 15), ()
+    *
+    * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
+    * math-expr example: $source.price * $target.count, "hello" + " " + "world" + 123
+    *
+    * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
+    * <unary-opr> ::= "+" | "-"
+    *
+    * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
+    *
+    * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
+    * selection example: $source.price, $source.json(), $source['state'], $source.numList[3], $target.json().mails['org' = 'apache'].names[*]
+    *
+    * <selection-head> ::= $source | $target
+    *
+    * <field-sel> ::= "." <field-string>
+    *
+    * <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
+    * <function-name> ::= <name-string>
+    * <arg> ::= <math-expr>
+    *
+    * <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
+    * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
+    * index-field-range: 2 means the 3rd item, (0, 3) means the first 4 items, * means all items, 'age' means the item 'age'
+    * <index-field> ::= <index> | <field-quote> | <all-selection>
+    * index: 0 ~ n means position from start, -1 ~ -n means position from end
+    * <field-quote> ::= ' <field-string> ' | " <field-string> "
+    *
+    * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
+    * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
+    * filter-sel example: ['name' = 'URL'], $source.man['age' > $source.graduate_age + 5 ]
+    *
+    * When a <math-expr> appears inside a selection, it must not reference a different <selection-head>, for example:
+    * $source.tags[1+2]             valid
+    * $source.tags[$source.first]   valid
+    * $source.tags[$target.first]   invalid
+    * -- Such a check is a job for validation, not for the parser
+    *
+    *
+    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-none>
+    * <literal-string> ::= <any-string>
+    * <literal-number> ::= <integer> | <double>
+    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
+    * <literal-boolean> ::= true | false
+    * <literal-null> ::= null | undefined
+    * <literal-none> ::= none
+    *
+    */
+
+  object Keyword {
+    def WhenKeywords: Parser[String] = """(?i)when""".r
+    def UnaryLogicalKeywords: Parser[String] = """(?i)not""".r
+    def BinaryLogicalKeywords: Parser[String] = """(?i)and|or""".r
+    def RangeKeywords: Parser[String] = """(?i)(not\s+)?(in|between)""".r
+    def DataSourceKeywords: Parser[String] = """(?i)\$(source|target)""".r
+    def Keywords: Parser[String] = WhenKeywords | UnaryLogicalKeywords | BinaryLogicalKeywords | RangeKeywords | DataSourceKeywords
+  }
+  import Keyword._
+
+  object Operator {
+    def NotLogicalOpr: Parser[String] = """(?i)not""".r | "!"
+    def AndLogicalOpr: Parser[String] = """(?i)and""".r | "&&"
+    def OrLogicalOpr: Parser[String] = """(?i)or""".r | "||"
+    def CompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
+    def RangeOpr: Parser[String] = RangeKeywords
+
+    def UnaryMathOpr: Parser[String] = "+" | "-"
+    def BinaryMathOpr1: Parser[String] = "*" | "/" | "%"
+    def BinaryMathOpr2: Parser[String] = "+" | "-"
+
+    def FilterCompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
+
+    def SqBracketPair: (Parser[String], Parser[String]) = ("[", "]")
+    def BracketPair: (Parser[String], Parser[String]) = ("(", ")")
+    def Dot: Parser[String] = "."
+    def AllSelection: Parser[String] = "*"
+    def SQuote: Parser[String] = "'"
+    def DQuote: Parser[String] = "\""
+    def Comma: Parser[String] = ","
+  }
+  import Operator._
+
+  object SomeString {
+//    def AnyString: Parser[String] = """[^'\"{}\[\]()=<>.$@,;+\-*/\\]*""".r
+    def AnyString: Parser[String] = """[^'\"]*""".r
+    def SimpleFieldString: Parser[String] = """\w+""".r
+    def FieldString: Parser[String] = """[\w\s]+""".r
+    def NameString: Parser[String] = """[a-zA-Z_]\w*""".r
+  }
+  import SomeString._
+
+  object SomeNumber {
+    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
+    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
+    def IndexNumber: Parser[String] = IntegerNumber
+  }
+  import SomeNumber._
+
+  // -- literal --
+  def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean | literialNull | literialNone
+  def literialString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
+  def literialNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
+  def literialTime: Parser[LiteralTimeExpr] = """(\d+(d|h|m|s|ms))+""".r ^^ { LiteralTimeExpr(_) }
+  def literialBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }
+  def literialNull: Parser[LiteralNullExpr] = ("""(?i)null""".r | """(?i)undefined""".r) ^^ { LiteralNullExpr(_) }
+  def literialNone: Parser[LiteralNoneExpr] = """(?i)none""".r ^^ { LiteralNoneExpr(_) }
+
+  // -- selection --
+  // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
+  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ^^ {
+    case head ~ selectors => SelectionExpr(head, selectors)
+  }
+  def selector: Parser[SelectExpr] = (functionOperation | fieldSelect | indexFieldRangeSelect | filterSelect)
+
+  def selectionHead: Parser[SelectionHead] = DataSourceKeywords ^^ { SelectionHead(_) }
+  // <field-sel> ::= "." <field-string>
+  def fieldSelect: Parser[IndexFieldRangeSelectExpr] = Dot ~> SimpleFieldString ^^ {
+    case field => IndexFieldRangeSelectExpr(FieldDesc(field) :: Nil)
+  }
+  // <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
+  def functionOperation: Parser[FunctionOperationExpr] = Dot ~ NameString ~ BracketPair._1 ~ repsep(argument, Comma) ~ BracketPair._2 ^^ {
+    case _ ~ func ~ _ ~ args ~ _ => FunctionOperationExpr(func, args)
+  }
+  def argument: Parser[MathExpr] = mathExpr
+  // <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
+  def indexFieldRangeSelect: Parser[IndexFieldRangeSelectExpr] = SqBracketPair._1 ~> rep1sep(indexFieldRange, Comma) <~ SqBracketPair._2 ^^ {
+    case ifrs => IndexFieldRangeSelectExpr(ifrs)
+  }
+  // <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
+  def indexFieldRange: Parser[FieldDescOnly] = indexField | BracketPair._1 ~ indexField ~ Comma ~ indexField ~ BracketPair._2 ^^ {
+    case _ ~ if1 ~ _ ~ if2 ~ _ => FieldRangeDesc(if1, if2)
+  }
+  // <index-field> ::= <index> | <field-quote> | <all-selection>
+  // *a <math-expr> could be parsed here as well, but to keep things simple it is not supported for now*
+  def indexField: Parser[FieldDescOnly] = IndexNumber ^^ { IndexDesc(_) } | fieldQuote | AllSelection ^^ { AllFieldsDesc(_) }
+  // <field-quote> ::= ' <field-string> ' | " <field-string> "
+  def fieldQuote: Parser[FieldDesc] = (SQuote ~> FieldString <~ SQuote | DQuote ~> FieldString <~ DQuote) ^^ { FieldDesc(_) }
+  // <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
+  def filterSelect: Parser[FilterSelectExpr] = SqBracketPair._1 ~> fieldQuote ~ FilterCompareOpr ~ mathExpr <~ SqBracketPair._2 ^^ {
+    case field ~ compare ~ value => FilterSelectExpr(field, compare, value)
+  }
+
+  // -- math --
+  // <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
+  def mathFactor: Parser[MathExpr] = (literal | selection | BracketPair._1 ~> mathExpr <~ BracketPair._2) ^^ { MathFactorExpr(_) }
+  // <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
+  // <unary-opr> ::= "+" | "-"
+  def unaryMathExpr: Parser[MathExpr] = rep(UnaryMathOpr) ~ mathFactor ^^ {
+    case Nil ~ a => a
+    case list ~ a => UnaryMathExpr(list, a)
+  }
+  // <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
+  def binaryMathExpr1: Parser[MathExpr] = unaryMathExpr ~ rep(BinaryMathOpr1 ~ unaryMathExpr) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
+  }
+  def binaryMathExpr2: Parser[MathExpr] = binaryMathExpr1 ~ rep(BinaryMathOpr2 ~ binaryMathExpr1) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
+  }
+  def mathExpr: Parser[MathExpr] = binaryMathExpr2
+
+  // -- logical expression --
+  // <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
+  def rangeExpr: Parser[RangeDesc] = BracketPair._1 ~> repsep(mathExpr, Comma) <~ BracketPair._2 ^^ { RangeDesc(_) }
+  // <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
+  def logicalExpr: Parser[LogicalExpr] = mathExpr ~ CompareOpr ~ mathExpr ^^ {
+    case left ~ opr ~ right => LogicalCompareExpr(left, opr, right)
+  } | mathExpr ~ RangeOpr ~ rangeExpr ^^ {
+    case left ~ opr ~ range => LogicalRangeExpr(left, opr, range)
+  } | mathExpr ^^ { LogicalSimpleExpr(_) }
+
+  // -- logical statement --
+  def logicalFactor: Parser[LogicalExpr] = logicalExpr | BracketPair._1 ~> logicalStatement <~ BracketPair._2
+  def notLogicalStatement: Parser[LogicalExpr] = rep(NotLogicalOpr) ~ logicalFactor ^^ {
+    case Nil ~ a => a
+    case list ~ a => UnaryLogicalExpr(list, a)
+  }
+  def andLogicalStatement: Parser[LogicalExpr] = notLogicalStatement ~ rep(AndLogicalOpr ~ notLogicalStatement) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
+  }
+  def orLogicalStatement: Parser[LogicalExpr] = andLogicalStatement ~ rep(OrLogicalOpr ~ andLogicalStatement) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
+  }
+  // <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
+  def logicalStatement: Parser[LogicalExpr] = orLogicalStatement
+
+  // -- clause statement --
+  def whereClause: Parser[WhereClauseExpr] = logicalStatement ^^ { WhereClauseExpr(_) }
+  def whenClause: Parser[WhenClauseExpr] = WhenKeywords ~> logicalStatement ^^ { WhenClauseExpr(_) }
+
+  // -- rule --
+  // <rule> ::= <logical-statement> [WHEN <logical-statement>]
+  def rule: Parser[StatementExpr] = whereClause ~ opt(whenClause) ^^ {
+    case a ~ b => StatementExpr(a, b)
+  }
+
+}
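
As a quick sanity check of the grammar, the example rule quoted in the RuleExprs comment above can be parsed and analyzed as sketched below (illustrative only, not part of the patch):

    val parser = RuleParser()
    val result = parser.parseAll(parser.rule,
      "$source.name = $target.name AND $source.age < $target.age + (3 * 4)")
    if (result.successful) {
      val statement = result.get              // StatementExpr
      val analyzer = RuleAnalyzer(statement)  // splits cache/persist/group-by exprs per data source
      // per the RuleExprs comment, analyzer.sourceRuleExprs.groupbyExprs should hold $source.name
    }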

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala
new file mode 100644
index 0000000..ed3b3fc
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/SchemaValueCombineUtil.scala
@@ -0,0 +1,187 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule
+
+object SchemaValueCombineUtil {
+
+  // Map[String, List[(List[String], T)]]: Map[key, List[(path, value)]]
+  def cartesian[T](valuesMap: Map[String, List[(List[String], T)]]): List[Map[String, T]] = {
+    val fieldsList: List[(String, List[(List[String], T)])] = valuesMap.toList
+
+    // List[key, List[(path, value)]] to List[(path, (key, value))]
+    val valueList: List[(List[String], (String, T))] = fieldsList.flatMap { fields =>
+      val (key, list) = fields
+      list.map { pv =>
+        val (path, value) = pv
+        (path, (key, value))
+      }
+    }
+
+    // 1. generate tree from value list, and return root node
+    val root = TreeUtil.genRootTree(valueList)
+
+    // 2. depth-first visit of the tree from the root, merging data into the value map list
+    val valueMapList: List[Map[String, _]] = TreeUtil.mergeDatasIntoMap(root, Nil)
+
+    // 3. simple conversion: cast the values back to T
+    val result = valueMapList.map { mp =>
+      mp.map { kv =>
+        val (k, v) = kv
+        (k, v.asInstanceOf[T])
+      }
+    }
+
+    result
+
+  }
+
+
+  case class TreeNode(key: String, var datas: List[(String, _)]) {
+    var children = List[TreeNode]()
+    def addChild(node: TreeNode): Unit = children = children :+ node
+    def mergeSelf(node: TreeNode): Unit = datas = datas ::: node.datas
+  }
+
+  object TreeUtil {
+    private def genTree(path: List[String], datas: List[(String, _)]): Option[TreeNode] = {
+      path match {
+        case Nil => None
+        case head :: tail => {
+          genTree(tail, datas) match {
+            case Some(child) => {
+              val curNode = TreeNode(head, Nil)
+              curNode.addChild(child)
+              Some(curNode)
+            }
+            case _ => Some(TreeNode(head, datas))
+          }
+        }
+      }
+    }
+
+    private def mergeTrees(trees: List[TreeNode], newTreeOpt: Option[TreeNode]): List[TreeNode] = {
+      newTreeOpt match {
+        case Some(newTree) => {
+          trees.find(tree => tree.key == newTree.key) match {
+            case Some(tree) => {
+              // children merge
+              for (child <- newTree.children) {
+                tree.children = mergeTrees(tree.children, Some(child))
+              }
+              // self data merge
+              tree.mergeSelf(newTree)
+              trees
+            }
+            case _ => trees :+ newTree
+          }
+        }
+        case _ => trees
+      }
+    }
+
+    private def root(): TreeNode = TreeNode("", Nil)
+
+    def genRootTree(values: List[(List[String], (String, _))]): TreeNode = {
+      val rootNode = root()
+      val nodeOpts = values.map(value => genTree(value._1, value._2 :: Nil))
+      rootNode.children = nodeOpts.foldLeft(List[TreeNode]()) { (trees, treeOpt) =>
+        mergeTrees(trees, treeOpt)
+      }
+      rootNode
+    }
+
+    private def add(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]): List[Map[String, _]] = {
+      mapList1 ::: mapList2
+    }
+    private def multiply(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]): List[Map[String, _]] = {
+      mapList1.flatMap { map1 =>
+        mapList2.map { map2 =>
+          map1 ++ map2
+        }
+      }
+    }
+
+    private def keysList(mapList: List[Map[String, _]]): List[String] = {
+      val keySet = mapList match {
+        case Nil => Set[String]()
+        case head :: _ => head.keySet
+      }
+      keySet.toList
+    }
+
+    def mergeDatasIntoMap(root: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
+      val childrenKeysMapDatas = root.children.foldLeft(Map[List[String], List[Map[String, _]]]()) { (keysMap, child) =>
+        val childMdts = mergeDatasIntoMap(child, List[Map[String, _]]())
+        childMdts match {
+          case Nil => keysMap
+          case _ => {
+            val keys = keysList(childMdts)
+            val afterList = keysMap.get(keys) match {
+              case Some(list) => add(list, childMdts)
+              case _ => childMdts
+            }
+            keysMap + (keys -> afterList)
+          }
+        }
+      }
+      val childrenMergeMaps = childrenKeysMapDatas.values.foldLeft(List[Map[String, _]]()) { (originList, list) =>
+        originList match {
+          case Nil => list
+          case _ => multiply(originList, list)
+        }
+      }
+      val result = mergeNodeChildrenDatasIntoMap(root, childrenMergeMaps)
+      result
+    }
+
+    private def mergeNodeChildrenDatasIntoMap(node: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
+      val datas: List[(String, (String, Any))] = node.children.flatMap { child =>
+        child.datas.map(dt => (dt._1, (child.key, dt._2)))
+      }
+      val childrenDataKeys: Set[String] = datas.map(_._1).toSet
+      val childrenDataLists: Map[String, List[(String, _)]] = datas.foldLeft(childrenDataKeys.map(k => (k, List[(String, _)]())).toMap) { (maps, data) =>
+        maps.get(data._1) match {
+          case Some(list) => maps + (data._1 -> (list :+ data._2))
+          case _ => maps
+        }
+      }
+
+      // multiply different key datas
+      childrenDataLists.foldLeft(mapDatas) { (mdts, klPair) =>
+        val (key, list) = klPair
+        mdts match {
+          case Nil => list.map(pr => Map[String, Any]((key -> pr._2)))
+          case _ => {
+            list.flatMap { kvPair =>
+              val (path, value) = kvPair
+              mdts.map { mp =>
+                mp + (key -> value)
+              }
+            }
+          }
+        }
+      }
+
+    }
+  }
+
+
+
+
+}
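
For readers skimming the merge logic above: children whose data maps share the same key set are appended, while children with different key sets are cross-multiplied into joined maps. A minimal standalone sketch (not part of the patch) of that combination step:

    // Standalone sketch of the add/multiply combination used above.
    object MergeSketch extends App {
      def add(l1: List[Map[String, Any]], l2: List[Map[String, Any]]): List[Map[String, Any]] =
        l1 ::: l2
      def multiply(l1: List[Map[String, Any]], l2: List[Map[String, Any]]): List[Map[String, Any]] =
        l1.flatMap(m1 => l2.map(m2 => m1 ++ m2))

      val users = List(Map("user" -> "a"), Map("user" -> "b"))
      val items = List(Map("item" -> 1), Map("item" -> 2))

      println(add(users, List(Map("user" -> "c"))))  // 3 rows, same key set
      println(multiply(users, items))                // 4 joined rows (2 x 2)
    }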

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/AnalyzableExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/AnalyzableExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/AnalyzableExpr.scala
new file mode 100644
index 0000000..aefcaad
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/AnalyzableExpr.scala
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+
+trait AnalyzableExpr extends Serializable {
+  def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = Nil
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Cacheable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Cacheable.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Cacheable.scala
new file mode 100644
index 0000000..feb8156
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Cacheable.scala
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+trait Cacheable extends DataSourceable {
+  protected def cacheUnit: Boolean = false
+  def cacheable(ds: String): Boolean = {
+    cacheUnit && !conflict() && ((ds.isEmpty && dataSources.isEmpty) || (ds.nonEmpty && contains(ds)))
+  }
+  protected def getCacheExprs(ds: String): Iterable[Cacheable]
+
+  protected def persistUnit: Boolean = false
+  def persistable(ds: String): Boolean = {
+    persistUnit && ((ds.isEmpty && dataSources.isEmpty) || (ds.nonEmpty && contains(ds)))
+  }
+  protected def getPersistExprs(ds: String): Iterable[Cacheable]
+}
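
A hypothetical illustration (not part of the patch) of the cacheable contract: an expression is cacheable only when cacheUnit is enabled, at most one data source is involved, and the requested data source matches. The class and field names below are illustrative only.

    import org.apache.griffin.measure.rule.expr._

    // Toy Cacheable implementation for illustration.
    case class DemoCacheable(dataSources: Set[String]) extends Cacheable {
      override protected def cacheUnit: Boolean = true
      protected def getCacheExprs(ds: String): Iterable[Cacheable] = Nil
      protected def getPersistExprs(ds: String): Iterable[Cacheable] = Nil
    }

    // DemoCacheable(Set("source")).cacheable("source")           -> true
    // DemoCacheable(Set("source", "target")).cacheable("source") -> false (two sources conflict)
    // DemoCacheable(Set("source")).persistable("source")         -> false (persistUnit defaults to false)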

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Calculatable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Calculatable.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Calculatable.scala
new file mode 100644
index 0000000..904e823
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Calculatable.scala
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+trait Calculatable extends Serializable {
+
+  def calculate(values: Map[String, Any]): Option[Any]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ClauseExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ClauseExpr.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ClauseExpr.scala
new file mode 100644
index 0000000..a56e0db
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ClauseExpr.scala
@@ -0,0 +1,109 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+
+trait ClauseExpr extends Expr with AnalyzableExpr {
+  def valid(values: Map[String, Any]): Boolean = true
+  override def cacheUnit: Boolean = true
+}
+
+case class WhereClauseExpr(expr: LogicalExpr) extends ClauseExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
+  val desc: String = expr.desc
+  val dataSources: Set[String] = expr.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    expr.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = expr.getGroupbyExprPairs(dsPair)
+}
+
+case class WhenClauseExpr(expr: LogicalExpr) extends ClauseExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
+  val desc: String = s"WHEN ${expr.desc}"
+  val dataSources: Set[String] = expr.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    expr.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = expr.getGroupbyExprPairs(dsPair)
+}
+
+case class StatementExpr(whereClause: WhereClauseExpr, whenClauseOpt: Option[WhenClauseExpr]) extends ClauseExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = whereClause.calculate(values)
+  val desc: String = {
+    whenClauseOpt match {
+      case Some(expr) => s"${whereClause.desc} ${expr.desc}"
+      case _ => whereClause.desc
+    }
+  }
+  val dataSources: Set[String] = whereClause.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    whereClause.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    whereClause.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    whereClause.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = whereClause.getGroupbyExprPairs(dsPair)
+}
+
+//case class WhenClauseStatementExpr(expr: LogicalExpr, whenExpr: LogicalExpr) extends ClauseExpr {
+//  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
+//  val desc: String = s"${expr.desc} when ${whenExpr.desc}"
+//
+//  override def valid(values: Map[String, Any]): Boolean = {
+//    whenExpr.calculate(values) match {
+//      case Some(r: Boolean) => r
+//      case _ => false
+//    }
+//  }
+//
+//  val dataSources: Set[String] = expr.dataSources ++ whenExpr.dataSources
+//  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+//    expr.getCacheExprs(ds) ++ whenExpr.getCacheExprs(ds)
+//  }
+//  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+//    expr.getFinalCacheExprs(ds) ++ whenExpr.getFinalCacheExprs(ds)
+//  }
+//  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+//    expr.getPersistExprs(ds) ++ whenExpr.getPersistExprs(ds)
+//  }
+//
+//  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
+//    expr.getGroupbyExprPairs(dsPair) ++ whenExpr.getGroupbyExprPairs(dsPair)
+//  }
+//  override def getWhenClauseExpr(): Option[LogicalExpr] = Some(whenExpr)
+//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/DataSourceable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/DataSourceable.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/DataSourceable.scala
new file mode 100644
index 0000000..e2cf172
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/DataSourceable.scala
@@ -0,0 +1,28 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+trait DataSourceable extends Serializable {
+  val dataSources: Set[String]
+  protected def conflict(): Boolean = dataSources.size > 1
+  def contains(ds: String): Boolean = dataSources.contains(ds)
+  def dataSourceOpt: Option[String] = {
+    if (dataSources.size == 1) Some(dataSources.head) else None
+  }
+}
\ No newline at end of file
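
A small hypothetical example (not in the patch) of the DataSourceable helpers; DemoRef is an illustrative name:

    import org.apache.griffin.measure.rule.expr.DataSourceable

    case class DemoRef(dataSources: Set[String]) extends DataSourceable

    // DemoRef(Set("source")).dataSourceOpt                -> Some("source")
    // DemoRef(Set("source", "target")).dataSourceOpt      -> None (ambiguous across two sources)
    // DemoRef(Set("source", "target")).contains("target") -> true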

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Describable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Describable.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Describable.scala
new file mode 100644
index 0000000..393d7a6
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Describable.scala
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+trait Describable extends Serializable {
+
+  val desc: String
+
+  protected def describe(v: Any): String = {
+    v match {
+      case s: Describable => s"${s.desc}"
+      case s: String => s"'${s}'"
+      case a => s"${a}"
+    }
+  }
+
+}
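
A hypothetical sketch (not in the patch) of how describe renders values: strings are quoted, nested Describable values contribute their own desc, and anything else falls back to its toString.

    import org.apache.griffin.measure.rule.expr.Describable

    case class DemoPair(left: Any, right: Any) extends Describable {
      val desc: String = s"${describe(left)} = ${describe(right)}"
    }

    // DemoPair("name", 3).desc             -> "'name' = 3"
    // DemoPair(DemoPair("a", 1), "b").desc -> "'a' = 1 = 'b'"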

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Expr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Expr.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Expr.scala
new file mode 100644
index 0000000..726b5b6
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/Expr.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+import org.apache.spark.sql.types.DataType
+
+trait Expr extends Serializable with Describable with Cacheable with Calculatable {
+
+  protected val _defaultId: String = ExprIdCounter.emptyId
+
+  val _id = ExprIdCounter.genId(_defaultId)
+
+  protected def getSubCacheExprs(ds: String): Iterable[Expr] = Nil
+  final def getCacheExprs(ds: String): Iterable[Expr] = {
+    if (cacheable(ds)) getSubCacheExprs(ds).toList :+ this else getSubCacheExprs(ds)
+  }
+
+  protected def getSubFinalCacheExprs(ds: String): Iterable[Expr] = Nil
+  final def getFinalCacheExprs(ds: String): Iterable[Expr] = {
+    if (cacheable(ds)) Nil :+ this else getSubFinalCacheExprs(ds)
+  }
+
+  protected def getSubPersistExprs(ds: String): Iterable[Expr] = Nil
+  final def getPersistExprs(ds: String): Iterable[Expr] = {
+    if (persistable(ds)) getSubPersistExprs(ds).toList :+ this else getSubPersistExprs(ds)
+  }
+
+  final def calculate(values: Map[String, Any]): Option[Any] = {
+    values.get(_id) match {
+      case Some(v) => Some(v)
+      case _ => calculateOnly(values)
+    }
+  }
+  protected def calculateOnly(values: Map[String, Any]): Option[Any]
+
+}
+
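A minimal sketch (not part of the patch) of the memoisation built into Expr.calculate: if a value is already present under the expression's generated _id, it is returned directly; otherwise calculateOnly runs. DemoConst is an illustrative name.

    import org.apache.griffin.measure.rule.expr.Expr

    // Toy constant expression; only desc, dataSources and calculateOnly are required.
    case class DemoConst(n: Int) extends Expr {
      val desc: String = n.toString
      val dataSources: Set[String] = Set.empty[String]
      protected def calculateOnly(values: Map[String, Any]): Option[Any] = Some(n)
    }

    // val e = DemoConst(7)
    // e.calculate(Map.empty)        -> Some(7)  (computed by calculateOnly)
    // e.calculate(Map(e._id -> 42)) -> Some(42) (previously computed value wins)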

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ExprDescOnly.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ExprDescOnly.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ExprDescOnly.scala
new file mode 100644
index 0000000..01b7e3c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ExprDescOnly.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+trait ExprDescOnly extends Describable {
+
+}
+
+
+case class SelectionHead(expr: String) extends ExprDescOnly {
+  private val headRegex = """\$(\w+)""".r
+  val head: String = expr match {
+    case headRegex(v) => v.toLowerCase
+    case _ => expr
+  }
+  val desc: String = "$" + head
+}
+
+case class RangeDesc(elements: Iterable[MathExpr]) extends ExprDescOnly {
+  val desc: String = {
+    val rangeDesc = elements.map(_.desc).mkString(", ")
+    s"(${rangeDesc})"
+  }
+}
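
For illustration (not part of the patch): SelectionHead strips a leading '$' and lower-cases the head, while inputs without the prefix are kept verbatim.

    import org.apache.griffin.measure.rule.expr.SelectionHead

    // SelectionHead("$Source").head -> "source"
    // SelectionHead("$Source").desc -> "$source"
    // SelectionHead("source").head  -> "source" (no '$' prefix: kept as-is, not lower-cased)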

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ExprIdCounter.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ExprIdCounter.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ExprIdCounter.scala
new file mode 100644
index 0000000..ae76aef
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/ExprIdCounter.scala
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.{Set => MutableSet}
+
+object ExprIdCounter {
+
+  private val idCounter: AtomicLong = new AtomicLong(0L)
+
+  private val existIdSet: MutableSet[String] = MutableSet.empty[String]
+
+  private val invalidIdRegex = """^\d+$""".r
+
+  val emptyId: String = ""
+
+  def genId(defaultId: String): String = {
+    defaultId match {
+      case `emptyId` => increment.toString
+      case invalidIdRegex() => increment.toString
+//      case defId if (exist(defId)) => s"${increment}#${defId}"
+      case defId if (exist(defId)) => s"${defId}"
+      case _ => {
+        insertUserId(defaultId)
+        defaultId
+      }
+    }
+  }
+
+  private def exist(id: String): Boolean = {
+    existIdSet.contains(id)
+  }
+
+  private def insertUserId(id: String): Unit = {
+    existIdSet += id
+  }
+
+  private def increment(): Long = {
+    idCounter.incrementAndGet()
+  }
+
+}
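
A hypothetical walk-through (not part of the patch) of genId, assuming a fresh counter with no previously registered ids: empty or purely numeric default ids fall back to the atomic counter, while any other id is registered on first use and reused afterwards.

    import org.apache.griffin.measure.rule.expr.ExprIdCounter

    // ExprIdCounter.genId("")      -> "1"     (empty id: next counter value)
    // ExprIdCounter.genId("42")    -> "2"     (purely numeric id: next counter value)
    // ExprIdCounter.genId("total") -> "total" (registered on first use)
    // ExprIdCounter.genId("total") -> "total" (already registered, reused)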

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/FieldDescOnly.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/FieldDescOnly.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/FieldDescOnly.scala
new file mode 100644
index 0000000..dca037b
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/FieldDescOnly.scala
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+import scala.util.{Success, Try}
+
+trait FieldDescOnly extends Describable with DataSourceable {
+
+}
+
+case class IndexDesc(expr: String) extends FieldDescOnly {
+  val index: Int = {
+    Try(expr.toInt) match {
+      case Success(v) => v
+      case _ => throw new Exception(s"${expr} is invalid index")
+    }
+  }
+  val desc: String = describe(index)
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class FieldDesc(expr: String) extends FieldDescOnly {
+  val field: String = expr
+  val desc: String = describe(field)
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class AllFieldsDesc(expr: String) extends FieldDescOnly {
+  val allFields: String = expr
+  val desc: String = allFields
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class FieldRangeDesc(startField: FieldDescOnly, endField: FieldDescOnly) extends FieldDescOnly {
+  val desc: String = {
+    (startField, endField) match {
+      case (f1: IndexDesc, f2: IndexDesc) => s"(${f1.desc}, ${f2.desc})"
+      case _ => throw new Exception("invalid field range description")
+    }
+  }
+  val dataSources: Set[String] = Set.empty[String]
+}
\ No newline at end of file
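
A small hypothetical example (not in the patch) of the field descriptors defined above:

    import org.apache.griffin.measure.rule.expr._

    // IndexDesc("2").desc                                 -> "2"
    // IndexDesc("abc")                                    -> throws "abc is invalid index"
    // FieldRangeDesc(IndexDesc("0"), IndexDesc("3")).desc -> "(0, 3)"
    // AllFieldsDesc("*").desc                             -> "*"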

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LiteralExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LiteralExpr.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LiteralExpr.scala
new file mode 100644
index 0000000..acf1589
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LiteralExpr.scala
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+import org.apache.griffin.measure.utils.TimeUtil
+import org.apache.spark.sql.types._
+
+import scala.util.{Failure, Success, Try}
+
+trait LiteralExpr extends Expr {
+  val value: Option[Any]
+  def calculateOnly(values: Map[String, Any]): Option[Any] = value
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class LiteralValueExpr(value: Option[Any]) extends LiteralExpr {
+  val desc: String = value.getOrElse("").toString
+}
+
+case class LiteralStringExpr(expr: String) extends LiteralExpr {
+  val value: Option[String] = Some(expr)
+  val desc: String = s"'${value.getOrElse("")}'"
+}
+
+case class LiteralNumberExpr(expr: String) extends LiteralExpr {
+  val value: Option[Any] = {
+    if (expr.contains(".")) {
+      Try (expr.toDouble) match {
+        case Success(v) => Some(v)
+        case _ => throw new Exception(s"${expr} is invalid number")
+      }
+    } else {
+      Try (expr.toLong) match {
+        case Success(v) => Some(v)
+        case _ => throw new Exception(s"${expr} is invalid number")
+      }
+    }
+  }
+  val desc: String = value.getOrElse("").toString
+}
+
+case class LiteralTimeExpr(expr: String) extends LiteralExpr {
+  final val TimeRegex = """(\d+)(d|h|m|s|ms)""".r
+  val value: Option[Long] = TimeUtil.milliseconds(expr)
+  val desc: String = expr
+}
+
+case class LiteralBooleanExpr(expr: String) extends LiteralExpr {
+  final val TrueRegex = """(?i)true""".r
+  final val FalseRegex = """(?i)false""".r
+  val value: Option[Boolean] = expr match {
+    case TrueRegex() => Some(true)
+    case FalseRegex() => Some(false)
+    case _ => throw new Exception(s"${expr} is invalid boolean")
+  }
+  val desc: String = value.getOrElse("").toString
+}
+
+case class LiteralNullExpr(expr: String) extends LiteralExpr {
+  val value: Option[Any] = Some(null)
+  val desc: String = "null"
+}
+
+case class LiteralNoneExpr(expr: String) extends LiteralExpr {
+  val value: Option[Any] = None
+  val desc: String = "none"
+}
\ No newline at end of file
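
For illustration (not part of the patch), how the literal expressions parse their raw text; the exact millisecond value for the time literal depends on TimeUtil.milliseconds, which is assumed here to convert "2h" to two hours.

    import org.apache.griffin.measure.rule.expr._

    // LiteralNumberExpr("3.14").value  -> Some(3.14)     (contains '.', parsed as Double)
    // LiteralNumberExpr("42").value    -> Some(42L)      (parsed as Long)
    // LiteralBooleanExpr("TRUE").value -> Some(true)     (case-insensitive regex)
    // LiteralTimeExpr("2h").value      -> Some(7200000L) (assuming 2 hours in milliseconds)
    // LiteralNullExpr("null").value    -> Some(null)
    // LiteralNoneExpr("none").value    -> None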
