http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
new file mode 100644
index 0000000..6070189
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
@@ -0,0 +1,298 @@
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.rule.expr._
+
+import scala.util.parsing.combinator._
+
+case class RuleParser() extends JavaTokenParsers with Serializable {
+
+  /**
+    * BNF representation for grammar as below:
+    *
+    * <rule> ::= <logical-statement> [WHEN <logical-statement>]
+    * rule: mapping-rule [WHEN when-rule]
+    * - mapping-rule: the first level opr should better not be OR | NOT, 
otherwise it can't automatically find the groupby column
+    * - when-rule: only contain the general info of data source, not the 
special info of each data row
+    *
+    * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) 
<logical-expression>]+ | "(" <logical-statement> ")"
+    * logical-statement: return boolean value
+    * logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
+    *
+    * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | 
<range-opr> <range-expr>)
+    * logical-expression example: $source.id = $target.id, $source.page_id IN 
('3214', '4312', '60821')
+    *
+    * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
+    * <range-opr> ::= ["NOT"] "IN" | "BETWEEN"
+    * <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
+    * range-expr example: ('3214', '4312', '60821'), (10, 15), ()
+    *
+    * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
+    * math-expr example: $source.price * $target.count, "hello" + " " + 
"world" + 123
+    *
+    * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
+    * <unary-opr> ::= "+" | "-"
+    *
+    * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
+    *
+    * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | 
<index-field-range-sel> | <filter-sel> ]+
+    * selection example: $source.price, $source.json(), $source['state'], 
$source.numList[3], $target.json().mails['org' = 'apache'].names[*]
+    *
+    * <selection-head> ::= $source | $target
+    *
+    * <field-sel> ::= "." <field-string>
+    *
+    * <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
+    * <function-name> ::= <name-string>
+    * <arg> ::= <math-expr>
+    *
+    * <index-field-range-sel> ::= "[" <index-field-range> [, 
<index-field-range>]+ "]"
+    * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | 
"*"
+    * index-field-range: 2 means the 3rd item, (0, 3) means first 4 items, * 
means all items, 'age' means item 'age'
+    * <index-field> ::= <index> | <field-quote> | <all-selection>
+    * index: 0 ~ n means position from start, -1 ~ -n means position from end
+    * <field-quote> ::= ' <field-string> ' | " <field-string> "
+    *
+    * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
+    * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
+    * filter-sel example: ['name' = 'URL'], $source.man['age' > 
$source.graduate_age + 5 ]
+    *
+    * When <math-expr> in the selection, it mustn't contain the different 
<selection-head>, for example:
+    * $source.tags[1+2]             valid
+    * $source.tags[$source.first]   valid
+    * $source.tags[$target.first]   invalid
+    * -- Such job is for validation, not for parser
+    *
+    *
+    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | 
<literal-boolean>
+    * <literal-string> ::= <any-string>
+    * <literal-number> ::= <integer> | <double>
+    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
+    * <literal-boolean> ::= true | false
+    *
+    */
+
+  object Keyword {
+    def WhenKeywords: Parser[String] = """(?i)when""".r
+    def UnaryLogicalKeywords: Parser[String] = """(?i)not""".r
+    def BinaryLogicalKeywords: Parser[String] = """(?i)and|or""".r
+    def RangeKeywords: Parser[String] = """(?i)(not\s+)?(in|between)""".r
+    def DataSourceKeywords: Parser[String] = """(?i)\$(source|target)""".r
+    def Keywords: Parser[String] = WhenKeywords | UnaryLogicalKeywords | 
BinaryLogicalKeywords | RangeKeywords | DataSourceKeywords
+  }
+  import Keyword._
+
+  object Operator {
+    def NotLogicalOpr: Parser[String] = """(?i)not""".r | "!"
+    def AndLogicalOpr: Parser[String] = """(?i)and""".r | "&&"
+    def OrLogicalOpr: Parser[String] = """(?i)or""".r | "||"
+    def CompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
+    def RangeOpr: Parser[String] = RangeKeywords
+
+    def UnaryMathOpr: Parser[String] = "+" | "-"
+    def BinaryMathOpr1: Parser[String] = "*" | "/" | "%"
+    def BinaryMathOpr2: Parser[String] = "+" | "-"
+
+    def FilterCompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | 
""">=?""".r
+
+    def SqBracketPair: (Parser[String], Parser[String]) = ("[", "]")
+    def BracketPair: (Parser[String], Parser[String]) = ("(", ")")
+    def Dot: Parser[String] = "."
+    def AllSelection: Parser[String] = "*"
+    def SQuote: Parser[String] = "'"
+    def DQuote: Parser[String] = "\""
+    def Comma: Parser[String] = ","
+  }
+  import Operator._
+
+  object SomeString {
+    def AnyString: Parser[String] = """[^'\"{}\[\]()=<>.$@,;+\-*/\\]*""".r
+    def SimpleFieldString: Parser[String] = """\w+""".r
+    def FieldString: Parser[String] = """[\w\s]+""".r
+    def NameString: Parser[String] = """[a-zA-Z_]\w*""".r
+  }
+  import SomeString._
+
+  object SomeNumber {
+    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
+    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
+    def IndexNumber: Parser[String] = IntegerNumber
+  }
+  import SomeNumber._
+
+  // -- literal --
+  def literal: Parser[LiteralExpr] = literialString | literialTime | 
literialNumber | literialBoolean
+  def literialString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ 
SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
+  def literialNumber: Parser[LiteralNumberExpr] = (DoubleNumber | 
IntegerNumber) ^^ { LiteralNumberExpr(_) }
+  def literialTime: Parser[LiteralTimeExpr] = """(\d+(d|h|m|s|ms))+""".r ^^ { 
LiteralTimeExpr(_) }
+  def literialBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | 
"""(?i)false""".r) ^^ { LiteralBooleanExpr(_) }
+
+  // -- selection --
+  // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | 
<index-field-range-sel> | <filter-sel> ]+
+  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ^^ {
+    case head ~ selectors => SelectionExpr(head, selectors)
+  }
+  def selector: Parser[SelectExpr] = (fieldSelect | functionOperation | 
indexFieldRangeSelect | filterSelect)
+
+  def selectionHead: Parser[SelectionHead] = DataSourceKeywords ^^ { 
SelectionHead(_) }
+  // <field-sel> ::= "." <field-string>
+  def fieldSelect: Parser[IndexFieldRangeSelectExpr] = Dot ~> 
SimpleFieldString ^^ {
+    case field => IndexFieldRangeSelectExpr(FieldDesc(field) :: Nil)
+  }
+  // <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
+  def functionOperation: Parser[FunctionOperationExpr] = Dot ~ NameString ~ 
BracketPair._1 ~ repsep(argument, Comma) ~ BracketPair._2 ^^ {
+    case _ ~ func ~ _ ~ args ~ _ => FunctionOperationExpr(func, args)
+  }
+  def argument: Parser[MathExpr] = mathExpr
+  // <index-field-range-sel> ::= "[" <index-field-range> [, 
<index-field-range>]+ "]"
+  def indexFieldRangeSelect: Parser[IndexFieldRangeSelectExpr] = 
SqBracketPair._1 ~> rep1sep(indexFieldRange, Comma) <~ SqBracketPair._2 ^^ {
+    case ifrs => IndexFieldRangeSelectExpr(ifrs)
+  }
+  // <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | 
"*"
+  def indexFieldRange: Parser[FieldDescOnly] = indexField | BracketPair._1 ~ 
indexField ~ Comma ~ indexField ~ BracketPair._2 ^^ {
+    case _ ~ if1 ~ _ ~ if2 ~ _ => FieldRangeDesc(if1, if2)
+  }
+  // <index-field> ::= <index> | <field-quote> | <all-selection>
+  // *here it can parse <math-expr>, but for simple situation, not supported 
now*
+  def indexField: Parser[FieldDescOnly] = IndexNumber ^^ { IndexDesc(_) } | 
fieldQuote | AllSelection ^^ { AllFieldsDesc(_) }
+  // <field-quote> ::= ' <field-string> ' | " <field-string> "
+  def fieldQuote: Parser[FieldDesc] = (SQuote ~> FieldString <~ SQuote | 
DQuote ~> FieldString <~ DQuote) ^^ { FieldDesc(_) }
+  // <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
+  def filterSelect: Parser[FilterSelectExpr] = SqBracketPair._1 ~> fieldQuote 
~ FilterCompareOpr ~ mathExpr <~ SqBracketPair._2 ^^ {
+    case field ~ compare ~ value => FilterSelectExpr(field, compare, value)
+  }
+
+  // -- math --
+  // <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
+  def mathFactor: Parser[MathExpr] = (literal | selection | BracketPair._1 ~> 
mathExpr <~ BracketPair._2) ^^ { MathFactorExpr(_) }
+  // <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
+  // <unary-opr> ::= "+" | "-"
+  def unaryMathExpr: Parser[MathExpr] = rep(UnaryMathOpr) ~ mathFactor ^^ {
+    case Nil ~ a => a
+    case list ~ a => UnaryMathExpr(list, a)
+  }
+  // <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
+  def binaryMathExpr1: Parser[MathExpr] = unaryMathExpr ~ rep(BinaryMathOpr1 ~ 
unaryMathExpr) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
+  }
+  def binaryMathExpr2: Parser[MathExpr] = binaryMathExpr1 ~ rep(BinaryMathOpr2 
~ binaryMathExpr1) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
+  }
+  def mathExpr: Parser[MathExpr] = binaryMathExpr2
+
+  // -- logical expression --
+  // <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
+  def rangeExpr: Parser[RangeDesc] = BracketPair._1 ~> repsep(mathExpr, Comma) 
<~ BracketPair._2 ^^ { RangeDesc(_) }
+  // <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | 
<range-opr> <range-expr>)
+  def logicalExpr: Parser[LogicalExpr] = mathExpr ~ CompareOpr ~ mathExpr ^^ {
+    case left ~ opr ~ right => LogicalCompareExpr(left, opr, right)
+  } | mathExpr ~ RangeOpr ~ rangeExpr ^^ {
+    case left ~ opr ~ range => LogicalRangeExpr(left, opr, range)
+  }
+
+  // -- logical statement --
+  def logicalFactor: Parser[LogicalExpr] = logicalExpr | BracketPair._1 ~> 
logicalStatement <~ BracketPair._2
+  def notLogicalStatement: Parser[LogicalExpr] = rep(NotLogicalOpr) ~ 
logicalFactor ^^ {
+    case Nil ~ a => a
+    case list ~ a => UnaryLogicalExpr(list, a)
+  }
+  def andLogicalStatement: Parser[LogicalExpr] = notLogicalStatement ~ 
rep(AndLogicalOpr ~ notLogicalStatement) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
+  }
+  def orLogicalStatement: Parser[LogicalExpr] = andLogicalStatement ~ 
rep(OrLogicalOpr ~ andLogicalStatement) ^^ {
+    case a ~ Nil => a
+    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
+  }
+  // <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) 
<logical-expression>]+ | "(" <logical-statement> ")"
+  def logicalStatement: Parser[LogicalExpr] = orLogicalStatement
+
+  // -- rule --
+  // <rule> ::= <logical-statement> [WHEN <logical-statement>]
+  def rule: Parser[StatementExpr] = logicalStatement ~ opt(WhenKeywords ~> 
logicalStatement) ^^ {
+    case ls ~ Some(ws) => WhenClauseStatementExpr(ls, ws)
+    case ls ~ _ => SimpleStatementExpr(ls)
+  }
+
+  // for complie only
+//  case class NullStatementExpr(expression: String) extends StatementExpr {
+//    def genValue(values: Map[String, Any]): Option[Any] = None
+//    def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = Nil
+//  }
+//  def statementsExpr = mathExpr ^^ { NullStatementExpr(_) }
+
+
+//
+//  // basic
+//  val anyString: Parser[String] = """[^'{}\[\]()=<>.$@;+\-*/\\\"]*""".r
+//  val variable: Parser[String] = """[a-zA-Z_]\w*""".r
+//  val number: Parser[String] = """[+\-]?\d+""".r
+//  val time: Parser[String] = """\d+(y|M|w|d|h|m|s|ms)""".r
+//
+//  val numPosition: Parser[String] = """\d+""".r
+//  val anyPosition: Parser[String] = "*"
+//
+//  val filterOpr: Parser[String] = "=" | "!=" | ">" | "<" | ">=" | "<="
+//
+//  val opr1: Parser[String] = "*" | "/" | "%"
+//  val opr2: Parser[String] = "+" | "-"
+//
+//  val assignOpr: Parser[String] = "="
+//  val compareOpr: Parser[String] = "==" | "!=" | ">" | "<" | ">=" | "<="
+//  val mappingOpr: Parser[String] = "==="
+//
+//  val exprSep: Parser[String] = ";"
+//
+//  // simple
+//  def variableString: Parser[VariableExpr] = (("'" ~> anyString <~ "'") | 
("\"" ~> anyString <~ "\"")) ^^ { VariableStringExpr(_) }
+//  def constString: Parser[ConstExpr] = (("'" ~> anyString <~ "'") | ("\"" ~> 
anyString <~ "\"")) ^^ { ConstStringExpr(_) }
+//  def constValue: Parser[ConstExpr] = time ^^ { ConstTimeExpr(_) } | number 
^^ { ConstNumberExpr(_)} | constString
+//  def variableValue: Parser[VariableExpr] = variable ^^ { 
VariableStringExpr(_) }
+//  def quoteVariableValue: Parser[QuoteVariableExpr] = "${" ~> variable <~ 
"}" ^^ { QuoteVariableExpr(_) }
+//  def position: Parser[SelectExpr] = anyPosition ^^ { AnyPositionExpr(_) } | 
"""\d+""".r ^^ { NumPositionExpr(_) } | (("'" ~> anyString <~ "'") | ("\"" ~> 
anyString <~ "\"")) ^^ { StringPositionExpr(_) }
+//  def argument: Parser[ConstExpr] = constValue
+//  def annotationExpr: Parser[AnnotationExpr] = "@" ~> variable ^^ { 
AnnotationExpr(_) }
+//
+//  // selector
+//  def filterOpration: Parser[SelectExpr] = (variableString ~ filterOpr ~ 
constString) ^^ {
+//    case v ~ opr ~ c => FilterOprExpr(opr, v, c)
+//  }
+//  def positionExpr: Parser[SelectExpr] = "[" ~> (filterOpration | position) 
<~ "]"
+//  def functionExpr: Parser[SelectExpr] = "." ~ variable ~ "(" ~ 
repsep(argument, ",") ~ ")" ^^ {
+//    case _ ~ v ~ _ ~ args ~ _ => FunctionExpr(v, args)
+//  }
+//  def selectorExpr: Parser[SelectExpr] = positionExpr | functionExpr
+//
+//  // data
+//  def selectorsExpr: Parser[DataExpr] = quoteVariableValue ~ 
rep(selectorExpr) ^^ {
+//    case q ~ tails => SelectionExpr(q, tails)
+//  }
+//
+//  // calculation
+//  def factor: Parser[ElementExpr] = (constValue | selectorsExpr | "(" ~> 
expr <~ ")") ^^ { FactorExpr(_) }
+//  def term: Parser[ElementExpr] = factor ~ rep(opr1 ~ factor) ^^ {
+//    case a ~ Nil => a
+//    case a ~ list => CalculationExpr(a, list.map(c => (c._1, c._2)))
+//  }
+//  def expr: Parser[ElementExpr] = term ~ rep(opr2 ~ term) ^^ {
+//    case a ~ Nil => a
+//    case a ~ list => CalculationExpr(a, list.map(c => (c._1, c._2)))
+//  }
+//
+//  // statement
+//  def assignExpr: Parser[StatementExpr] = variableValue ~ assignOpr ~ expr 
^^ {
+//    case v ~ opr ~ c => AssignExpr(opr, v, c)
+//  }
+//  def conditionExpr: Parser[StatementExpr] = rep(annotationExpr) ~ expr ~ 
compareOpr ~ expr ^^ {
+//    case anos ~ le ~ opr ~ re => ConditionExpr(opr, le, re, anos)
+//  }
+//  def mappingExpr: Parser[StatementExpr] = rep(annotationExpr) ~ expr ~ 
mappingOpr ~ expr ^^ {
+//    case anos ~ le ~ opr ~ re => MappingExpr(opr, le, re, anos)
+//  }
+//  def statementExpr: Parser[StatementExpr] = assignExpr | conditionExpr | 
mappingExpr
+//
+//  // statements
+//  def statementsExpr: Parser[StatementExpr] = repsep(statementExpr, exprSep) 
^^ { StatementsExpr(_) }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParserDescription.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParserDescription.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParserDescription.scala
new file mode 100644
index 0000000..18810b3
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParserDescription.scala
@@ -0,0 +1,84 @@
+/**
+  * BNF representation for grammar as below:
+  *
+  * <rule> ::= <logical-statement> [WHEN <logical-statement>]
+  * rule: mapping-rule [WHEN when-rule]
+  * - mapping-rule: the first level opr should better not be OR | NOT, 
otherwise it can't automatically find the groupby column
+  * - when-rule: only contain the general info of data source, not the special 
info of each data row
+  *
+  * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) 
<logical-expression>]+ | "(" <logical-statement> ")"
+  * logical-statement: return boolean value
+  * logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
+  *
+  * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | 
<range-opr> <range-expr>)
+  * logical-expression example: $source.id = $target.id, $source.page_id IN 
('3214', '4312', '60821')
+  *
+  * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
+  * <range-opr> ::= ["NOT"] "IN" | "BETWEEN"
+  * <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
+  * range-expr example: ('3214', '4312', '60821'), (10, 15), ()
+  *
+  * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
+  * math-expr example: $source.price * $target.count, "hello" + " " + "world" 
+ 123
+  *
+  * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
+  * <unary-opr> ::= "+" | "-"
+  *
+  * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
+  *
+  * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | 
<index-field-range-sel> | <filter-sel> ]+
+  * selection example: $source.price, $source.json(), $source['state'], 
$source.numList[3], $target.json().mails['org' = 'apache'].names[*]
+  *
+  * <selection-head> ::= $source | $target
+  *
+  * <field-sel> ::= "." <field-string>
+  *
+  * <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
+  * <function-name> ::= <name-string>
+  * <arg> ::= <math-expr>
+  *
+  * <index-field-range-sel> ::= "[" <index-field-range> [, 
<index-field-range>]+ "]"
+  * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | 
"*"
+  * index-field-range: 2 means the 3rd item, (0, 3) means first 4 items, * 
means all items, 'age' means item 'age'
+  * <index-field> ::= <index> | <field-quote> | <all-selection>
+  * index: 0 ~ n means position from start, -1 ~ -n means position from end
+  * <field-quote> ::= ' <field-string> ' | " <field-string> "
+  *
+  * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
+  * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
+  * filter-sel example: ['name' = 'URL'], $source.man['age' > 
$source.graduate_age + 5 ]
+  *
+  * When <math-expr> in the selection, it mustn't contain the different 
<selection-head>, for example:
+  * $source.tags[1+2]             valid
+  * $source.tags[$source.first]   valid
+  * $source.tags[$target.first]   invalid
+  * -- Such job is for validation, not for parser
+  *
+  *
+  * <literal> ::= <literal-string> | <literal-number> | <literal-time> | 
<literal-boolean>
+  * <literal-string> ::= <any-string>
+  * <literal-number> ::= <integer> | <double>
+  * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
+  * <literal-boolean> ::= true | false
+  *
+  */
+
+
+/**
+  * BACK_UP
+  *
+  * <index-field-range-sel> ::= "[" <index-field-range> [, 
<index-field-range>]+ "]"
+  * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | 
"*"
+  * index-field-range: 2 means the 3rd item, (0, 3) means first 4 items, * 
means all items, 'age' means item 'age'
+  * <index-field> ::= <index> | <field-quote> | <math-result>
+  * <field-quote> ::= ' <any-string> ' | \" <any-string> \"
+  * <index> ::= <integer>
+  * index: 0 ~ n means position from start, -1 ~ -n means position from end
+  * <math-result> ::= "${" <math-expr> "}"
+  *
+  * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <filter-value> "]"
+  * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
+  * <filter-value> ::= <literal> | <math-result>
+  * filter-sel example: ['name' = 'URL'], $source.man['age' > ${ 
$source.graduate_age + 5 }]
+  *
+  */

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
new file mode 100644
index 0000000..0d54707
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
@@ -0,0 +1,7 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+
+trait AnalyzableExpr extends Serializable {
+  def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] 
= Nil
+  def getWhenClauseExpr(): Option[LogicalExpr] = None
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
new file mode 100644
index 0000000..e062376
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
@@ -0,0 +1,15 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+trait Cacheable extends DataSourceable {
+  protected def cacheUnit: Boolean = false
+  def cacheable(ds: String): Boolean = {
+    cacheUnit && !conflict() && ((ds.isEmpty && dataSources.isEmpty) || 
(ds.nonEmpty && contains(ds)))
+  }
+  protected def getCacheExprs(ds: String): Iterable[Cacheable]
+
+  protected def persistUnit: Boolean = false
+  def persistable(ds: String): Boolean = {
+    persistUnit && ((ds.isEmpty && dataSources.isEmpty) || (ds.nonEmpty && 
contains(ds)))
+  }
+  protected def getPersistExprs(ds: String): Iterable[Cacheable]
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
new file mode 100644
index 0000000..8018c19
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
@@ -0,0 +1,7 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+trait Calculatable extends Serializable {
+
+  def calculate(values: Map[String, Any]): Option[Any]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
new file mode 100644
index 0000000..f18798a
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+trait DataSourceable extends Serializable {
+  val dataSources: Set[String]
+  protected def conflict(): Boolean = dataSources.size > 1
+  def contains(ds: String): Boolean = dataSources.contains(ds)
+  def dataSourceOpt: Option[String] = {
+    if (dataSources.size == 1) Some(dataSources.head) else None
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
new file mode 100644
index 0000000..38758a2
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
@@ -0,0 +1,15 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+trait Describable extends Serializable {
+
+  val desc: String
+
+  protected def describe(v: Any): String = {
+    v match {
+      case s: Describable => s"${s.desc}"
+      case s: String => s"'${s}'"
+      case a => s"${a}"
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
new file mode 100644
index 0000000..d7810aa
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
@@ -0,0 +1,33 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+trait Expr extends Serializable with Describable with Cacheable with 
Calculatable {
+
+  protected val _defaultId: String = ExprIdCounter.emptyId
+
+  val _id = ExprIdCounter.genId(_defaultId)
+
+  protected def getSubCacheExprs(ds: String): Iterable[Expr] = Nil
+  final def getCacheExprs(ds: String): Iterable[Expr] = {
+    if (cacheable(ds)) getSubCacheExprs(ds).toList :+ this else 
getSubCacheExprs(ds)
+  }
+
+  protected def getSubFinalCacheExprs(ds: String): Iterable[Expr] = Nil
+  final def getFinalCacheExprs(ds: String): Iterable[Expr] = {
+    if (cacheable(ds)) Nil :+ this else getSubFinalCacheExprs(ds)
+  }
+
+  protected def getSubPersistExprs(ds: String): Iterable[Expr] = Nil
+  final def getPersistExprs(ds: String): Iterable[Expr] = {
+    if (persistable(ds)) getSubPersistExprs(ds).toList :+ this else 
getSubPersistExprs(ds)
+  }
+
+  final def calculate(values: Map[String, Any]): Option[Any] = {
+    values.get(_id) match {
+      case Some(v) => Some(v)
+      case _ => calculateOnly(values)
+    }
+  }
+  protected def calculateOnly(values: Map[String, Any]): Option[Any]
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
new file mode 100644
index 0000000..0bb5085
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala
@@ -0,0 +1,22 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+trait ExprDescOnly extends Describable {
+
+}
+
+
+case class SelectionHead(expr: String) extends ExprDescOnly {
+  private val headRegex = """\$(\w+)""".r
+  val head: String = expr match {
+    case headRegex(v) => v.toLowerCase
+    case _ => expr
+  }
+  val desc: String = "$" + head
+}
+
+case class RangeDesc(elements: Iterable[MathExpr]) extends ExprDescOnly {
+  val desc: String = {
+    val rangeDesc = elements.map(_.desc).mkString(", ")
+    s"(${rangeDesc})"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
new file mode 100644
index 0000000..56e7daa
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala
@@ -0,0 +1,42 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.{Set => MutableSet}
+
+object ExprIdCounter {
+
+  private val idCounter: AtomicLong = new AtomicLong(0L)
+
+  private val existIdSet: MutableSet[String] = MutableSet.empty[String]
+
+  private val invalidIdRegex = """^\d+$""".r
+
+  val emptyId: String = ""
+
+  def genId(defaultId: String): String = {
+    defaultId match {
+      case emptyId => increment.toString
+      case invalidIdRegex() => increment.toString
+//      case defId if (exist(defId)) => s"${increment}#${defId}"
+      case defId if (exist(defId)) => s"${defId}"
+      case _ => {
+        insertUserId(defaultId)
+        defaultId
+      }
+    }
+  }
+
+  private def exist(id: String): Boolean = {
+    existIdSet.contains(id)
+  }
+
+  private def insertUserId(id: String): Unit = {
+    existIdSet += id
+  }
+
+  private def increment(): Long = {
+    idCounter.incrementAndGet()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
new file mode 100644
index 0000000..f13f15a
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala
@@ -0,0 +1,40 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+import scala.util.{Success, Try}
+
+trait FieldDescOnly extends Describable with DataSourceable {
+
+}
+
+case class IndexDesc(expr: String) extends FieldDescOnly {
+  val index: Int = {
+    Try(expr.toInt) match {
+      case Success(v) => v
+      case _ => throw new Exception(s"${expr} is invalid index")
+    }
+  }
+  val desc: String = describe(index)
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class FieldDesc(expr: String) extends FieldDescOnly {
+  val field: String = expr
+  val desc: String = describe(field)
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class AllFieldsDesc(expr: String) extends FieldDescOnly {
+  val allFields: String = expr
+  val desc: String = allFields
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class FieldRangeDesc(startField: FieldDescOnly, endField: FieldDescOnly) 
extends FieldDescOnly {
+  val desc: String = {
+    (startField, endField) match {
+      case (f1: IndexDesc, f2: IndexDesc) => s"(${f1.desc}, ${f2.desc})"
+      case _ => throw new Exception("invalid field range description")
+    }
+  }
+  val dataSources: Set[String] = Set.empty[String]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
new file mode 100644
index 0000000..020ddc2
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala
@@ -0,0 +1,68 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+import scala.util.{Failure, Success, Try}
+
+trait LiteralExpr extends Expr {
+  val value: Option[Any]
+  def calculateOnly(values: Map[String, Any]): Option[Any] = value
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class LiteralStringExpr(expr: String) extends LiteralExpr {
+  val value: Option[String] = Some(expr)
+  val desc: String = value.getOrElse("")
+}
+
+case class LiteralNumberExpr(expr: String) extends LiteralExpr {
+  val value: Option[Any] = {
+    if (expr.contains(".")) {
+      Try (expr.toDouble) match {
+        case Success(v) => Some(v)
+        case _ => throw new Exception(s"${expr} is invalid number")
+      }
+    } else {
+      Try (expr.toLong) match {
+        case Success(v) => Some(v)
+        case _ => throw new Exception(s"${expr} is invalid number")
+      }
+    }
+  }
+  val desc: String = value.getOrElse("").toString
+}
+
+case class LiteralTimeExpr(expr: String) extends LiteralExpr {
+  final val TimeRegex = """(\d+)(d|h|m|s|ms)""".r
+  val value: Option[Long] = {
+    Try {
+      expr match {
+        case TimeRegex(time, unit) => {
+          val t = time.toLong
+          unit match {
+            case "d" => t * 24 * 60 * 60 * 1000
+            case "h" => t * 60 * 60 * 1000
+            case "m" => t * 60 * 1000
+            case "s" => t * 1000
+            case "ms" => t
+            case _ => throw new Exception(s"${expr} is invalid time format")
+          }
+        }
+        case _ => throw new Exception(s"${expr} is invalid time format")
+      }
+    } match {
+      case Success(v) => Some(v)
+      case Failure(ex) => throw ex
+    }
+  }
+  val desc: String = expr
+}
+
+case class LiteralBooleanExpr(expr: String) extends LiteralExpr {
+  final val TrueRegex = """(?i)true""".r
+  final val FalseRegex = """(?i)false""".r
+  val value: Option[Boolean] = expr match {
+    case TrueRegex() => Some(true)
+    case FalseRegex() => Some(false)
+    case _ => throw new Exception(s"${expr} is invalid boolean")
+  }
+  val desc: String = value.getOrElse("").toString
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
new file mode 100644
index 0000000..74e80e3
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala
@@ -0,0 +1,149 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+import org.apache.griffin.measure.batch.utils.CalculationUtil._
+
+trait LogicalExpr extends Expr with AnalyzableExpr {
+  override def cacheUnit: Boolean = true
+}
+
+case class LogicalCompareExpr(left: MathExpr, compare: String, right: 
MathExpr) extends LogicalExpr {
+  private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, 
"""!==?""".r, ">", ">=", "<", "<=")
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val (lv, rv) = (left.calculate(values), right.calculate(values))
+    compare match {
+      case this.eqOpr() => lv === rv
+      case this.neqOpr() => lv =!= rv
+      case this.btOpr => lv > rv
+      case this.bteOpr => lv >= rv
+      case this.ltOpr => lv < rv
+      case this.lteOpr => lv <= rv
+      case _ => None
+    }
+  }
+  val desc: String = s"${left.desc} ${compare} ${right.desc}"
+  val dataSources: Set[String] = left.dataSources ++ right.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    left.getCacheExprs(ds) ++ right.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    left.getFinalCacheExprs(ds) ++ right.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    left.getPersistExprs(ds) ++ right.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, 
MathExpr)] = {
+    if (compare == "=" || compare == "==") {
+      (left.dataSourceOpt, right.dataSourceOpt) match {
+        case (Some(dsPair._1), Some(dsPair._2)) => (left, right) :: Nil
+        case (Some(dsPair._2), Some(dsPair._1)) => (right, left) :: Nil
+        case _ => Nil
+      }
+    } else Nil
+  }
+}
+
+case class LogicalRangeExpr(left: MathExpr, rangeOpr: String, range: 
RangeDesc) extends LogicalExpr {
+  private val (inOpr, ninOpr, btwnOpr, nbtwnOpr) = ("""(?i)in""".r, 
"""(?i)not\s+in""".r, """(?i)between""".r, """(?i)not\s+between""".r)
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val (lv, rvs) = (left.calculate(values), 
range.elements.map(_.calculate(values)))
+    rangeOpr match {
+      case this.inOpr() => lv in rvs
+      case this.ninOpr() => lv not_in rvs
+      case this.btwnOpr() => lv between rvs
+      case this.nbtwnOpr() => lv not_between rvs
+      case _ => None
+    }
+  }
+  val desc: String = s"${left.desc} ${rangeOpr} ${range.desc}"
+  val dataSources: Set[String] = left.dataSources ++ 
range.elements.flatMap(_.dataSources).toSet
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    left.getCacheExprs(ds) ++ range.elements.flatMap(_.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    left.getFinalCacheExprs(ds) ++ 
range.elements.flatMap(_.getFinalCacheExprs(ds))
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    left.getPersistExprs(ds) ++ range.elements.flatMap(_.getPersistExprs(ds))
+  }
+}
+
+// -- logical statement --
+//case class LogicalFactorExpr(self: LogicalExpr) extends LogicalExpr {
+//  def calculate(values: Map[String, Any]): Option[Any] = 
self.calculate(values)
+//  val desc: String = self.desc
+//}
+
+case class UnaryLogicalExpr(oprList: Iterable[String], factor: LogicalExpr) 
extends LogicalExpr {
+  private val notOpr = """(?i)not|!""".r
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = factor.calculate(values)
+    oprList.foldRight(fv) { (opr, v) =>
+      opr match {
+        case this.notOpr() => !v
+        case _ => None
+      }
+    }
+  }
+  val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => 
s"${prev}${ex}" }
+  val dataSources: Set[String] = factor.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    factor.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, 
MathExpr)] = {
+    val notOprList = oprList.filter { opr =>
+      opr match {
+        case this.notOpr() => true
+        case _ => false
+      }
+    }
+    if (notOprList.size % 2 == 0) factor.getGroupbyExprPairs(dsPair) else Nil
+  }
+}
+
+case class BinaryLogicalExpr(first: LogicalExpr, others: Iterable[(String, 
LogicalExpr)]) extends LogicalExpr {
+  private val (andOpr, orOpr) = ("""(?i)and|&&""".r, """(?i)or|\|\|""".r)
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = first.calculate(values)
+    others.foldLeft(fv) { (v, pair) =>
+      val (opr, next) = pair
+      val nv = next.calculate(values)
+      opr match {
+        case this.andOpr() => v && nv
+        case this.orOpr() => v || nv
+        case _ => None
+      }
+    }
+  }
+  val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} 
${next._1} ${next._2.desc}" }
+  val dataSources: Set[String] = first.dataSources ++ 
others.flatMap(_._2.dataSources).toSet
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, 
MathExpr)] = {
+    if (others.isEmpty) first.getGroupbyExprPairs(dsPair)
+    else {
+      val isAnd = others.exists(_._1 match {
+        case this.andOpr() => true
+        case _ => false
+      })
+      if (isAnd) {
+        first.getGroupbyExprPairs(dsPair) ++ 
others.flatMap(_._2.getGroupbyExprPairs(dsPair))
+      } else Nil
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
new file mode 100644
index 0000000..db09a0c
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala
@@ -0,0 +1,79 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+import org.apache.griffin.measure.batch.utils.CalculationUtil._
+
+trait MathExpr extends Expr {
+
+}
+
+case class MathFactorExpr(self: Expr) extends MathExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = 
self.calculate(values)
+  val desc: String = self.desc
+  val dataSources: Set[String] = self.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    self.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    self.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    self.getPersistExprs(ds)
+  }
+}
+
+case class UnaryMathExpr(oprList: Iterable[String], factor: Expr) extends 
MathExpr {
+  private val (posOpr, negOpr) = ("+", "-")
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = factor.calculate(values)
+    oprList.foldRight(fv) { (opr, v) =>
+      opr match {
+        case this.posOpr => v
+        case this.negOpr => -v
+        case _ => None
+      }
+    }
+  }
+  val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => 
s"${prev}${ex}" }
+  val dataSources: Set[String] = factor.dataSources
+  override def cacheUnit: Boolean = true
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    factor.getPersistExprs(ds)
+  }
+}
+
+case class BinaryMathExpr(first: MathExpr, others: Iterable[(String, 
MathExpr)]) extends MathExpr {
+  private val (addOpr, subOpr, mulOpr, divOpr, modOpr) = ("+", "-", "*", "/", 
"%")
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = first.calculate(values)
+    others.foldLeft(fv) { (v, pair) =>
+      val (opr, next) = pair
+      val nv = next.calculate(values)
+      opr match {
+        case this.addOpr => v + nv
+        case this.subOpr => v - nv
+        case this.mulOpr => v * nv
+        case this.divOpr => v / nv
+        case this.modOpr => v % nv
+        case _ => None
+      }
+    }
+  }
+  val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} 
${next._1} ${next._2.desc}" }
+  val dataSources: Set[String] = first.dataSources ++ 
others.flatMap(_._2.dataSources).toSet
+  override def cacheUnit: Boolean = true
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
new file mode 100644
index 0000000..52ebe21
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
@@ -0,0 +1,53 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+trait SelectExpr extends Expr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = None
+}
+
+case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends 
SelectExpr {
+  val desc: String = s"[${fields.map(_.desc).mkString(",")}]"
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) 
extends SelectExpr {
+  val desc: String = s".${func}(${args.map(_.desc).mkString(",")})"
+  val dataSources: Set[String] = args.flatMap(_.dataSources).toSet
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = 
args.flatMap(_.getCacheExprs(ds))
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = 
args.flatMap(_.getFinalCacheExprs(ds))
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = 
args.flatMap(_.getPersistExprs(ds))
+}
+
+case class FilterSelectExpr(field: FieldDesc, compare: String, value: 
MathExpr) extends SelectExpr {
+  val desc: String = s"[${field.desc}${compare}${value.desc}]"
+  val dataSources: Set[String] = value.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = 
value.getCacheExprs(ds)
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = 
value.getFinalCacheExprs(ds)
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = 
value.getPersistExprs(ds)
+}
+
+// -- selection --
+case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) 
extends Expr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id)
+
+  val desc: String = {
+    val argsString = selectors.map(_.desc).mkString("")
+    s"${head.desc}${argsString}"
+  }
+  val dataSources: Set[String] = {
+    val selectorDataSources = selectors.flatMap(_.dataSources).toSet
+    selectorDataSources + head.head
+  }
+
+  override def cacheUnit: Boolean = true
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    selectors.flatMap(_.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    selectors.flatMap(_.getFinalCacheExprs(ds))
+  }
+
+  override def persistUnit: Boolean = true
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    selectors.flatMap(_.getPersistExprs(ds))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
new file mode 100644
index 0000000..3ae5139
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
@@ -0,0 +1,50 @@
+package org.apache.griffin.measure.batch.rule.expr
+
+
+trait StatementExpr extends Expr with AnalyzableExpr {
+  def valid(values: Map[String, Any]): Boolean = true
+  override def cacheUnit: Boolean = true
+}
+
+case class SimpleStatementExpr(expr: LogicalExpr) extends StatementExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = 
expr.calculate(values)
+  val desc: String = expr.desc
+  val dataSources: Set[String] = expr.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    expr.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, 
MathExpr)] = expr.getGroupbyExprPairs(dsPair)
+}
+
+case class WhenClauseStatementExpr(expr: LogicalExpr, whenExpr: LogicalExpr) 
extends StatementExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = 
expr.calculate(values)
+  val desc: String = s"${expr.desc} when ${whenExpr.desc}"
+
+  override def valid(values: Map[String, Any]): Boolean = {
+    whenExpr.calculate(values) match {
+      case Some(r: Boolean) => r
+      case _ => false
+    }
+  }
+
+  val dataSources: Set[String] = expr.dataSources ++ whenExpr.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getCacheExprs(ds) ++ whenExpr.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    expr.getFinalCacheExprs(ds) ++ whenExpr.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    expr.getPersistExprs(ds) ++ whenExpr.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, 
MathExpr)] = expr.getGroupbyExprPairs(dsPair)
+  override def getWhenClauseExpr(): Option[LogicalExpr] = Some(whenExpr)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/AnnotationExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/AnnotationExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/AnnotationExpr.scala
new file mode 100644
index 0000000..2152bf9
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/AnnotationExpr.scala
@@ -0,0 +1,16 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+case class AnnotationExpr(expression: String) extends Expr with Calculatable {
+
+  val Key = """^(?i)Key$""".r
+
+  def isKey: Boolean = {
+    expression match {
+      case Key() => true
+      case _ => false
+    }
+  }
+
+  def genValue(values: Map[String, Any]): Option[String] = Some(expression)
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Calculatable.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Calculatable.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Calculatable.scala
new file mode 100644
index 0000000..9b8a577
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Calculatable.scala
@@ -0,0 +1,7 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+trait Calculatable extends Serializable {
+
+  def genValue(values: Map[String, Any]): Option[Any]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ConstExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ConstExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ConstExpr.scala
new file mode 100644
index 0000000..c8e623b
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ConstExpr.scala
@@ -0,0 +1,59 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+import scala.util.{Success, Try}
+
+trait ConstExpr extends Expr with Calculatable {
+
+  val value: Any
+
+}
+
+
+case class ConstStringExpr(expression: String) extends ConstExpr {
+
+  val value: String = expression
+  def genValue(values: Map[String, Any]): Option[String] = Some(value)
+
+}
+
+case class ConstTimeExpr(expression: String) extends ConstExpr {
+
+  val TimeRegex = """(\d+)(y|M|w|d|h|m|s|ms)""".r
+
+  val value: Long = expression match {
+    case TimeRegex(time, unit) => {
+      val t = time.toLong
+      val r = unit match {
+        case "y" => t * 365 * 30 * 24 * 60 * 60 * 1000
+        case "M" => t * 30 * 24 * 60 * 60 * 1000
+        case "w" => t * 7 * 24 * 60 * 60 * 1000
+        case "d" => t * 24 * 60 * 60 * 1000
+        case "h" => t * 60 * 60 * 1000
+        case "m" => t * 60 * 1000
+        case "s" => t * 1000
+        case "ms" => t
+        case _ => t
+      }
+      r
+    }
+    case _ => 0L
+  }
+
+  def genValue(values: Map[String, Any]): Option[Long] = Some(value)
+
+}
+
+case class ConstNumberExpr(expression: String) extends ConstExpr {
+
+  val value: Long = {
+    Try {
+      expression.toLong
+    } match {
+      case Success(v) => v
+      case _ => 0L
+    }
+  }
+
+  def genValue(values: Map[String, Any]): Option[Long] = Some(value)
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/DataExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/DataExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/DataExpr.scala
new file mode 100644
index 0000000..e0f24ef
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/DataExpr.scala
@@ -0,0 +1,27 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+trait DataExpr extends Expr with Calculatable with Recordable {
+
+  def head: QuoteVariableExpr
+  def args: Iterable[SelectExpr]
+
+}
+
+
+case class SelectionExpr(head: QuoteVariableExpr, args: Iterable[SelectExpr]) 
extends {
+
+  val recordName = {
+    val argsString = args.map(_.recordName).mkString("")
+    s"${head.recordName}${argsString}"
+  }
+
+  override protected val _defaultId = recordName
+
+} with DataExpr {
+
+  val expression: String = ""
+
+  def genValue(values: Map[String, Any]): Option[Any] = values.get(_id)
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ElementExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ElementExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ElementExpr.scala
new file mode 100644
index 0000000..999c1ee
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ElementExpr.scala
@@ -0,0 +1,52 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+import org.apache.griffin.measure.batch.utils.CalculationUtil._
+
+trait ElementExpr extends Expr with ExprAnalyzable with Calculatable {
+
+}
+
+
+// self: const | selection | calculation
+case class FactorExpr(self: Expr with Calculatable) extends ElementExpr {
+
+  val expression: String = self.expression
+
+  def genValue(values: Map[String, Any]): Option[Any] = self.genValue(values)
+
+  def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = {
+    self match {
+      case expr: DataExpr => if (expr.head.name == dataSign) expr :: Nil else 
Nil
+      case expr: CalculationExpr => expr.getDataRelatedExprs(dataSign)
+      case _ => Nil
+    }
+  }
+
+}
+
+case class CalculationExpr(first: ElementExpr, others: Iterable[(String, 
ElementExpr)]) extends ElementExpr {
+
+  val expression: String = others.foldLeft(first.expression) { (ex, next) => 
s"${ex} ${next._1} ${next._2}" }
+
+  def genValue(values: Map[String, Any]): Option[Any] = {
+    others.foldLeft(first.genValue(values)) { (v, next) =>
+      val (opr, ele) = next
+      val eleValue = ele.genValue(values)
+      opr match {
+        case "+" => v + eleValue
+        case "-" => v - eleValue
+        case "*" => v * eleValue
+        case "/" => v / eleValue
+        case "%" => v % eleValue
+        case _ => v
+      }
+    }
+  }
+
+  def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = {
+    others.foldLeft(first.getDataRelatedExprs(dataSign)) { (origin, next) =>
+      origin ++ next._2.getDataRelatedExprs(dataSign)
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Expr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Expr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Expr.scala
new file mode 100644
index 0000000..17adf16
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Expr.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+trait Expr extends Serializable {
+
+  val expression: String
+
+  protected val _defaultId: String = ""
+
+  val _id = ExprIdCounter.genId(_defaultId)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprAnalyzable.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprAnalyzable.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprAnalyzable.scala
new file mode 100644
index 0000000..29c47cd
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprAnalyzable.scala
@@ -0,0 +1,8 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+
+trait ExprAnalyzable extends Serializable {
+
+  def getDataRelatedExprs(dataSign: String): Iterable[DataExpr]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprIdCounter.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprIdCounter.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprIdCounter.scala
new file mode 100644
index 0000000..c24f617
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprIdCounter.scala
@@ -0,0 +1,39 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+import java.util.concurrent.atomic.AtomicLong
+import scala.collection.mutable.{Set => MutableSet}
+
+object ExprIdCounter {
+
+  private val idCounter: AtomicLong = new AtomicLong(0L)
+
+  private val existIdSet: MutableSet[String] = MutableSet.empty[String]
+
+  private val invalidIdRegex = """^\d+$""".r
+
+  def genId(defaultId: String): String = {
+    defaultId match {
+      case "" => increment.toString
+      case invalidIdRegex() => increment.toString
+//      case defId if (exist(defId)) => s"${increment}#${defId}"
+      case defId if (exist(defId)) => s"${defId}"
+      case _ => {
+        insertUserId(defaultId)
+        defaultId
+      }
+    }
+  }
+
+  private def exist(id: String): Boolean = {
+    existIdSet.contains(id)
+  }
+
+  private def insertUserId(id: String): Unit = {
+    existIdSet += id
+  }
+
+  private def increment(): Long = {
+    idCounter.incrementAndGet()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Recordable.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Recordable.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Recordable.scala
new file mode 100644
index 0000000..27e5a48
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Recordable.scala
@@ -0,0 +1,15 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+
+trait Recordable extends Serializable {
+
+  val recordName: String
+
+  protected def value2RecordString(v: Any): String = {
+    v match {
+      case s: String => s"'${s}'"
+      case a => s"${a}"
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/SelectExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/SelectExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/SelectExpr.scala
new file mode 100644
index 0000000..4dae38d
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/SelectExpr.scala
@@ -0,0 +1,48 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+trait SelectExpr extends Expr with Recordable {
+
+}
+
+
+case class NumPositionExpr(expression: String) extends SelectExpr {
+
+  val index: Int = expression.toInt
+
+  val recordName = s"[${value2RecordString(index)}]"
+
+}
+
+case class StringPositionExpr(expression: String) extends SelectExpr {
+
+  val field: String = expression
+
+  val recordName = s"[${value2RecordString(field)}]"
+
+}
+
+case class AnyPositionExpr(expression: String) extends SelectExpr {
+
+  val recordName = s"[${expression}]"
+
+}
+
+case class FilterOprExpr(expression: String, left: VariableExpr, right: 
ConstExpr) extends SelectExpr {
+
+  val field: String = left.name
+  val value: Any = right.value
+
+  val recordName = {
+    s"[${value2RecordString(field)}${expression}${value2RecordString(value)}]"
+  }
+
+}
+
+case class FunctionExpr(expression: String, args: Iterable[ConstExpr]) extends 
SelectExpr {
+
+  val recordName = {
+    val argsStr = args.map(value2RecordString(_)).mkString(",")
+    s".${expression}(${argsStr})"
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementAnalyzable.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementAnalyzable.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementAnalyzable.scala
new file mode 100644
index 0000000..57968e9
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementAnalyzable.scala
@@ -0,0 +1,14 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+
+trait StatementAnalyzable extends Serializable {
+
+  def getAssigns(): Iterable[AssignExpr] = Nil
+
+  def getConditions(): Iterable[ConditionExpr] = Nil
+
+  def getMappings(): Iterable[MappingExpr] = Nil
+
+  def getKeyMappings(): Iterable[MappingExpr] = Nil
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementExpr.scala
new file mode 100644
index 0000000..6669f83
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementExpr.scala
@@ -0,0 +1,85 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+trait StatementExpr extends Expr with StatementAnalyzable with ExprAnalyzable 
with Calculatable {
+
+}
+
+case class AssignExpr(expression: String, left: VariableExpr, right: 
ElementExpr) extends StatementExpr {
+
+  def genValue(values: Map[String, Any]): Option[Any] = right.genValue(values)
+
+  def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = 
right.getDataRelatedExprs(dataSign)
+
+  override def getAssigns(): Iterable[AssignExpr] = this :: Nil
+
+}
+
+case class ConditionExpr(expression: String, left: ElementExpr, right: 
ElementExpr, annotations: Iterable[AnnotationExpr]) extends StatementExpr {
+
+  def genValue(values: Map[String, Any]): Option[Boolean] = {
+    Some(true) // fixme: not done, need calculation
+  }
+
+  def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = {
+    left.getDataRelatedExprs(dataSign) ++ right.getDataRelatedExprs(dataSign)
+  }
+
+  override def getConditions(): Iterable[ConditionExpr] = this :: Nil
+
+}
+
+case class MappingExpr(expression: String, left: ElementExpr, right: 
ElementExpr, annotations: Iterable[AnnotationExpr]) extends StatementExpr {
+
+  def isKey: Boolean = {
+    annotations.exists { e =>
+      e match {
+        case a: AnnotationExpr => a.isKey
+        case _ => false
+      }
+    }
+  }
+
+  def genValue(values: Map[String, Any]): Option[Boolean] = {
+    (left.genValue(values), right.genValue(values)) match {
+      case (Some(v1), Some(v2)) => Some(v1 == v2)
+      case (None, None) => Some(true)
+      case _ => Some(false)
+    }
+  }
+
+  def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = {
+    left.getDataRelatedExprs(dataSign) ++ right.getDataRelatedExprs(dataSign)
+  }
+
+  override def getMappings(): Iterable[MappingExpr] = this :: Nil
+  override def getKeyMappings(): Iterable[MappingExpr] = if (isKey) this :: 
Nil else Nil
+
+}
+
+case class StatementsExpr(statements: Iterable[StatementExpr]) extends 
StatementExpr {
+
+  val expression: String = statements.map(_.expression).mkString("\n")
+
+  def genValue(values: Map[String, Any]): Option[Any] = None
+
+  def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = {
+    statements.flatMap(_.getDataRelatedExprs(dataSign))
+  }
+
+  override def getAssigns(): Iterable[AssignExpr] = {
+    statements.flatMap(_.getAssigns())
+  }
+
+  override def getConditions(): Iterable[ConditionExpr] = {
+    statements.flatMap(_.getConditions())
+  }
+
+  override def getMappings(): Iterable[MappingExpr] = {
+    statements.flatMap(_.getMappings())
+  }
+
+  override def getKeyMappings(): Iterable[MappingExpr] = {
+    statements.flatMap(_.getKeyMappings())
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/VariableExpr.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/VariableExpr.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/VariableExpr.scala
new file mode 100644
index 0000000..87424fc
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/VariableExpr.scala
@@ -0,0 +1,26 @@
+package org.apache.griffin.measure.batch.rule.expr_old
+
+trait VariableExpr extends Expr with Recordable {
+
+  val name: String
+
+}
+
+
+case class VariableStringExpr(expression: String) extends VariableExpr {
+
+  val name = expression
+
+  val recordName = name
+
+}
+
+case class QuoteVariableExpr(expression: String) extends VariableExpr with 
Calculatable {
+
+  val name = expression
+
+  val recordName = name
+
+  def genValue(values: Map[String, Any]): Option[Any] = values.get(name)
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala
new file mode 100644
index 0000000..2d10fb4
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala
@@ -0,0 +1,259 @@
+package org.apache.griffin.measure.batch.utils
+
+import scala.util.{Success, Try}
+
+
+object CalculationUtil {
+
+  implicit def option2CalculationValue(v: Option[_]): CalculationValue = 
CalculationValue(v)
+
+  case class CalculationValue(value: Option[_]) extends Serializable {
+
+    def + (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 + v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble)
+          case (None, Some(v2)) => other
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def - (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 - v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 - v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 - v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def * (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2)
+          case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 * v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 * v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 * v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 * v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 * v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def / (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 / v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 / v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 / v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def % (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: Byte), Some(v2)) => Some(v1 % v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 % v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 % v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 % v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def unary_- (): Option[_] = {
+      value match {
+        case Some(v: String) => Some(v.reverse.toString)
+        case Some(v: Boolean) => Some(!v)
+        case Some(v: Byte) => Some(-v)
+        case Some(v: Short) => Some(-v)
+        case Some(v: Int) => Some(-v)
+        case Some(v: Long) => Some(-v)
+        case Some(v: Float) => Some(-v)
+        case Some(v: Double) => Some(-v)
+        case Some(v) => Some(v)
+        case _ => None
+      }
+    }
+
+
+    def === (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (Some(v1), Some(v2)) => Some(v1 == v2)
+        case _ => Some(false)
+      }
+    }
+
+    def =!= (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (Some(v1), Some(v2)) => Some(v1 != v2)
+        case _ => Some(true)
+      }
+    }
+
+    def > (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2: String)) => Some(v1 > v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case _ => Some(false)
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def >= (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case _ => Some(false)
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def < (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2: String)) => Some(v1 < v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case _ => Some(false)
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def <= (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case _ => Some(false)
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+
+    def in (other: Iterable[Option[_]]): Option[Boolean] = {
+      Some(other.foldLeft(false) { (res, next) =>
+        res || ===(next).getOrElse(false)
+      })
+    }
+
+    def not_in (other: Iterable[Option[_]]): Option[Boolean] = {
+      Some(other.foldLeft(true) { (res, next) =>
+        res && =!=(next).getOrElse(false)
+      })
+    }
+
+    def between (other: Iterable[Option[_]]): Option[Boolean] = {
+      if (other.size < 2) None else {
+        val (begin, end) = (other.head, other.tail.head)
+        (>=(begin), <=(end)) match {
+          case (Some(b1), Some(b2)) => Some(b1 && b2)
+          case _ => None
+        }
+      }
+    }
+
+    def not_between (other: Iterable[Option[_]]): Option[Boolean] = {
+      if (other.size < 2) None else {
+        val (begin, end) = (other.head, other.tail.head)
+        (<(begin), >(end)) match {
+          case (Some(b1), Some(b2)) => Some(b1 || b2)
+          case _ => None
+        }
+      }
+    }
+
+    def unary_! (): Option[Boolean] = {
+      value match {
+        case Some(v: Boolean) => Some(!v)
+        case Some(v) => Some(false)
+        case _ => None
+      }
+    }
+
+    def && (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2)
+        case _ => None
+      }
+    }
+
+    def || (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2)
+        case _ => None
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
new file mode 100644
index 0000000..b48478a
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
@@ -0,0 +1,62 @@
+package org.apache.griffin.measure.batch.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, 
FileSystem, Path}
+
+object HdfsUtil {
+
+  private val seprator = "/"
+
+  private val conf = new Configuration()
+  conf.set("dfs.support.append", "true")
+
+  private val dfs = FileSystem.get(conf)
+
+  def existPath(filePath: String): Boolean = {
+    val path = new Path(filePath)
+    dfs.exists(path)
+  }
+
+  def createFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+    return dfs.create(path)
+  }
+
+  def appendOrCreateFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.append(path) else createFile(filePath)
+  }
+
+  def openFile(filePath: String): FSDataInputStream = {
+    val path = new Path(filePath)
+    dfs.open(path)
+  }
+
+  def writeContent(filePath: String, message: String): Unit = {
+    val out = createFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close
+  }
+
+  def appendContent(filePath: String, message: String): Unit = {
+    val out = appendOrCreateFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close
+  }
+
+  def createEmptyFile(filePath: String): Unit = {
+    val out = createFile(filePath)
+    out.close
+  }
+
+
+  def getHdfsFilePath(parentPath: String, fileName: String): String = {
+    if (parentPath.endsWith(seprator)) parentPath + fileName else parentPath + 
seprator + fileName
+  }
+
+  def deleteHdfsPath(dirPath: String): Unit = {
+    val path = new Path(dirPath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
new file mode 100644
index 0000000..747d0fa
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
@@ -0,0 +1,30 @@
+package org.apache.griffin.measure.batch.utils
+
+import scalaj.http._
+
+object HttpUtil {
+
+  val GET_REGEX = """^(?i)get$""".r
+  val POST_REGEX = """^(?i)post$""".r
+  val PUT_REGEX = """^(?i)put$""".r
+  val DELETE_REGEX = """^(?i)delete$""".r
+
+  def postData(url: String, params: Map[String, Object], headers: Map[String, 
Object], data: String): String = {
+    val response = 
Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString
+    response.code.toString
+  }
+
+  def httpRequest(url: String, method: String, params: Map[String, Object], 
headers: Map[String, Object], data: String): String = {
+    val httpReq = 
Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers))
+    method match {
+      case POST_REGEX() => httpReq.postData(data).asString.code.toString
+      case PUT_REGEX() => httpReq.put(data).asString.code.toString
+      case _ => "wrong method"
+    }
+  }
+
+  private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, 
String] = {
+    map.map(pair => pair._1 -> pair._2.toString)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
new file mode 100644
index 0000000..cdd470a
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
@@ -0,0 +1,32 @@
+package org.apache.griffin.measure.batch.utils
+
+import java.io.InputStream
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import scala.reflect._
+
+object JsonUtil {
+  val mapper = new ObjectMapper()
+  mapper.registerModule(DefaultScalaModule)
+  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+
+  def toJson(value: Map[Symbol, Any]): String = {
+    toJson(value map { case (k,v) => k.name -> v})
+  }
+
+  def toJson(value: Any): String = {
+    mapper.writeValueAsString(value)
+  }
+
+  def toMap[V](json:String)(implicit m: Manifest[V]) = 
fromJson[Map[String,V]](json)
+
+  def fromJson[T: ClassTag](json: String)(implicit m : Manifest[T]): T = {
+    mapper.readValue[T](json, classTag[T].runtimeClass.asInstanceOf[Class[T]])
+  }
+
+  def fromJson[T: ClassTag](is: InputStream)(implicit m : Manifest[T]): T = {
+    mapper.readValue[T](is, classTag[T].runtimeClass.asInstanceOf[Class[T]])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala
new file mode 100644
index 0000000..7f2b355
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.measure.batch.utils
+
+object StringParseUtil {
+
+  def sepStrings(str: String, sep: String): Iterable[String] = {
+    val strings = str.split(sep)
+    strings.map(_.trim)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/resources/config.json
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/test/resources/config.json 
b/griffin-measure/griffin-measure-batch/src/test/resources/config.json
new file mode 100644
index 0000000..65e0ed9
--- /dev/null
+++ b/griffin-measure/griffin-measure-batch/src/test/resources/config.json
@@ -0,0 +1,25 @@
+{
+  "name": "accu1",
+  "type": "accuracy",
+
+  "source": {
+    "type": "avro",
+    "version": "1.7",
+    "config": {
+      "file.name": "src/test/resources/users_info_src.avro"
+    }
+  },
+
+  "target": {
+    "type": "avro",
+    "version": "1.7",
+    "config": {
+      "file.name": "src/test/resources/users_info_target.avro"
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 1,
+    "rules": "$source.user_id > 10020 AND $source.user_id + 5 = 
$target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + 
(10 + 2) AND $source.last_name = $target.last_name AND $source.address = 
$target.address AND $source.email = $target.email AND $source.phone = 
$target.phone AND $source.post_code = $target.post_code"
+  }
+}
\ No newline at end of file


Reply via email to